node-operator: fix data race in executor (#2326)

This commit is contained in:
Adrian Stobbe 2023-09-11 09:26:20 +02:00 committed by GitHub
parent 92726dad2a
commit b3bb486e59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 76 deletions

View File

@ -33,10 +33,10 @@ type Controller interface {
Reconcile(ctx context.Context) (Result, error) Reconcile(ctx context.Context) (Result, error)
} }
// Executor is a task executor / scheduler. // taskExecutor is a task executor / scheduler.
// It will call the reconcile method of the given controller with a regular interval // It will call the reconcile method of the given controller with a regular interval
// or when triggered externally. // or when triggered externally.
type Executor struct { type taskExecutor struct {
running atomic.Bool running atomic.Bool
// controller is the controller to be reconciled. // controller is the controller to be reconciled.
@ -55,21 +55,31 @@ type Executor struct {
} }
// New creates a new Executor. // New creates a new Executor.
func New(controller Controller, cfg Config) *Executor { func New(controller Controller, cfg Config) Executor {
cfg.applyDefaults() cfg.applyDefaults()
return &Executor{ return &taskExecutor{
controller: controller, controller: controller,
pollingFrequency: cfg.PollingFrequency, pollingFrequency: cfg.PollingFrequency,
rateLimiter: cfg.RateLimiter, rateLimiter: cfg.RateLimiter,
externalTrigger: make(chan struct{}, 1),
stop: make(chan struct{}, 1),
} }
} }
// Executor is a task executor / scheduler.
type Executor interface {
Start(ctx context.Context) StopWaitFn
Running() bool
Trigger()
}
// StopWaitFn is a function that can be called to stop the executor and wait for it to stop. // StopWaitFn is a function that can be called to stop the executor and wait for it to stop.
type StopWaitFn func() type StopWaitFn func()
// Start starts the executor in a separate go routine. // Start starts the executor in a separate go routine.
// Call Stop to stop the executor. // Call Stop to stop the executor.
func (e *Executor) Start(ctx context.Context) StopWaitFn { // IMPORTANT: The executor can only be started once.
func (e *taskExecutor) Start(ctx context.Context) StopWaitFn {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
logr := log.FromContext(ctx) logr := log.FromContext(ctx)
stopWait := func() { stopWait := func() {
@ -83,9 +93,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
if !e.running.CompareAndSwap(false, true) { if !e.running.CompareAndSwap(false, true) {
return stopWait return stopWait
} }
e.externalTrigger = make(chan struct{}, 1)
e.stop = make(chan struct{}, 1)
// execute is used by the go routines below to communicate // execute is used by the go routines below to communicate
// that a reconciliation should happen // that a reconciliation should happen
execute := make(chan struct{}, 1) execute := make(chan struct{}, 1)
@ -99,6 +106,9 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
// timer routine is responsible for triggering the reconciliation after the timer expires // timer routine is responsible for triggering the reconciliation after the timer expires
// or when triggered externally // or when triggered externally
go func() { go func() {
defer func() {
e.running.Store(false)
}()
defer wg.Done() defer wg.Done()
defer close(execute) defer close(execute)
defer logr.Info("Timer stopped") defer logr.Info("Timer stopped")
@ -119,9 +129,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
// executor routine is responsible for executing the reconciliation // executor routine is responsible for executing the reconciliation
go func() { go func() {
defer func() {
e.running.Store(false)
}()
defer wg.Done() defer wg.Done()
defer close(nextScheduledReconcile) defer close(nextScheduledReconcile)
defer logr.Info("Executor stopped") defer logr.Info("Executor stopped")
@ -157,7 +164,7 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
// Stop stops the executor. // Stop stops the executor.
// It does not block until the executor is stopped. // It does not block until the executor is stopped.
func (e *Executor) Stop() { func (e *taskExecutor) Stop() {
select { select {
case e.stop <- struct{}{}: case e.stop <- struct{}{}:
default: default:
@ -167,13 +174,13 @@ func (e *Executor) Stop() {
// Running returns true if the executor is running. // Running returns true if the executor is running.
// When the executor is stopped, it is not running anymore. // When the executor is stopped, it is not running anymore.
func (e *Executor) Running() bool { func (e *taskExecutor) Running() bool {
return e.running.Load() return e.running.Load()
} }
// Trigger triggers a reconciliation. // Trigger triggers a reconciliation.
// If a reconciliation is already pending, this call is a no-op. // If a reconciliation is already pending, this call is a no-op.
func (e *Executor) Trigger() { func (e *taskExecutor) Trigger() {
select { select {
case e.externalTrigger <- struct{}{}: case e.externalTrigger <- struct{}{}:
default: default:

View File

@ -20,40 +20,14 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m) goleak.VerifyTestMain(m)
} }
func TestNew(t *testing.T) {
testCases := map[string]struct {
config Config
wantPollingFrequency time.Duration
}{
"applies default polling frequency": {
config: Config{},
wantPollingFrequency: defaultPollingFrequency,
},
"custom polling frequency": {
config: Config{
PollingFrequency: time.Hour,
},
wantPollingFrequency: time.Hour,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
exec := New(nil, tc.config)
assert.Equal(tc.wantPollingFrequency, exec.pollingFrequency)
})
}
}
func TestStartTriggersImmediateReconciliation(t *testing.T) { func TestStartTriggersImmediateReconciliation(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{}, // no rate limiting
rateLimiter: &stubRateLimiter{}, // no rate limiting
} }
exec := New(ctrl, cfg)
// on start, the executor should trigger a reconciliation // on start, the executor should trigger a reconciliation
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
@ -68,11 +42,11 @@ func TestStartTriggersImmediateReconciliation(t *testing.T) {
func TestStartMultipleTimesIsCoalesced(t *testing.T) { func TestStartMultipleTimesIsCoalesced(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{}, // no rate limiting
rateLimiter: &stubRateLimiter{}, // no rate limiting
} }
exec := New(ctrl, cfg)
// start once // start once
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
// start again multiple times // start again multiple times
@ -93,11 +67,11 @@ func TestErrorTriggersImmediateReconciliation(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
// returning an error should trigger a reconciliation immediately // returning an error should trigger a reconciliation immediately
ctrl := newStubController(Result{}, errors.New("reconciler error")) ctrl := newStubController(Result{}, errors.New("reconciler error"))
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{}, // no rate limiting
rateLimiter: &stubRateLimiter{}, // no rate limiting
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
@ -115,13 +89,13 @@ func TestErrorTriggersRateLimiting(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
// returning an error should trigger a reconciliation immediately // returning an error should trigger a reconciliation immediately
ctrl := newStubController(Result{}, errors.New("reconciler error")) ctrl := newStubController(Result{}, errors.New("reconciler error"))
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{
rateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
}, },
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting
ctrl.stop <- struct{}{} ctrl.stop <- struct{}{}
@ -139,13 +113,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) {
Requeue: true, Requeue: true,
RequeueAfter: time.Microsecond, RequeueAfter: time.Microsecond,
}, nil) }, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{
rateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
}, },
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
@ -162,13 +136,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) {
func TestExternalTrigger(t *testing.T) { func TestExternalTrigger(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{
rateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
}, },
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // initial trigger <-ctrl.waitUntilReconciled // initial trigger
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -186,13 +160,13 @@ func TestExternalTrigger(t *testing.T) {
func TestSimultaneousExternalTriggers(t *testing.T) { func TestSimultaneousExternalTriggers(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{
rateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
}, },
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // initial trigger <-ctrl.waitUntilReconciled // initial trigger
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -212,11 +186,11 @@ func TestContextCancel(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. RateLimiter: &stubRateLimiter{}, // no rate limiting
rateLimiter: &stubRateLimiter{}, // no rate limiting
} }
exec := New(ctrl, cfg)
_ = exec.Start(ctx) // no need to explicitly stop the executor, it will stop when the context is canceled _ = exec.Start(ctx) // no need to explicitly stop the executor, it will stop when the context is canceled
<-ctrl.waitUntilReconciled // initial trigger <-ctrl.waitUntilReconciled // initial trigger
@ -238,13 +212,13 @@ func TestContextCancel(t *testing.T) {
func TestRequeueAfterPollingFrequency(t *testing.T) { func TestRequeueAfterPollingFrequency(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ctrl := newStubController(Result{}, nil) ctrl := newStubController(Result{}, nil)
exec := Executor{ cfg := Config{
controller: ctrl, PollingFrequency: time.Microsecond, // basically no delay
pollingFrequency: time.Microsecond, // basically no delay RateLimiter: &stubRateLimiter{
rateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
}, },
} }
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background()) stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called