constellation/internal/grpc/retry/retry.go

85 lines
1.9 KiB
Go
Raw Normal View History

2022-06-21 15:59:12 +00:00
package retry
import (
"context"
"errors"
2022-06-21 15:59:12 +00:00
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/clock"
)
2022-06-29 12:28:37 +00:00
// IntervalRetrier is retries a grpc call with an interval.
type IntervalRetrier struct {
2022-06-21 15:59:12 +00:00
interval time.Duration
doer Doer
clock clock.WithTicker
}
2022-06-29 12:28:37 +00:00
// NewIntervalRetrier returns a new IntervalRetrier.
func NewIntervalRetrier(doer Doer, interval time.Duration) *IntervalRetrier {
return &IntervalRetrier{
2022-06-21 15:59:12 +00:00
interval: interval,
doer: doer,
clock: clock.RealClock{},
}
}
2022-06-29 12:28:37 +00:00
// Do retries performing a grpc call until it succeeds, returns a permanent error or the context is cancelled.
func (r *IntervalRetrier) Do(ctx context.Context) error {
2022-06-21 15:59:12 +00:00
ticker := r.clock.NewTicker(r.interval)
defer ticker.Stop()
for {
err := r.doer.Do(ctx)
if err == nil {
return nil
}
if !r.serviceIsUnavailable(err) {
return err
}
select {
2022-06-29 12:28:37 +00:00
case <-ctx.Done():
2022-06-21 15:59:12 +00:00
return ctx.Err()
case <-ticker.C():
}
}
}
2022-06-29 12:28:37 +00:00
// serviceIsUnavailable checks if the error is a grpc status with code Unavailable.
// In the special case of an authentication handshake failure, false is returned to prevent further retries.
func (r *IntervalRetrier) serviceIsUnavailable(err error) bool {
// taken from google.golang.org/grpc/status.FromError
var targetErr interface {
GRPCStatus() *status.Status
Error() string
}
if !errors.As(err, &targetErr) {
return false
}
statusErr, ok := status.FromError(targetErr)
2022-06-21 15:59:12 +00:00
if !ok {
return false
}
2022-06-21 15:59:12 +00:00
if statusErr.Code() != codes.Unavailable {
return false
}
2022-06-21 15:59:12 +00:00
// ideally we would check the error type directly, but grpc only provides a string
2022-06-29 12:28:37 +00:00
return !strings.HasPrefix(statusErr.Message(), `connection error: desc = "transport: authentication handshake failed`)
2022-06-21 15:59:12 +00:00
}
// Doer is the operation an IntervalRetrier runs on every attempt.
type Doer interface {
	// Do performs a grpc operation.
	//
	// It should return a grpc status with code Unavailable error to signal a transient fault.
	Do(ctx context.Context) error
}