package events import ( "fmt" "math/rand" "sync" "sync/atomic" "time" "github.com/Sirupsen/logrus" ) // RetryingSink retries the write until success or an ErrSinkClosed is // returned. Underlying sink must have p > 0 of succeeding or the sink will // block. Retry is configured with a RetryStrategy. Concurrent calls to a // retrying sink are serialized through the sink, meaning that if one is // in-flight, another will not proceed. type RetryingSink struct { sink Sink strategy RetryStrategy closed chan struct{} once sync.Once } // NewRetryingSink returns a sink that will retry writes to a sink, backing // off on failure. Parameters threshold and backoff adjust the behavior of the // circuit breaker. func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink { rs := &RetryingSink{ sink: sink, strategy: strategy, closed: make(chan struct{}), } return rs } // Write attempts to flush the events to the downstream sink until it succeeds // or the sink is closed. func (rs *RetryingSink) Write(event Event) error { logger := logrus.WithField("event", event) retry: select { case <-rs.closed: return ErrSinkClosed default: } if backoff := rs.strategy.Proceed(event); backoff > 0 { select { case <-time.After(backoff): // TODO(stevvooe): This branch holds up the next try. Before, we // would simply break to the "retry" label and then possibly wait // again. However, this requires all retry strategies to have a // large probability of probing the sync for success, rather than // just backing off and sending the request. case <-rs.closed: return ErrSinkClosed } } if err := rs.sink.Write(event); err != nil { if err == ErrSinkClosed { // terminal! return err } logger := logger.WithError(err) // shadow!! if rs.strategy.Failure(event, err) { logger.Errorf("retryingsink: dropped event") return nil } logger.Errorf("retryingsink: error writing event, retrying") goto retry } rs.strategy.Success(event) return nil } // Close closes the sink and the underlying sink. func (rs *RetryingSink) Close() error { rs.once.Do(func() { close(rs.closed) }) return nil } func (rs RetryingSink) String() string { // Serialize a copy of the RetryingSink without the sync.Once, to avoid // a data race. rs2 := map[string]interface{}{ "sink": rs.sink, "strategy": rs.strategy, "closed": rs.closed, } return fmt.Sprint(rs2) } // RetryStrategy defines a strategy for retrying event sink writes. // // All methods should be goroutine safe. type RetryStrategy interface { // Proceed is called before every event send. If proceed returns a // positive, non-zero integer, the retryer will back off by the provided // duration. // // An event is provided, by may be ignored. Proceed(event Event) time.Duration // Failure reports a failure to the strategy. If this method returns true, // the event should be dropped. Failure(event Event, err error) bool // Success should be called when an event is sent successfully. Success(event Event) } // Breaker implements a circuit breaker retry strategy. // // The current implementation never drops events. type Breaker struct { threshold int recent int last time.Time backoff time.Duration // time after which we retry after failure. mu sync.Mutex } var _ RetryStrategy = &Breaker{} // NewBreaker returns a breaker that will backoff after the threshold has been // tripped. A Breaker is thread safe and may be shared by many goroutines. func NewBreaker(threshold int, backoff time.Duration) *Breaker { return &Breaker{ threshold: threshold, backoff: backoff, } } // Proceed checks the failures against the threshold. func (b *Breaker) Proceed(event Event) time.Duration { b.mu.Lock() defer b.mu.Unlock() if b.recent < b.threshold { return 0 } return b.last.Add(b.backoff).Sub(time.Now()) } // Success resets the breaker. func (b *Breaker) Success(event Event) { b.mu.Lock() defer b.mu.Unlock() b.recent = 0 b.last = time.Time{} } // Failure records the failure and latest failure time. func (b *Breaker) Failure(event Event, err error) bool { b.mu.Lock() defer b.mu.Unlock() b.recent++ b.last = time.Now().UTC() return false // never drop events. } var ( // DefaultExponentialBackoffConfig provides a default configuration for // exponential backoff. DefaultExponentialBackoffConfig = ExponentialBackoffConfig{ Base: time.Second, Factor: time.Second, Max: 20 * time.Second, } ) // ExponentialBackoffConfig configures backoff parameters. // // Note that these parameters operate on the upper bound for choosing a random // value. For example, at Base=1s, a random value in [0,1s) will be chosen for // the backoff value. type ExponentialBackoffConfig struct { // Base is the minimum bound for backing off after failure. Base time.Duration // Factor sets the amount of time by which the backoff grows with each // failure. Factor time.Duration // Max is the absolute maxiumum bound for a single backoff. Max time.Duration } // ExponentialBackoff implements random backoff with exponentially increasing // bounds as the number consecutive failures increase. type ExponentialBackoff struct { config ExponentialBackoffConfig failures uint64 // consecutive failure counter. } // NewExponentialBackoff returns an exponential backoff strategy with the // desired config. If config is nil, the default is returned. func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff { return &ExponentialBackoff{ config: config, } } // Proceed returns the next randomly bound exponential backoff time. func (b *ExponentialBackoff) Proceed(event Event) time.Duration { return b.backoff(atomic.LoadUint64(&b.failures)) } // Success resets the failures counter. func (b *ExponentialBackoff) Success(event Event) { atomic.StoreUint64(&b.failures, 0) } // Failure increments the failure counter. func (b *ExponentialBackoff) Failure(event Event, err error) bool { atomic.AddUint64(&b.failures, 1) return false } // backoff calculates the amount of time to wait based on the number of // consecutive failures. func (b *ExponentialBackoff) backoff(failures uint64) time.Duration { if failures <= 0 { // proceed normally when there are no failures. return 0 } factor := b.config.Factor if factor <= 0 { factor = DefaultExponentialBackoffConfig.Factor } backoff := b.config.Base + factor*time.Duration(1<<(failures-1)) max := b.config.Max if max <= 0 { max = DefaultExponentialBackoffConfig.Max } if backoff > max || backoff < 0 { backoff = max } // Choose a uniformly distributed value from [0, backoff). return time.Duration(rand.Int63n(int64(backoff))) }