vendor: google.golang.org/grpc v1.50.1

full diff: https://github.com/grpc/grpc-go/compare/v1.48.0...v1.50.1

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2023-03-15 01:24:52 +01:00
parent d213548bd0
commit 6c8cc226f0
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
27 changed files with 559 additions and 316 deletions

View File

@ -73,6 +73,6 @@ require (
golang.org/x/net v0.7.0 // indirect golang.org/x/net v0.7.0 // indirect
golang.org/x/time v0.3.0 // indirect golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20220706185917-7780775163c4 // indirect google.golang.org/genproto v0.0.0-20220706185917-7780775163c4 // indirect
google.golang.org/grpc v1.48.0 // indirect google.golang.org/grpc v1.50.1 // indirect
google.golang.org/protobuf v1.28.1 // indirect google.golang.org/protobuf v1.28.1 // indirect
) )

View File

@ -718,8 +718,8 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@ -244,7 +244,7 @@ type DoneInfo struct {
// ServerLoad is the load received from server. It's usually sent as part of // ServerLoad is the load received from server. It's usually sent as part of
// trailing metadata. // trailing metadata.
// //
// The only supported type now is *orca_v1.LoadReport. // The only supported type now is *orca_v3.LoadReport.
ServerLoad interface{} ServerLoad interface{}
} }
@ -371,56 +371,3 @@ type ClientConnState struct {
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a // ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data. // problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state") var ErrBadResolverState = errors.New("bad resolver state")
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transient failure state.
numIdle uint64 // Number of addrConns in idle state.
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
// - Else if at least one SubConn is Idle, the aggregated state is Idle;
// - Else there are no subconns and the aggregated state is Transient Failure
//
// Shutdown is not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
case connectivity.Idle:
cse.numIdle += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
if cse.numTransientFailure > 0 {
return connectivity.TransientFailure
}
if cse.numIdle > 0 {
return connectivity.Idle
}
return connectivity.TransientFailure
}

View File

@ -0,0 +1,70 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancer
import "google.golang.org/grpc/connectivity"
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transient failure state.
numIdle uint64 // Number of addrConns in idle state.
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else if at least one SubConn is Idle, the aggregated state is Idle;
// - Else if at least one SubConn is TransientFailure (or there are no SubConns), the aggregated state is Transient Failure.
//
// Shutdown is not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
case connectivity.Idle:
cse.numIdle += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
if cse.numIdle > 0 {
return connectivity.Idle
}
return connectivity.TransientFailure
}

View File

@ -22,7 +22,7 @@
package roundrobin package roundrobin
import ( import (
"sync" "sync/atomic"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/base"
@ -60,7 +60,7 @@ func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
// Start at a random index, as the same RR balancer rebuilds a new // Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess // picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list. // load to the first server in the list.
next: grpcrand.Intn(len(scs)), next: uint32(grpcrand.Intn(len(scs))),
} }
} }
@ -69,15 +69,13 @@ type rrPicker struct {
// created. The slice is immutable. Each Get() will do a round robin // created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn. // selection from it and return the selected SubConn.
subConns []balancer.SubConn subConns []balancer.SubConn
next uint32
mu sync.Mutex
next int
} }
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock() subConnsLen := uint32(len(p.subConns))
sc := p.subConns[p.next] nextIndex := atomic.AddUint32(&p.next, 1)
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock() sc := p.subConns[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil return balancer.PickResult{SubConn: sc}, nil
} }

View File

@ -712,8 +712,8 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
ac.ctx, ac.cancel = context.WithCancel(cc.ctx) ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called. // Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock()
if cc.conns == nil { if cc.conns == nil {
cc.mu.Unlock()
return nil, ErrClientConnClosing return nil, ErrClientConnClosing
} }
@ -732,7 +732,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
}) })
cc.conns[ac] = struct{}{} cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil return ac, nil
} }

View File

@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff" internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport" "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
@ -36,12 +37,13 @@ import (
) )
func init() { func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) { internal.AddGlobalDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...) extraDialOptions = append(extraDialOptions, opt...)
} }
internal.ClearExtraDialOptions = func() { internal.ClearGlobalDialOptions = func() {
extraDialOptions = nil extraDialOptions = nil
} }
internal.WithBinaryLogger = withBinaryLogger
} }
// dialOptions configure a Dial call. dialOptions are set by the DialOption // dialOptions configure a Dial call. dialOptions are set by the DialOption
@ -61,6 +63,7 @@ type dialOptions struct {
timeout time.Duration timeout time.Duration
scChan <-chan ServiceConfig scChan <-chan ServiceConfig
authority string authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions copts transport.ConnectOptions
callOptions []CallOption callOptions []CallOption
channelzParentID *channelz.Identifier channelzParentID *channelz.Identifier
@ -84,7 +87,7 @@ var extraDialOptions []DialOption
// EmptyDialOption does not alter the dial configuration. It can be embedded in // EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options. // another structure to build custom dial options.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -275,7 +278,7 @@ func WithBlock() DialOption {
// the context.DeadlineExceeded error. // the context.DeadlineExceeded error.
// Implies WithBlock() // Implies WithBlock()
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -304,7 +307,7 @@ func WithInsecure() DialOption {
// WithNoProxy returns a DialOption which disables the use of proxies for this // WithNoProxy returns a DialOption which disables the use of proxies for this
// ClientConn. This is ignored if WithDialer or WithContextDialer are used. // ClientConn. This is ignored if WithDialer or WithContextDialer are used.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -335,7 +338,7 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
// the ClientConn.WithCreds. This should not be used together with // the ClientConn.WithCreds. This should not be used together with
// WithTransportCredentials. // WithTransportCredentials.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -391,10 +394,24 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
// all the RPCs and underlying network connections in this ClientConn. // all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption { func WithStatsHandler(h stats.Handler) DialOption {
return newFuncDialOption(func(o *dialOptions) { return newFuncDialOption(func(o *dialOptions) {
if h == nil {
logger.Error("ignoring nil parameter in grpc.WithStatsHandler ClientOption")
// Do not allow a nil stats handler, which would otherwise cause
// panics.
return
}
o.copts.StatsHandlers = append(o.copts.StatsHandlers, h) o.copts.StatsHandlers = append(o.copts.StatsHandlers, h)
}) })
} }
// withBinaryLogger returns a DialOption that specifies the binary logger for
// this ClientConn.
func withBinaryLogger(bl binarylog.Logger) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.binaryLogger = bl
})
}
// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
// non-temporary dial errors. If f is true, and dialer returns a non-temporary // non-temporary dial errors. If f is true, and dialer returns a non-temporary
// error, gRPC will fail the connection to the network address and won't try to // error, gRPC will fail the connection to the network address and won't try to
@ -403,7 +420,7 @@ func WithStatsHandler(h stats.Handler) DialOption {
// FailOnNonTempDialError only affects the initial dial, and does not do // FailOnNonTempDialError only affects the initial dial, and does not do
// anything useful unless you are also using WithBlock(). // anything useful unless you are also using WithBlock().
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -483,7 +500,7 @@ func WithAuthority(a string) DialOption {
// current ClientConn's parent. This function is used in nested channel creation // current ClientConn's parent. This function is used in nested channel creation
// (e.g. grpclb dial). // (e.g. grpclb dial).
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -528,9 +545,6 @@ func WithDefaultServiceConfig(s string) DialOption {
// service config enables them. This does not impact transparent retries, which // service config enables them. This does not impact transparent retries, which
// will happen automatically if no data is written to the wire or if the RPC is // will happen automatically if no data is written to the wire or if the RPC is
// unprocessed by the remote server. // unprocessed by the remote server.
//
// Retry support is currently enabled by default, but may be disabled by
// setting the environment variable "GRPC_GO_RETRY" to "off".
func WithDisableRetry() DialOption { func WithDisableRetry() DialOption {
return newFuncDialOption(func(o *dialOptions) { return newFuncDialOption(func(o *dialOptions) {
o.disableRetry = true o.disableRetry = true
@ -548,7 +562,7 @@ func WithMaxHeaderListSize(s uint32) DialOption {
// WithDisableHealthCheck disables the LB channel health checking for all // WithDisableHealthCheck disables the LB channel health checking for all
// SubConns of this ClientConn. // SubConns of this ClientConn.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -595,7 +609,7 @@ func withMinConnectDeadline(f func() time.Duration) DialOption {
// resolver.Register. They will be matched against the scheme used for the // resolver.Register. They will be matched against the scheme used for the
// current Dial only, and will take precedence over the global registry. // current Dial only, and will take precedence over the global registry.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -37,7 +37,7 @@ type Logger interface {
// binLogger is the global binary logger for the binary. One of this should be // binLogger is the global binary logger for the binary. One of this should be
// built at init time from the configuration (environment variable or flags). // built at init time from the configuration (environment variable or flags).
// //
// It is used to get a methodLogger for each individual method. // It is used to get a MethodLogger for each individual method.
var binLogger Logger var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog") var grpclogLogger = grpclog.Component("binarylog")
@ -56,11 +56,11 @@ func GetLogger() Logger {
return binLogger return binLogger
} }
// GetMethodLogger returns the methodLogger for the given methodName. // GetMethodLogger returns the MethodLogger for the given methodName.
// //
// methodName should be in the format of "/service/method". // methodName should be in the format of "/service/method".
// //
// Each methodLogger returned by this method is a new instance. This is to // Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call. // generate sequence id within the call.
func GetMethodLogger(methodName string) MethodLogger { func GetMethodLogger(methodName string) MethodLogger {
if binLogger == nil { if binLogger == nil {
@ -117,7 +117,7 @@ func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
// Set method logger for "service/*". // Set method logger for "service/*".
// //
// New methodLogger with same service overrides the old one. // New MethodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error { func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Services[service]; ok { if _, ok := l.config.Services[service]; ok {
return fmt.Errorf("conflicting service rules for service %v found", service) return fmt.Errorf("conflicting service rules for service %v found", service)
@ -131,7 +131,7 @@ func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig)
// Set method logger for "service/method". // Set method logger for "service/method".
// //
// New methodLogger with same method overrides the old one. // New MethodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error { func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Blacklist[method]; ok { if _, ok := l.config.Blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method) return fmt.Errorf("conflicting blacklist rules for method %v found", method)
@ -161,11 +161,11 @@ func (l *logger) setBlacklist(method string) error {
return nil return nil
} }
// getMethodLogger returns the methodLogger for the given methodName. // getMethodLogger returns the MethodLogger for the given methodName.
// //
// methodName should be in the format of "/service/method". // methodName should be in the format of "/service/method".
// //
// Each methodLogger returned by this method is a new instance. This is to // Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call. // generate sequence id within the call.
func (l *logger) GetMethodLogger(methodName string) MethodLogger { func (l *logger) GetMethodLogger(methodName string) MethodLogger {
s, m, err := grpcutil.ParseMethod(methodName) s, m, err := grpcutil.ParseMethod(methodName)
@ -174,16 +174,16 @@ func (l *logger) GetMethodLogger(methodName string) MethodLogger {
return nil return nil
} }
if ml, ok := l.config.Methods[s+"/"+m]; ok { if ml, ok := l.config.Methods[s+"/"+m]; ok {
return newMethodLogger(ml.Header, ml.Message) return NewTruncatingMethodLogger(ml.Header, ml.Message)
} }
if _, ok := l.config.Blacklist[s+"/"+m]; ok { if _, ok := l.config.Blacklist[s+"/"+m]; ok {
return nil return nil
} }
if ml, ok := l.config.Services[s]; ok { if ml, ok := l.config.Services[s]; ok {
return newMethodLogger(ml.Header, ml.Message) return NewTruncatingMethodLogger(ml.Header, ml.Message)
} }
if l.config.All == nil { if l.config.All == nil {
return nil return nil
} }
return newMethodLogger(l.config.All.Header, l.config.All.Message) return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message)
} }

View File

@ -57,7 +57,7 @@ func NewLoggerFromConfigString(s string) Logger {
return l return l
} }
// fillMethodLoggerWithConfigString parses config, creates methodLogger and adds // fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds
// it to the right map in the logger. // it to the right map in the logger.
func (l *logger) fillMethodLoggerWithConfigString(config string) error { func (l *logger) fillMethodLoggerWithConfigString(config string) error {
// "" is invalid. // "" is invalid.

View File

@ -52,7 +52,9 @@ type MethodLogger interface {
Log(LogEntryConfig) Log(LogEntryConfig)
} }
type methodLogger struct { // TruncatingMethodLogger is a method logger that truncates headers and messages
// based on configured fields.
type TruncatingMethodLogger struct {
headerMaxLen, messageMaxLen uint64 headerMaxLen, messageMaxLen uint64
callID uint64 callID uint64
@ -61,8 +63,9 @@ type methodLogger struct {
sink Sink // TODO(blog): make this plugable. sink Sink // TODO(blog): make this plugable.
} }
func newMethodLogger(h, m uint64) *methodLogger { // NewTruncatingMethodLogger returns a new truncating method logger.
return &methodLogger{ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
return &TruncatingMethodLogger{
headerMaxLen: h, headerMaxLen: h,
messageMaxLen: m, messageMaxLen: m,
@ -75,8 +78,8 @@ func newMethodLogger(h, m uint64) *methodLogger {
// Build is an internal only method for building the proto message out of the // Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic // input event. It's made public to enable other library to reuse as much logic
// in methodLogger as possible. // in TruncatingMethodLogger as possible.
func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry { func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m := c.toProto() m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now()) timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp m.Timestamp = timestamp
@ -95,11 +98,11 @@ func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
} }
// Log creates a proto binary log entry, and logs it to the sink. // Log creates a proto binary log entry, and logs it to the sink.
func (ml *methodLogger) Log(c LogEntryConfig) { func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c)) ml.sink.Write(ml.Build(c))
} }
func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) { func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt { if ml.headerMaxLen == maxUInt {
return false return false
} }
@ -129,7 +132,7 @@ func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
return truncated return truncated
} }
func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) { func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt { if ml.messageMaxLen == maxUInt {
return false return false
} }

View File

@ -0,0 +1,36 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package envconfig
import "os"
const (
envObservabilityConfig = "GRPC_GCP_OBSERVABILITY_CONFIG"
envObservabilityConfigFile = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE"
)
var (
// ObservabilityConfig is the json configuration for the gcp/observability
// package specified directly in the envObservabilityConfig env var.
ObservabilityConfig = os.Getenv(envObservabilityConfig)
// ObservabilityConfigFile is the json configuration for the
// gcp/observability specified in a file with the location specified in
// envObservabilityConfigFile env var.
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
)

View File

@ -84,9 +84,9 @@ var (
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false". // "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false") XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
// XDSOutlierDetection indicates whether outlier detection support is // XDSOutlierDetection indicates whether outlier detection support is
// enabled, which can be enabled by setting the environment variable // enabled, which can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "true". // "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
XDSOutlierDetection = strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "true") XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled. // XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true") XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")

View File

@ -52,6 +52,13 @@ func Intn(n int) int {
return r.Intn(n) return r.Intn(n)
} }
// Int31n implements rand.Int31n on the grpcrand global source.
func Int31n(n int32) int32 {
mu.Lock()
defer mu.Unlock()
return r.Int31n(n)
}
// Float64 implements rand.Float64 on the grpcrand global source. // Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 { func Float64() float64 {
mu.Lock() mu.Lock()

View File

@ -39,6 +39,11 @@ func ParseMethod(methodName string) (service, method string, _ error) {
return methodName[:pos], methodName[pos+1:], nil return methodName[:pos], methodName[pos+1:], nil
} }
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// for more details.
const baseContentType = "application/grpc" const baseContentType = "application/grpc"
// ContentSubtype returns the content-subtype for the given content-type. The // ContentSubtype returns the content-subtype for the given content-type. The

View File

@ -63,20 +63,30 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular // xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode. // listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string) DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be // AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1. // effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values. // user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption) AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This // ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking. // method is useful in testing and benchmarking.
ClearExtraServerOptions func() ClearGlobalServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective // AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1. // globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values. // user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption) AddGlobalDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This // ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking. // method is useful in testing and benchmarking.
ClearExtraDialOptions func() ClearGlobalDialOptions func()
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using // NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from // the provided xds bootstrap config instead of the global configuration from
@ -117,22 +127,6 @@ var (
// //
// TODO: Remove this function once the RBAC env var is removed. // TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func() UnregisterRBACHTTPFilterForTesting func()
// RegisterOutlierDetectionBalancerForTesting registers the Outlier
// Detection Balancer for testing purposes, regardless of the Outlier
// Detection environment variable.
//
// TODO: Remove this function once the Outlier Detection env var is removed.
RegisterOutlierDetectionBalancerForTesting func()
// UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
// Detection Balancer for testing purposes. This is needed because there is
// no way to unregister the Outlier Detection Balancer after registering it
// solely for testing purposes using
// RegisterOutlierDetectionBalancerForTesting().
//
// TODO: Remove this function once the Outlier Detection env var is removed.
UnregisterOutlierDetectionBalancerForTesting func()
) )
// HealthChecker defines the signature of the client-side LB channel health checking function. // HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -49,8 +49,9 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolv
} }
addr := resolver.Address{Addr: endpoint} addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme { if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract // We can not prepend \0 as c++ gRPC does, as in Golang '@' is used to signify we do
addr.Addr = "\x00" + addr.Addr // not want trailing \0 in address.
addr.Addr = "@" + addr.Addr
} }
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}}) cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil return &nopResolver{}, nil

View File

@ -886,9 +886,9 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into // A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames. // multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size. // maximum possible HTTP2 frame size.
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true // Client sends out empty data frame with endStream = true

View File

@ -78,6 +78,7 @@ type http2Client struct {
framer *framer framer *framer
// controlBuf delivers all the control related tasks (e.g., window // controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller. // updates, reset streams, and various settings) to the controller.
// Do not access controlBuf with mu held.
controlBuf *controlBuffer controlBuf *controlBuffer
fc *trInFlow fc *trInFlow
// The scheme used: https if TLS is on, http otherwise. // The scheme used: https if TLS is on, http otherwise.
@ -109,6 +110,7 @@ type http2Client struct {
waitingStreams uint32 waitingStreams uint32
nextID uint32 nextID uint32
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables mu sync.Mutex // guard the following variables
state transportState state transportState
activeStreams map[uint32]*Stream activeStreams map[uint32]*Stream
@ -324,6 +326,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled, keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(), bufferPool: newBufferPool(),
} }
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
if md, ok := addr.Metadata.(*metadata.MD); ok { if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md t.md = *md
@ -467,7 +471,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer { func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{ return &peer.Peer{
Addr: t.remoteAddr, Addr: t.remoteAddr,
AuthInfo: t.authInfo, AuthInfo: t.authInfo, // Can be nil
} }
} }
@ -685,7 +689,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
cleanup(err) cleanup(err)
return err return err
} }
t.activeStreams[id] = s
if channelz.IsOn() { if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1) atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
@ -719,6 +722,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
t.nextID += 2 t.nextID += 2
s.id = h.streamID s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)} s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
t.activeStreams[s.id] = s
t.mu.Unlock()
if t.streamQuota > 0 && t.waitingStreams > 0 { if t.streamQuota > 0 && t.waitingStreams > 0 {
select { select {
case t.streamsQuotaAvailable <- struct{}{}: case t.streamsQuotaAvailable <- struct{}{}:
@ -744,13 +754,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
} }
for { for {
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
if !checkForStreamQuota(it) { return checkForHeaderListSize(it) && checkForStreamQuota(it)
return false
}
if !checkForHeaderListSize(it) {
return false
}
return true
}, hdr) }, hdr)
if err != nil { if err != nil {
// Connection closed. // Connection closed.
@ -1003,13 +1007,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp // for the transport and the stream based on the current bdp
// estimation. // estimation.
func (t *http2Client) updateFlowControl(n uint32) { func (t *http2Client) updateFlowControl(n uint32) {
t.mu.Lock()
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
t.mu.Unlock()
updateIWS := func(interface{}) bool { updateIWS := func(interface{}) bool {
t.initialWindowSize = int32(n) t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
t.mu.Unlock()
return true return true
} }
t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
@ -1215,7 +1219,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
default: default:
t.setGoAwayReason(f) t.setGoAwayReason(f)
close(t.goAway) close(t.goAway)
t.controlBuf.put(&incomingGoAway{}) defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
// Notify the clientconn about the GOAWAY before we set the state to // Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams // draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection. // before disallowing new streams on this connection.
@ -1228,18 +1232,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame. if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
} }
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}
streamsToClose := make([]*Stream, 0)
for streamID, stream := range t.activeStreams { for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit { if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server. // The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1) if streamID > id && streamID <= upperLimit {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
} }
} }
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
if active == 0 { // Called outside t.mu because closeStream can take controlBuf's mu, which
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) // could induce deadlock and is not allowed.
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
} }
} }

View File

@ -265,6 +265,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData), czData: new(channelzData),
bufferPool: newBufferPool(), bufferPool: newBufferPool(),
} }
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.controlBuf = newControlBuffer(t.done) t.controlBuf = newControlBuffer(t.done)
if dynamicWindow { if dynamicWindow {
t.bdpEst = &bdpEstimator{ t.bdpEst = &bdpEstimator{
@ -485,14 +488,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
} else { } else {
s.ctx, s.cancel = context.WithCancel(t.ctx) s.ctx, s.cancel = context.WithCancel(t.ctx)
} }
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context. // Attach the received metadata to the context.
if len(mdata) > 0 { if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata) s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
@ -945,15 +941,16 @@ func (t *http2Server) streamContextErr(s *Stream) error {
// WriteHeader sends the header metadata md back to the client. // WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() { s.hdrMu.Lock()
return ErrIllegalHeaderWrite defer s.hdrMu.Unlock()
}
if s.getState() == streamDone { if s.getState() == streamDone {
return t.streamContextErr(s) return t.streamContextErr(s)
} }
s.hdrMu.Lock() if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}
if md.Len() > 0 { if md.Len() > 0 {
if s.header.Len() > 0 { if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md) s.header = metadata.Join(s.header, md)
@ -962,10 +959,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
} }
} }
if err := t.writeHeaderLocked(s); err != nil { if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return status.Convert(err).Err() return status.Convert(err).Err()
} }
s.hdrMu.Unlock()
return nil return nil
} }
@ -1013,17 +1008,19 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted. // OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
s.hdrMu.Lock()
defer s.hdrMu.Unlock()
if s.getState() == streamDone { if s.getState() == streamDone {
return nil return nil
} }
s.hdrMu.Lock()
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size. // first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.updateHeaderSent() { // No headers have been sent. if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame. if len(s.header) > 0 { // Send a separate header frame.
if err := t.writeHeaderLocked(s); err != nil { if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return err return err
} }
} else { // Send a trailer only response. } else { // Send a trailer only response.
@ -1052,7 +1049,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
endStream: true, endStream: true,
onWrite: t.setResetPingStrikes, onWrite: t.setResetPingStrikes,
} }
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
if !success { if !success {
if err != nil { if err != nil {
@ -1415,6 +1412,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
} }
} }
func (t *http2Server) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
}
}
func getJitter(v time.Duration) time.Duration { func getJitter(v time.Duration) time.Duration {
if v == infinity { if v == infinity {
return 0 return 0

View File

@ -20,7 +20,6 @@ package transport
import ( import (
"bufio" "bufio"
"bytes"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io" "io"
@ -45,14 +44,8 @@ import (
const ( const (
// http2MaxFrameLen specifies the max length of a HTTP2 frame. // http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues // https://httpwg.org/specs/rfc7540.html#SettingValues
http2InitHeaderTableSize = 4096 http2InitHeaderTableSize = 4096
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// for more details.
) )
var ( var (
@ -257,13 +250,13 @@ func encodeGrpcMessage(msg string) string {
} }
func encodeGrpcMessageUnchecked(msg string) string { func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer var sb strings.Builder
for len(msg) > 0 { for len(msg) > 0 {
r, size := utf8.DecodeRuneInString(msg) r, size := utf8.DecodeRuneInString(msg)
for _, b := range []byte(string(r)) { for _, b := range []byte(string(r)) {
if size > 1 { if size > 1 {
// If size > 1, r is not ascii. Always do percent encoding. // If size > 1, r is not ascii. Always do percent encoding.
buf.WriteString(fmt.Sprintf("%%%02X", b)) fmt.Fprintf(&sb, "%%%02X", b)
continue continue
} }
@ -272,14 +265,14 @@ func encodeGrpcMessageUnchecked(msg string) string {
// //
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD". // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
if b >= spaceByte && b <= tildeByte && b != percentByte { if b >= spaceByte && b <= tildeByte && b != percentByte {
buf.WriteByte(b) sb.WriteByte(b)
} else { } else {
buf.WriteString(fmt.Sprintf("%%%02X", b)) fmt.Fprintf(&sb, "%%%02X", b)
} }
} }
msg = msg[size:] msg = msg[size:]
} }
return buf.String() return sb.String()
} }
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage. // decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
@ -297,23 +290,23 @@ func decodeGrpcMessage(msg string) string {
} }
func decodeGrpcMessageUnchecked(msg string) string { func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer var sb strings.Builder
lenMsg := len(msg) lenMsg := len(msg)
for i := 0; i < lenMsg; i++ { for i := 0; i < lenMsg; i++ {
c := msg[i] c := msg[i]
if c == percentByte && i+2 < lenMsg { if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8) parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil { if err != nil {
buf.WriteByte(c) sb.WriteByte(c)
} else { } else {
buf.WriteByte(byte(parsed)) sb.WriteByte(byte(parsed))
i += 2 i += 2
} }
} else { } else {
buf.WriteByte(c) sb.WriteByte(c)
} }
} }
return buf.String() return sb.String()
} }
type bufWriter struct { type bufWriter struct {

View File

@ -50,7 +50,7 @@ type MD map[string][]string
// Keys beginning with "grpc-" are reserved for grpc-internal use only and may // Keys beginning with "grpc-" are reserved for grpc-internal use only and may
// result in errors if set in metadata. // result in errors if set in metadata.
func New(m map[string]string) MD { func New(m map[string]string) MD {
md := MD{} md := make(MD, len(m))
for k, val := range m { for k, val := range m {
key := strings.ToLower(k) key := strings.ToLower(k)
md[key] = append(md[key], val) md[key] = append(md[key], val)
@ -74,7 +74,7 @@ func Pairs(kv ...string) MD {
if len(kv)%2 == 1 { if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv))) panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
} }
md := MD{} md := make(MD, len(kv)/2)
for i := 0; i < len(kv); i += 2 { for i := 0; i < len(kv); i += 2 {
key := strings.ToLower(kv[i]) key := strings.ToLower(kv[i])
md[key] = append(md[key], kv[i+1]) md[key] = append(md[key], kv[i+1])
@ -182,19 +182,51 @@ func FromIncomingContext(ctx context.Context) (MD, bool) {
if !ok { if !ok {
return nil, false return nil, false
} }
out := MD{} out := make(MD, len(md))
for k, v := range md { for k, v := range md {
// We need to manually convert all keys to lower case, because MD is a // We need to manually convert all keys to lower case, because MD is a
// map, and there's no guarantee that the MD attached to the context is // map, and there's no guarantee that the MD attached to the context is
// created using our helper functions. // created using our helper functions.
key := strings.ToLower(k) key := strings.ToLower(k)
s := make([]string, len(v)) out[key] = copyOf(v)
copy(s, v)
out[key] = s
} }
return out, true return out, true
} }
// ValueFromIncomingContext returns the metadata value corresponding to the metadata
// key from the incoming metadata if it exists. Key must be lower-case.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ValueFromIncomingContext(ctx context.Context, key string) []string {
md, ok := ctx.Value(mdIncomingKey{}).(MD)
if !ok {
return nil
}
if v, ok := md[key]; ok {
return copyOf(v)
}
for k, v := range md {
// We need to manually convert all keys to lower case, because MD is a
// map, and there's no guarantee that the MD attached to the context is
// created using our helper functions.
if strings.ToLower(k) == key {
return copyOf(v)
}
}
return nil
}
// the returned slice must not be modified in place
func copyOf(v []string) []string {
vals := make([]string, len(v))
copy(vals, v)
return vals
}
// FromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD. // FromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD.
// //
// Remember to perform strings.ToLower on the keys, for both the returned MD (MD // Remember to perform strings.ToLower on the keys, for both the returned MD (MD
@ -222,15 +254,18 @@ func FromOutgoingContext(ctx context.Context) (MD, bool) {
return nil, false return nil, false
} }
out := MD{} mdSize := len(raw.md)
for i := range raw.added {
mdSize += len(raw.added[i]) / 2
}
out := make(MD, mdSize)
for k, v := range raw.md { for k, v := range raw.md {
// We need to manually convert all keys to lower case, because MD is a // We need to manually convert all keys to lower case, because MD is a
// map, and there's no guarantee that the MD attached to the context is // map, and there's no guarantee that the MD attached to the context is
// created using our helper functions. // created using our helper functions.
key := strings.ToLower(k) key := strings.ToLower(k)
s := make([]string, len(v)) out[key] = copyOf(v)
copy(s, v)
out[key] = s
} }
for _, added := range raw.added { for _, added := range raw.added {
if len(added)%2 == 1 { if len(added)%2 == 1 {

View File

@ -73,12 +73,14 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) { internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr) srv.drainServerTransports(addr)
} }
internal.AddExtraServerOptions = func(opt ...ServerOption) { internal.AddGlobalServerOptions = func(opt ...ServerOption) {
extraServerOptions = opt extraServerOptions = append(extraServerOptions, opt...)
} }
internal.ClearExtraServerOptions = func() { internal.ClearGlobalServerOptions = func() {
extraServerOptions = nil extraServerOptions = nil
} }
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
} }
var statusOK = status.New(codes.OK, "") var statusOK = status.New(codes.OK, "")
@ -155,6 +157,7 @@ type serverOptions struct {
streamInt StreamServerInterceptor streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler statsHandlers []stats.Handler
maxConcurrentStreams uint32 maxConcurrentStreams uint32
@ -190,7 +193,7 @@ type ServerOption interface {
// EmptyServerOption does not alter the server configuration. It can be embedded // EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options. // in another structure to build custom server options.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -214,6 +217,22 @@ func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
} }
} }
// joinServerOption provides a way to combine arbitrary number of server
// options into one.
type joinServerOption struct {
opts []ServerOption
}
func (mdo *joinServerOption) apply(do *serverOptions) {
for _, opt := range mdo.opts {
opt.apply(do)
}
}
func newJoinServerOption(opts ...ServerOption) ServerOption {
return &joinServerOption{opts: opts}
}
// WriteBufferSize determines how much data can be batched before doing a write on the wire. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB. // The default value for this buffer is 32KB.
@ -305,7 +324,7 @@ func CustomCodec(codec Codec) ServerOption {
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x. // Will be supported throughout 1.x.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -426,7 +445,7 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server // InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed. // transport to be created. Only one can be installed.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -442,10 +461,24 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
// StatsHandler returns a ServerOption that sets the stats handler for the server. // StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption { func StatsHandler(h stats.Handler) ServerOption {
return newFuncServerOption(func(o *serverOptions) { return newFuncServerOption(func(o *serverOptions) {
if h == nil {
logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
// Do not allow a nil stats handler, which would otherwise cause
// panics.
return
}
o.statsHandlers = append(o.statsHandlers, h) o.statsHandlers = append(o.statsHandlers, h)
}) })
} }
// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.binaryLogger = bl
})
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom // UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service // unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC // handler that will be invoked instead of returning the "unimplemented" gRPC
@ -469,7 +502,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
// new connections. If this is not set, the default is 120 seconds. A zero or // new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout. // negative value will result in an immediate timeout.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -490,7 +523,7 @@ func MaxHeaderListSize(s uint32) ServerOption {
// HeaderTableSize returns a ServerOption that sets the size of dynamic // HeaderTableSize returns a ServerOption that sets the size of dynamic
// header table for stream. // header table for stream.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -505,7 +538,7 @@ func HeaderTableSize(s uint32) ServerOption {
// zero (default) will disable workers and spawn a new goroutine for each // zero (default) will disable workers and spawn a new goroutine for each
// stream. // stream.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -898,7 +931,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
if err != credentials.ErrConnDispatched { if err != credentials.ErrConnDispatched {
// Don't log on ErrConnDispatched and io.EOF to prevent log spam. // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != io.EOF { if err != io.EOF {
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
} }
c.Close() c.Close()
} }
@ -956,19 +989,19 @@ var _ http.Handler = (*Server)(nil)
// To share one port (such as 443 for https) between gRPC and an // To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as: // existing http.Handler, use a root http.Handler such as:
// //
// if r.ProtoMajor == 2 && strings.HasPrefix( // if r.ProtoMajor == 2 && strings.HasPrefix(
// r.Header.Get("Content-Type"), "application/grpc") { // r.Header.Get("Content-Type"), "application/grpc") {
// grpcServer.ServeHTTP(w, r) // grpcServer.ServeHTTP(w, r)
// } else { // } else {
// yourMux.ServeHTTP(w, r) // yourMux.ServeHTTP(w, r)
// } // }
// //
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary // separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features // between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server. // available through grpc-go's HTTP/2 server.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -1193,9 +1226,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
}() }()
} }
var binlogs []binarylog.MethodLogger
binlog := binarylog.GetMethodLogger(stream.Method()) if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
if binlog != nil { binlogs = append(binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
binlogs = append(binlogs, ml)
}
}
if len(binlogs) != 0 {
ctx := stream.Context() ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx) md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{ logEntry := &binarylog.ClientHeader{
@ -1215,7 +1255,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if peer, ok := peer.FromContext(ctx); ok { if peer, ok := peer.FromContext(ctx); ok {
logEntry.PeerAddr = peer.Addr logEntry.PeerAddr = peer.Addr
} }
binlog.Log(logEntry) for _, binlog := range binlogs {
binlog.Log(logEntry)
}
} }
// comp and cp are used for compression. decomp and dc are used for // comp and cp are used for compression. decomp and dc are used for
@ -1255,7 +1297,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
var payInfo *payloadInfo var payInfo *payloadInfo
if len(shs) != 0 || binlog != nil { if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{} payInfo = &payloadInfo{}
} }
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
@ -1281,10 +1323,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Length: len(d), Length: len(d),
}) })
} }
if binlog != nil { if len(binlogs) != 0 {
binlog.Log(&binarylog.ClientMessage{ cm := &binarylog.ClientMessage{
Message: d, Message: d,
}) }
for _, binlog := range binlogs {
binlog.Log(cm)
}
} }
if trInfo != nil { if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
@ -1308,18 +1353,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if e := t.WriteStatus(stream, appStatus); e != nil { if e := t.WriteStatus(stream, appStatus); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
} }
if binlog != nil { if len(binlogs) != 0 {
if h, _ := stream.Header(); h.Len() > 0 { if h, _ := stream.Header(); h.Len() > 0 {
// Only log serverHeader if there was header. Otherwise it can // Only log serverHeader if there was header. Otherwise it can
// be trailer only. // be trailer only.
binlog.Log(&binarylog.ServerHeader{ sh := &binarylog.ServerHeader{
Header: h, Header: h,
}) }
for _, binlog := range binlogs {
binlog.Log(sh)
}
} }
binlog.Log(&binarylog.ServerTrailer{ st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(), Trailer: stream.Trailer(),
Err: appErr, Err: appErr,
}) }
for _, binlog := range binlogs {
binlog.Log(st)
}
} }
return appErr return appErr
} }
@ -1345,26 +1396,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
} }
} }
if binlog != nil { if len(binlogs) != 0 {
h, _ := stream.Header() h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{ sh := &binarylog.ServerHeader{
Header: h, Header: h,
}) }
binlog.Log(&binarylog.ServerTrailer{ st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(), Trailer: stream.Trailer(),
Err: appErr, Err: appErr,
}) }
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(st)
}
} }
return err return err
} }
if binlog != nil { if len(binlogs) != 0 {
h, _ := stream.Header() h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{ sh := &binarylog.ServerHeader{
Header: h, Header: h,
}) }
binlog.Log(&binarylog.ServerMessage{ sm := &binarylog.ServerMessage{
Message: reply, Message: reply,
}) }
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(sm)
}
} }
if channelz.IsOn() { if channelz.IsOn() {
t.IncrMsgSent() t.IncrMsgSent()
@ -1376,11 +1435,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// Should the logging be in WriteStatus? Should we ignore the WriteStatus // Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it? // error or allow the stats handler to see it?
err = t.WriteStatus(stream, statusOK) err = t.WriteStatus(stream, statusOK)
if binlog != nil { if len(binlogs) != 0 {
binlog.Log(&binarylog.ServerTrailer{ st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(), Trailer: stream.Trailer(),
Err: appErr, Err: appErr,
}) }
for _, binlog := range binlogs {
binlog.Log(st)
}
} }
return err return err
} }
@ -1493,8 +1555,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}() }()
} }
ss.binlog = binarylog.GetMethodLogger(stream.Method()) if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
if ss.binlog != nil { ss.binlogs = append(ss.binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
ss.binlogs = append(ss.binlogs, ml)
}
}
if len(ss.binlogs) != 0 {
md, _ := metadata.FromIncomingContext(ctx) md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{ logEntry := &binarylog.ClientHeader{
Header: md, Header: md,
@ -1513,7 +1582,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if peer, ok := peer.FromContext(ss.Context()); ok { if peer, ok := peer.FromContext(ss.Context()); ok {
logEntry.PeerAddr = peer.Addr logEntry.PeerAddr = peer.Addr
} }
ss.binlog.Log(logEntry) for _, binlog := range ss.binlogs {
binlog.Log(logEntry)
}
} }
// If dc is set and matches the stream's compression, use it. Otherwise, try // If dc is set and matches the stream's compression, use it. Otherwise, try
@ -1579,11 +1650,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock() ss.mu.Unlock()
} }
t.WriteStatus(ss.s, appStatus) t.WriteStatus(ss.s, appStatus)
if ss.binlog != nil { if len(ss.binlogs) != 0 {
ss.binlog.Log(&binarylog.ServerTrailer{ st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(), Trailer: ss.s.Trailer(),
Err: appErr, Err: appErr,
}) }
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
} }
// TODO: Should we log an error from WriteStatus here and below? // TODO: Should we log an error from WriteStatus here and below?
return appErr return appErr
@ -1594,11 +1668,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock() ss.mu.Unlock()
} }
err = t.WriteStatus(ss.s, statusOK) err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil { if len(ss.binlogs) != 0 {
ss.binlog.Log(&binarylog.ServerTrailer{ st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(), Trailer: ss.s.Trailer(),
Err: appErr, Err: appErr,
}) }
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
} }
return err return err
} }
@ -1674,7 +1751,7 @@ type streamKey struct{}
// NewContextWithServerTransportStream creates a new context from ctx and // NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it. // attaches stream to it.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -1689,7 +1766,7 @@ func NewContextWithServerTransportStream(ctx context.Context, stream ServerTrans
// //
// See also NewContextWithServerTransportStream. // See also NewContextWithServerTransportStream.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -1704,7 +1781,7 @@ type ServerTransportStream interface {
// ctx. Returns nil if the given context has no stream associated with it // ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context). // (which implies it is not an RPC invocation context).
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -1825,12 +1902,12 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {
// When called multiple times, all the provided metadata will be merged. All // When called multiple times, all the provided metadata will be merged. All
// the metadata will be sent out when one of the following happens: // the metadata will be sent out when one of the following happens:
// //
// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader. // - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
// - The first response message is sent. For unary handlers, this occurs when // - The first response message is sent. For unary handlers, this occurs when
// the handler returns; for streaming handlers, this can happen when stream's // the handler returns; for streaming handlers, this can happen when stream's
// SendMsg method is called. // SendMsg method is called.
// - An RPC status is sent out (error or success). This occurs when the handler // - An RPC status is sent out (error or success). This occurs when the handler
// returns. // returns.
// //
// SetHeader will fail if called after any of the events above. // SetHeader will fail if called after any of the events above.
// //

View File

@ -57,10 +57,9 @@ type lbConfig struct {
type ServiceConfig struct { type ServiceConfig struct {
serviceconfig.Config serviceconfig.Config
// LB is the load balancer the service providers recommends. The balancer // LB is the load balancer the service providers recommends. This is
// specified via grpc.WithBalancerName will override this. This is deprecated; // deprecated; lbConfigs is preferred. If lbConfig and LB are both present,
// lbConfigs is preferred. If lbConfig and LB are both present, lbConfig // lbConfig will be used.
// will be used.
LB *string LB *string
// lbConfig is the service config's load balancing configuration. If // lbConfig is the service config's load balancing configuration. If

View File

@ -140,13 +140,13 @@ type ClientStream interface {
// To ensure resources are not leaked due to the stream returned, one of the following // To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed: // actions must be performed:
// //
// 1. Call Close on the ClientConn. // 1. Call Close on the ClientConn.
// 2. Cancel the context provided. // 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated // 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function // client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not // CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources). // guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. // 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
// //
// If none of the above happen, a goroutine and a context will be leaked, and grpc // If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message. // will not call the optionally-configured stats handler with a stats.End message.
@ -301,12 +301,13 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if !cc.dopts.disableRetry { if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
} }
cs.binlog = binarylog.GetMethodLogger(method) if ml := binarylog.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */) }
if err != nil { if cc.dopts.binaryLogger != nil {
cs.finish(err) if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
return nil, err cs.binlogs = append(cs.binlogs, ml)
}
} }
// Pick the transport to use and create a new stream on the transport. // Pick the transport to use and create a new stream on the transport.
@ -328,7 +329,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return nil, err return nil, err
} }
if cs.binlog != nil { if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx) md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{ logEntry := &binarylog.ClientHeader{
OnClientSide: true, OnClientSide: true,
@ -342,7 +343,9 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
logEntry.Timeout = 0 logEntry.Timeout = 0
} }
} }
cs.binlog.Log(logEntry) for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
} }
if desc != unaryStreamDesc { if desc != unaryStreamDesc {
@ -486,7 +489,7 @@ type clientStream struct {
retryThrottler *retryThrottler // The throttler active when the RPC began. retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog binarylog.MethodLogger // Binary logger, can be nil. binlogs []binarylog.MethodLogger
// serverHeaderBinlogged is a boolean for whether server header has been // serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those // logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv(). // happens: stream.Header(), stream.Recv().
@ -704,6 +707,18 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors. // already be status errors.
return toRPCErr(op(cs.attempt)) return toRPCErr(op(cs.attempt))
} }
if len(cs.buffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
// is created immediately before replaying the ops.
var err error
if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.mu.Unlock()
cs.finish(err)
return err
}
}
a := cs.attempt a := cs.attempt
cs.mu.Unlock() cs.mu.Unlock()
err := op(a) err := op(a)
@ -738,7 +753,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.finish(err) cs.finish(err)
return nil, err return nil, err
} }
if cs.binlog != nil && !cs.serverHeaderBinlogged { if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Only log if binary log is on and header has not been logged. // Only log if binary log is on and header has not been logged.
logEntry := &binarylog.ServerHeader{ logEntry := &binarylog.ServerHeader{
OnClientSide: true, OnClientSide: true,
@ -748,8 +763,10 @@ func (cs *clientStream) Header() (metadata.MD, error) {
if peer, ok := peer.FromContext(cs.Context()); ok { if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr logEntry.PeerAddr = peer.Addr
} }
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
} }
return m, nil return m, nil
} }
@ -823,38 +840,44 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
return a.sendMsg(m, hdr, payload, data) return a.sendMsg(m, hdr, payload, data)
} }
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil { if len(cs.binlogs) != 0 && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{ cm := &binarylog.ClientMessage{
OnClientSide: true, OnClientSide: true,
Message: data, Message: data,
}) }
for _, binlog := range cs.binlogs {
binlog.Log(cm)
}
} }
return err return err
} }
func (cs *clientStream) RecvMsg(m interface{}) error { func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged { if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged. // Call Header() to binary log header if it's not already logged.
cs.Header() cs.Header()
} }
var recvInfo *payloadInfo var recvInfo *payloadInfo
if cs.binlog != nil { if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{} recvInfo = &payloadInfo{}
} }
err := cs.withRetry(func(a *csAttempt) error { err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo) return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked) }, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil { if len(cs.binlogs) != 0 && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{ sm := &binarylog.ServerMessage{
OnClientSide: true, OnClientSide: true,
Message: recvInfo.uncompressedBytes, Message: recvInfo.uncompressedBytes,
}) }
for _, binlog := range cs.binlogs {
binlog.Log(sm)
}
} }
if err != nil || !cs.desc.ServerStreams { if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream. // err != nil or non-server-streaming indicates end of stream.
cs.finish(err) cs.finish(err)
if cs.binlog != nil { if len(cs.binlogs) != 0 {
// finish will not log Trailer. Log Trailer here. // finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{ logEntry := &binarylog.ServerTrailer{
OnClientSide: true, OnClientSide: true,
@ -867,7 +890,9 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if peer, ok := peer.FromContext(cs.Context()); ok { if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr logEntry.PeerAddr = peer.Addr
} }
cs.binlog.Log(logEntry) for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
} }
} }
return err return err
@ -888,10 +913,13 @@ func (cs *clientStream) CloseSend() error {
return nil return nil
} }
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
if cs.binlog != nil { if len(cs.binlogs) != 0 {
cs.binlog.Log(&binarylog.ClientHalfClose{ chc := &binarylog.ClientHalfClose{
OnClientSide: true, OnClientSide: true,
}) }
for _, binlog := range cs.binlogs {
binlog.Log(chc)
}
} }
// We never returned an error here for reasons. // We never returned an error here for reasons.
return nil return nil
@ -924,10 +952,13 @@ func (cs *clientStream) finish(err error) {
// //
// Only one of cancel or trailer needs to be logged. In the cases where // Only one of cancel or trailer needs to be logged. In the cases where
// users don't call RecvMsg, users must have already canceled the RPC. // users don't call RecvMsg, users must have already canceled the RPC.
if cs.binlog != nil && status.Code(err) == codes.Canceled { if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
cs.binlog.Log(&binarylog.Cancel{ c := &binarylog.Cancel{
OnClientSide: true, OnClientSide: true,
}) }
for _, binlog := range cs.binlogs {
binlog.Log(c)
}
} }
if err == nil { if err == nil {
cs.retryThrottler.successfulRPC() cs.retryThrottler.successfulRPC()
@ -999,6 +1030,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
} }
return io.EOF // indicates successful end of stream. return io.EOF // indicates successful end of stream.
} }
return toRPCErr(err) return toRPCErr(err)
} }
if a.trInfo != nil { if a.trInfo != nil {
@ -1447,7 +1479,7 @@ type serverStream struct {
statsHandler []stats.Handler statsHandler []stats.Handler
binlog binarylog.MethodLogger binlogs []binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It // serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(), // will happen when one of the following two happens: stream.SendHeader(),
// stream.Send(). // stream.Send().
@ -1481,12 +1513,15 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {
} }
err = ss.t.WriteHeader(ss.s, md) err = ss.t.WriteHeader(ss.s, md)
if ss.binlog != nil && !ss.serverHeaderBinlogged { if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header() h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{ sh := &binarylog.ServerHeader{
Header: h, Header: h,
}) }
ss.serverHeaderBinlogged = true ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
} }
return err return err
} }
@ -1543,17 +1578,23 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err) return toRPCErr(err)
} }
if ss.binlog != nil { if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged { if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header() h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{ sh := &binarylog.ServerHeader{
Header: h, Header: h,
}) }
ss.serverHeaderBinlogged = true ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
} }
ss.binlog.Log(&binarylog.ServerMessage{ sm := &binarylog.ServerMessage{
Message: data, Message: data,
}) }
for _, binlog := range ss.binlogs {
binlog.Log(sm)
}
} }
if len(ss.statsHandler) != 0 { if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler { for _, sh := range ss.statsHandler {
@ -1592,13 +1633,16 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
} }
}() }()
var payInfo *payloadInfo var payInfo *payloadInfo
if len(ss.statsHandler) != 0 || ss.binlog != nil { if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{} payInfo = &payloadInfo{}
} }
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
if err == io.EOF { if err == io.EOF {
if ss.binlog != nil { if len(ss.binlogs) != 0 {
ss.binlog.Log(&binarylog.ClientHalfClose{}) chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs {
binlog.Log(chc)
}
} }
return err return err
} }
@ -1619,10 +1663,13 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}) })
} }
} }
if ss.binlog != nil { if len(ss.binlogs) != 0 {
ss.binlog.Log(&binarylog.ClientMessage{ cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes, Message: payInfo.uncompressedBytes,
}) }
for _, binlog := range ss.binlogs {
binlog.Log(cm)
}
} }
return nil return nil
} }

View File

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.48.0" const Version = "1.50.1"

View File

@ -147,7 +147,6 @@ grpc.NewGZIPDecompressor
grpc.RPCCompressor grpc.RPCCompressor
grpc.RPCDecompressor grpc.RPCDecompressor
grpc.ServiceConfig grpc.ServiceConfig
grpc.WithBalancerName
grpc.WithCompressor grpc.WithCompressor
grpc.WithDecompressor grpc.WithDecompressor
grpc.WithDialer grpc.WithDialer

4
vendor/modules.txt vendored
View File

@ -299,8 +299,8 @@ golang.org/x/time/rate
# google.golang.org/genproto v0.0.0-20220706185917-7780775163c4 # google.golang.org/genproto v0.0.0-20220706185917-7780775163c4
## explicit; go 1.15 ## explicit; go 1.15
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.48.0 # google.golang.org/grpc v1.50.1
## explicit; go 1.14 ## explicit; go 1.17
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes
google.golang.org/grpc/backoff google.golang.org/grpc/backoff