From b3bb486e59838c912d3108745b11dd882b7dc436 Mon Sep 17 00:00:00 2001 From: Adrian Stobbe Date: Mon, 11 Sep 2023 09:26:20 +0200 Subject: [PATCH] node-operator: fix data race in executor (#2326) --- .../internal/executor/executor.go | 35 ++++--- .../internal/executor/executor_test.go | 98 +++++++------------ 2 files changed, 57 insertions(+), 76 deletions(-) diff --git a/operators/constellation-node-operator/internal/executor/executor.go b/operators/constellation-node-operator/internal/executor/executor.go index c74faf38f..152dc3a40 100644 --- a/operators/constellation-node-operator/internal/executor/executor.go +++ b/operators/constellation-node-operator/internal/executor/executor.go @@ -33,10 +33,10 @@ type Controller interface { Reconcile(ctx context.Context) (Result, error) } -// Executor is a task executor / scheduler. +// taskExecutor is a task executor / scheduler. // It will call the reconcile method of the given controller with a regular interval // or when triggered externally. -type Executor struct { +type taskExecutor struct { running atomic.Bool // controller is the controller to be reconciled. @@ -55,21 +55,31 @@ type Executor struct { } // New creates a new Executor. -func New(controller Controller, cfg Config) *Executor { +func New(controller Controller, cfg Config) Executor { cfg.applyDefaults() - return &Executor{ + return &taskExecutor{ controller: controller, pollingFrequency: cfg.PollingFrequency, rateLimiter: cfg.RateLimiter, + externalTrigger: make(chan struct{}, 1), + stop: make(chan struct{}, 1), } } +// Executor is a task executor / scheduler. +type Executor interface { + Start(ctx context.Context) StopWaitFn + Running() bool + Trigger() +} + // StopWaitFn is a function that can be called to stop the executor and wait for it to stop. type StopWaitFn func() // Start starts the executor in a separate go routine. // Call Stop to stop the executor. -func (e *Executor) Start(ctx context.Context) StopWaitFn { +// IMPORTANT: The executor can only be started once. +func (e *taskExecutor) Start(ctx context.Context) StopWaitFn { wg := &sync.WaitGroup{} logr := log.FromContext(ctx) stopWait := func() { @@ -83,9 +93,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn { if !e.running.CompareAndSwap(false, true) { return stopWait } - - e.externalTrigger = make(chan struct{}, 1) - e.stop = make(chan struct{}, 1) // execute is used by the go routines below to communicate // that a reconciliation should happen execute := make(chan struct{}, 1) @@ -99,6 +106,9 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn { // timer routine is responsible for triggering the reconciliation after the timer expires // or when triggered externally go func() { + defer func() { + e.running.Store(false) + }() defer wg.Done() defer close(execute) defer logr.Info("Timer stopped") @@ -119,9 +129,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn { // executor routine is responsible for executing the reconciliation go func() { - defer func() { - e.running.Store(false) - }() defer wg.Done() defer close(nextScheduledReconcile) defer logr.Info("Executor stopped") @@ -157,7 +164,7 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn { // Stop stops the executor. // It does not block until the executor is stopped. -func (e *Executor) Stop() { +func (e *taskExecutor) Stop() { select { case e.stop <- struct{}{}: default: @@ -167,13 +174,13 @@ func (e *Executor) Stop() { // Running returns true if the executor is running. // When the executor is stopped, it is not running anymore. -func (e *Executor) Running() bool { +func (e *taskExecutor) Running() bool { return e.running.Load() } // Trigger triggers a reconciliation. // If a reconciliation is already pending, this call is a no-op. -func (e *Executor) Trigger() { +func (e *taskExecutor) Trigger() { select { case e.externalTrigger <- struct{}{}: default: diff --git a/operators/constellation-node-operator/internal/executor/executor_test.go b/operators/constellation-node-operator/internal/executor/executor_test.go index 96b1d8240..91d0a20f1 100644 --- a/operators/constellation-node-operator/internal/executor/executor_test.go +++ b/operators/constellation-node-operator/internal/executor/executor_test.go @@ -20,40 +20,14 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } -func TestNew(t *testing.T) { - testCases := map[string]struct { - config Config - wantPollingFrequency time.Duration - }{ - "applies default polling frequency": { - config: Config{}, - wantPollingFrequency: defaultPollingFrequency, - }, - "custom polling frequency": { - config: Config{ - PollingFrequency: time.Hour, - }, - wantPollingFrequency: time.Hour, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - assert := assert.New(t) - exec := New(nil, tc.config) - assert.Equal(tc.wantPollingFrequency, exec.pollingFrequency) - }) - } -} - func TestStartTriggersImmediateReconciliation(t *testing.T) { assert := assert.New(t) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{}, // no rate limiting + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{}, // no rate limiting } + exec := New(ctrl, cfg) // on start, the executor should trigger a reconciliation stopAndWait := exec.Start(context.Background()) <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called @@ -68,11 +42,11 @@ func TestStartTriggersImmediateReconciliation(t *testing.T) { func TestStartMultipleTimesIsCoalesced(t *testing.T) { assert := assert.New(t) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{}, // no rate limiting + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{}, // no rate limiting } + exec := New(ctrl, cfg) // start once stopAndWait := exec.Start(context.Background()) // start again multiple times @@ -93,11 +67,11 @@ func TestErrorTriggersImmediateReconciliation(t *testing.T) { assert := assert.New(t) // returning an error should trigger a reconciliation immediately ctrl := newStubController(Result{}, errors.New("reconciler error")) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{}, // no rate limiting + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{}, // no rate limiting } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) for i := 0; i < 10; i++ { <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called @@ -115,13 +89,13 @@ func TestErrorTriggersRateLimiting(t *testing.T) { assert := assert.New(t) // returning an error should trigger a reconciliation immediately ctrl := newStubController(Result{}, errors.New("reconciler error")) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{ + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{ whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. }, } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting ctrl.stop <- struct{}{} @@ -139,13 +113,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) { Requeue: true, RequeueAfter: time.Microsecond, }, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{ + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{ whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. }, } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) for i := 0; i < 10; i++ { <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called @@ -162,13 +136,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) { func TestExternalTrigger(t *testing.T) { assert := assert.New(t) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{ + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{ whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. }, } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) <-ctrl.waitUntilReconciled // initial trigger for i := 0; i < 10; i++ { @@ -186,13 +160,13 @@ func TestExternalTrigger(t *testing.T) { func TestSimultaneousExternalTriggers(t *testing.T) { assert := assert.New(t) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{ + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{ whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. }, } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) <-ctrl.waitUntilReconciled // initial trigger for i := 0; i < 100; i++ { @@ -212,11 +186,11 @@ func TestContextCancel(t *testing.T) { assert := assert.New(t) ctx, cancel := context.WithCancel(context.Background()) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. - rateLimiter: &stubRateLimiter{}, // no rate limiting + cfg := Config{ + PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. + RateLimiter: &stubRateLimiter{}, // no rate limiting } + exec := New(ctrl, cfg) _ = exec.Start(ctx) // no need to explicitly stop the executor, it will stop when the context is canceled <-ctrl.waitUntilReconciled // initial trigger @@ -238,13 +212,13 @@ func TestContextCancel(t *testing.T) { func TestRequeueAfterPollingFrequency(t *testing.T) { assert := assert.New(t) ctrl := newStubController(Result{}, nil) - exec := Executor{ - controller: ctrl, - pollingFrequency: time.Microsecond, // basically no delay - rateLimiter: &stubRateLimiter{ + cfg := Config{ + PollingFrequency: time.Microsecond, // basically no delay + RateLimiter: &stubRateLimiter{ whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test. }, } + exec := New(ctrl, cfg) stopAndWait := exec.Start(context.Background()) for i := 0; i < 10; i++ { <-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called