bump google.golang.org/grpc v1.12.2

full diff: https://github.com/grpc/grpc-go/compare/v1.12.0...v1.12.2

- grpc/grpc-go#2074 transport/server: fix race between writing status and header
  - fix grpc/grpc-go#1972 Possible race sending headers from server while receiving message over size limit
- grpc/grpc-go#2074 transport: account for user configured small io write buffer
  - fix grpc/grpc-go#2089 Server abruptly terminates connections if write buffer is small enough

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2019-04-06 13:08:57 +02:00
parent 58ec72afca
commit 25e6a64e2a
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
5 changed files with 88 additions and 58 deletions

View File

@ -86,7 +86,7 @@ golang.org/x/sys d455e41777fca6e8a5a79e34a14b
golang.org/x/text f21a4dfb5e38f5895301dc265a8def02365cc3d0 # v0.3.0 golang.org/x/text f21a4dfb5e38f5895301dc265a8def02365cc3d0 # v0.3.0
golang.org/x/time fbb02b2291d28baffd63558aa44b4b56f178d650 golang.org/x/time fbb02b2291d28baffd63558aa44b4b56f178d650
google.golang.org/genproto 02b4e95473316948020af0b7a4f0f22c73929b0e google.golang.org/genproto 02b4e95473316948020af0b7a4f0f22c73929b0e
google.golang.org/grpc 41344da2231b913fa3d983840a57a6b1b7b631a1 # v1.12.0 google.golang.org/grpc 7a6a684ca69eb4cae85ad0a484f2e531598c047b # v1.12.2
gopkg.in/inf.v0 d2d2541c53f18d2a059457998ce2876cc8e67cbf # v0.9.1 gopkg.in/inf.v0 d2d2541c53f18d2a059457998ce2876cc8e67cbf # v0.9.1
gopkg.in/yaml.v2 5420a8b6744d3b0345ab293f6fcba19c978f1183 # v2.2.1 gopkg.in/yaml.v2 5420a8b6744d3b0345ab293f6fcba19c978f1183 # v2.2.1
gotest.tools 7c797b5133e5460410dbb22ba779bf35e6975dea # v2.2.0 gotest.tools 7c797b5133e5460410dbb22ba779bf35e6975dea # v2.2.0

View File

@ -722,6 +722,6 @@ const (
) )
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.12.0" const Version = "1.12.2"
const grpcUA = "grpc-go/" + Version const grpcUA = "grpc-go/" + Version

View File

@ -682,28 +682,7 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
}) })
} }
// WriteHeader sends the header metedata md back to the client. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.headerOk || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.headerOk = true
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
} else {
s.header = md
}
}
md = s.header
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
if s.sendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
for k, vv := range md { for k, vv := range md {
if isReservedHeader(k) { if isReservedHeader(k) {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@ -713,6 +692,37 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
} }
} }
return headerFields
}
// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
} else {
s.header = md
}
}
t.writeHeaderLocked(s)
s.hdrMu.Unlock()
return nil
}
func (t *http2Server) writeHeaderLocked(s *Stream) {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
if s.sendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
t.controlBuf.put(&headerFrame{ t.controlBuf.put(&headerFrame{
streamID: s.id, streamID: s.id,
hf: headerFields, hf: headerFields,
@ -728,7 +738,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
outHeader := &stats.OutHeader{} outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader) t.stats.HandleRPC(s.Context(), outHeader)
} }
return nil
} }
// WriteStatus sends stream status to the client and terminates the stream. // WriteStatus sends stream status to the client and terminates the stream.
@ -736,22 +745,21 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted. // OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if !s.headerOk && s.header.Len() > 0 {
if err := t.WriteHeader(s, nil); err != nil {
return err
}
} else {
if s.getState() == streamDone { if s.getState() == streamDone {
return nil return nil
} }
} s.hdrMu.Lock()
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size. // first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.headerOk { if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame.
t.writeHeaderLocked(s)
} else { // Send a trailer only response.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
} }
}
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
@ -766,16 +774,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
} }
// Attach the trailer metadata. // Attach the trailer metadata.
for k, vv := range s.trailer { headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
// Clients don't tolerate reading restricted headers after some non restricted ones were sent. trailingHeader := &headerFrame{
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
trailer := &headerFrame{
streamID: s.id, streamID: s.id,
hf: headerFields, hf: headerFields,
endStream: true, endStream: true,
@ -783,7 +783,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
atomic.StoreUint32(&t.resetPingStrikes, 1) atomic.StoreUint32(&t.resetPingStrikes, 1)
}, },
} }
t.closeStream(s, false, 0, trailer, true) s.hdrMu.Unlock()
t.closeStream(s, false, 0, trailingHeader, true)
if t.stats != nil { if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
} }
@ -793,7 +794,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error). // is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.headerOk { // Headers haven't been written yet. if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil { if err := t.WriteHeader(s, nil); err != nil {
// TODO(mmukhi, dfawley): Make sure this is the right code to return. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
return streamErrorf(codes.Internal, "transport: %v", err) return streamErrorf(codes.Internal, "transport: %v", err)

View File

@ -531,11 +531,15 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil { if w.err != nil {
return 0, w.err return 0, w.err
} }
n = copy(w.buf[w.offset:], b) for len(b) > 0 {
w.offset += n nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize { if w.offset >= w.batchSize {
err = w.Flush() err = w.Flush()
} }
}
return n, err return n, err
} }

View File

@ -185,13 +185,20 @@ type Stream struct {
headerChan chan struct{} // closed to indicate the end of header metadata. headerChan chan struct{} // closed to indicate the end of header metadata.
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
header metadata.MD // the received header metadata. header metadata.MD // the received header metadata.
trailer metadata.MD // the key-value map of trailer metadata. trailer metadata.MD // the key-value map of trailer metadata.
headerOk bool // becomes true from the first header is about to send // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
headerSent uint32
state streamState state streamState
status *status.Status // the status error received from the server // On client-side it is the status error received from the server.
// On server-side it is unused.
status *status.Status
bytesReceived uint32 // indicates whether any bytes have been received on this stream bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
@ -201,6 +208,17 @@ type Stream struct {
contentSubtype string contentSubtype string
} }
// isHeaderSent is only valid on the server-side.
func (s *Stream) isHeaderSent() bool {
return atomic.LoadUint32(&s.headerSent) == 1
}
// updateHeaderSent updates headerSent and returns true
// if it was alreay set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}
func (s *Stream) swapState(st streamState) streamState { func (s *Stream) swapState(st streamState) streamState {
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
} }
@ -313,10 +331,12 @@ func (s *Stream) SetHeader(md metadata.MD) error {
if md.Len() == 0 { if md.Len() == 0 {
return nil return nil
} }
if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) { if s.isHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite return ErrIllegalHeaderWrite
} }
s.hdrMu.Lock()
s.header = metadata.Join(s.header, md) s.header = metadata.Join(s.header, md)
s.hdrMu.Unlock()
return nil return nil
} }
@ -335,7 +355,12 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
if md.Len() == 0 { if md.Len() == 0 {
return nil return nil
} }
if s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.hdrMu.Lock()
s.trailer = metadata.Join(s.trailer, md) s.trailer = metadata.Join(s.trailer, md)
s.hdrMu.Unlock()
return nil return nil
} }