vendor: google.golang.org/grpc v1.56.3

server: prohibit more than MaxConcurrentStreams handlers from running at once
(CVE-2023-44487).

In addition to this change, applications should ensure they do not leave running
tasks behind related to the RPC before returning from method handlers, or should
enforce appropriate limits on any such work.

- https://github.com/grpc/grpc-go/compare/v1.56.2...v1.56.3

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2023-10-24 14:57:16 +02:00
parent 39e1de95ab
commit 8073525c00
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
6 changed files with 56 additions and 34 deletions

View File

@ -85,6 +85,6 @@ require (
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.2 // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

View File

@ -358,8 +358,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.0.5/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI=
google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=

View File

@ -171,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
if config.MaxStreams != math.MaxUint32 {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
Val: config.MaxStreams,
})
}
dynamicWindow := true
@ -258,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
maxStreams: maxStreams,
maxStreams: config.MaxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,

View File

@ -115,12 +115,6 @@ type serviceInfo struct {
mdata interface{}
}
type serverWorkerData struct {
st transport.ServerTransport
wg *sync.WaitGroup
stream *transport.Stream
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
@ -145,7 +139,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannel chan *serverWorkerData
serverWorkerChannel chan func()
}
type serverOptions struct {
@ -177,6 +171,7 @@ type serverOptions struct {
}
var defaultServerOptions = serverOptions{
maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
@ -387,6 +382,9 @@ func MaxSendMsgSize(m int) ServerOption {
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
if n == 0 {
n = math.MaxUint32
}
return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
})
@ -567,24 +565,19 @@ const serverWorkerResetThreshold = 1 << 16
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
data, ok := <-s.serverWorkerChannel
f, ok := <-s.serverWorkerChannel
if !ok {
return
}
s.handleSingleStream(data)
f()
}
go s.serverWorker()
}
func (s *Server) handleSingleStream(data *serverWorkerData) {
defer data.wg.Done()
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
}
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan *serverWorkerData)
s.serverWorkerChannel = make(chan func())
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
@ -943,21 +936,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
streamQuota.acquire()
f := func() {
defer streamQuota.release()
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannel <- data:
case s.serverWorkerChannel <- f:
return
default:
// If all stream workers are busy, fallback to the default code path.
}
}
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
go f()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
@ -2052,3 +2050,32 @@ func validateSendCompressor(name, clientCompressors string) error {
}
return fmt.Errorf("client does not support compressor %q", name)
}
// atomicSemaphore implements a blocking, counting semaphore. acquire should be
// called synchronously; release may be called asynchronously.
type atomicSemaphore struct {
n int64
wait chan struct{}
}
func (q *atomicSemaphore) acquire() {
if atomic.AddInt64(&q.n, -1) < 0 {
// We ran out of quota. Block until a release happens.
<-q.wait
}
}
func (q *atomicSemaphore) release() {
// N.B. the "<= 0" check below should allow for this to work with multiple
// concurrent calls to acquire, but also note that with synchronous calls to
// acquire, as our system does, n will never be less than -1. There are
// fairness issues (queuing) to consider if this was to be generalized.
if atomic.AddInt64(&q.n, 1) <= 0 {
// An acquire was waiting on us. Unblock it.
q.wait <- struct{}{}
}
}
func newHandlerQuota(n uint32) *atomicSemaphore {
return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)}
}

View File

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.56.2"
const Version = "1.56.3"

2
vendor/modules.txt vendored
View File

@ -373,7 +373,7 @@ golang.org/x/tools/internal/typesinternal
# google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
## explicit; go 1.19
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.56.2
# google.golang.org/grpc v1.56.3
## explicit; go 1.17
google.golang.org/grpc
google.golang.org/grpc/attributes