constellation/internal/retry/retry.go
Otto Bittner c743398a23 AB#2181: retry k8s downloads (#286)
Generalize retrier:
* Generalize Do to use a supplied 'retriable' function
* Make clock an optional argument in NewIntervalRetrier
* Move grpc/retrier to interal package
* Update existing unittests to not use retry feature

Add retryDownloadToTempDir:
* Wrap downloadToTempDir with retrier.
* Retry if TCP connection is reset.
* Abort by canceling the context.
* Use a mock server in the unit test that serves responses
depending on the state received through a state channel.

Co-authored-by: katexochen <49727155+katexochen@users.noreply.github.com>
2022-07-21 15:20:12 +02:00

62 lines
1.3 KiB
Go

package retry
import (
"context"
"time"
"k8s.io/utils/clock"
)
// IntervalRetrier retries a call with an interval. The call is defined in the Doer property.
type IntervalRetrier struct {
interval time.Duration
doer Doer
clock clock.WithTicker
retriable func(error) bool
}
// NewIntervalRetrier returns a new IntervalRetrier. The optional clock is used for testing.
func NewIntervalRetrier(doer Doer, interval time.Duration, retriable func(error) bool, optClock ...clock.WithTicker) *IntervalRetrier {
var clock clock.WithTicker = clock.RealClock{}
if len(optClock) > 0 {
clock = optClock[0]
}
return &IntervalRetrier{
interval: interval,
doer: doer,
clock: clock,
retriable: retriable,
}
}
// Do retries performing a call until it succeeds, returns a permanent error or the context is cancelled.
func (r *IntervalRetrier) Do(ctx context.Context) error {
ticker := r.clock.NewTicker(r.interval)
defer ticker.Stop()
for {
err := r.doer.Do(ctx)
if err == nil {
return nil
}
if !r.retriable(err) {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C():
}
}
}
type Doer interface {
// Do performs an operation.
//
// It should return an error that can be checked for retriability.
Do(ctx context.Context) error
}