From ce374243ef6d873080b337fa67a01f7d11aa2690 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Wei=C3=9Fe?= <66256922+daniel-weisse@users.noreply.github.com> Date: Tue, 29 Aug 2023 11:40:44 +0200 Subject: [PATCH] cli: retry join-config operations (#2290) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Weiße --- cli/internal/kubecmd/BUILD.bazel | 1 + cli/internal/kubecmd/kubecmd.go | 82 ++++++++++--- cli/internal/kubecmd/kubecmd_test.go | 175 +++++++++++++++++++++++++-- 3 files changed, 228 insertions(+), 30 deletions(-) diff --git a/cli/internal/kubecmd/BUILD.bazel b/cli/internal/kubecmd/BUILD.bazel index 80c1d42e4..075529fdd 100644 --- a/cli/internal/kubecmd/BUILD.bazel +++ b/cli/internal/kubecmd/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//internal/imagefetcher", "//internal/kubernetes", "//internal/kubernetes/kubectl", + "//internal/retry", "//internal/versions", "//internal/versions/components", "//operators/constellation-node-operator/api/v1alpha1", diff --git a/cli/internal/kubecmd/kubecmd.go b/cli/internal/kubecmd/kubecmd.go index 4d24814e2..41c88c633 100644 --- a/cli/internal/kubecmd/kubecmd.go +++ b/cli/internal/kubecmd/kubecmd.go @@ -24,6 +24,7 @@ import ( "io" "sort" "strings" + "time" "github.com/edgelesssys/constellation/v2/internal/api/versionsapi" "github.com/edgelesssys/constellation/v2/internal/attestation/variant" @@ -35,6 +36,7 @@ import ( "github.com/edgelesssys/constellation/v2/internal/imagefetcher" internalk8s "github.com/edgelesssys/constellation/v2/internal/kubernetes" "github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl" + conretry "github.com/edgelesssys/constellation/v2/internal/retry" "github.com/edgelesssys/constellation/v2/internal/versions" "github.com/edgelesssys/constellation/v2/internal/versions/components" updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1" @@ -50,6 +52,10 @@ import ( "sigs.k8s.io/yaml" ) +const ( + maxRetryAttempts = 5 +) + // ErrInProgress signals that an upgrade is in progress inside the cluster. var ErrInProgress = errors.New("upgrade in progress") @@ -66,11 +72,12 @@ func (e *applyError) Error() string { // KubeCmd handles interaction with the cluster's components using the CLI. type KubeCmd struct { - kubectl kubectlInterface - imageFetcher imageFetcher - outWriter io.Writer - fileHandler file.Handler - log debugLog + kubectl kubectlInterface + imageFetcher imageFetcher + outWriter io.Writer + fileHandler file.Handler + retryInterval time.Duration + log debugLog } // New returns a new KubeCmd. @@ -81,11 +88,12 @@ func New(outWriter io.Writer, kubeConfigPath string, fileHandler file.Handler, l } return &KubeCmd{ - kubectl: client, - fileHandler: fileHandler, - imageFetcher: imagefetcher.New(), - outWriter: outWriter, - log: log, + kubectl: client, + fileHandler: fileHandler, + imageFetcher: imagefetcher.New(), + outWriter: outWriter, + retryInterval: time.Second * 5, + log: log, }, nil } @@ -206,14 +214,34 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At return fmt.Errorf("marshaling attestation config: %w", err) } - joinConfig, err := k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap) - if err != nil { + var retries int + retrieable := func(err error) bool { + if k8serrors.IsNotFound(err) { + return false + } + retries++ + k.log.Debugf("Getting join-config ConfigMap failed (attempt %d/%d): %s", retries, maxRetryAttempts, err) + return retries <= maxRetryAttempts + } + + var joinConfig *corev1.ConfigMap + doer := &kubeDoer{ + action: func(ctx context.Context) error { + joinConfig, err = k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap) + return err + }, + } + retrier := conretry.NewIntervalRetrier(doer, k.retryInterval, retrieable) + + if err := retrier.Do(ctx); err != nil { if !k8serrors.IsNotFound(err) { return fmt.Errorf("getting %s ConfigMap: %w", constants.JoinConfigMap, err) } k.log.Debugf("ConfigMap %q does not exist in namespace %q, creating it now", constants.JoinConfigMap, constants.ConstellationNamespace) - if err := k.kubectl.CreateConfigMap(ctx, joinConfigMap(newConfigJSON, measurementSalt)); err != nil { + if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error { + return k.kubectl.CreateConfigMap(ctx, joinConfigMap(newConfigJSON, measurementSalt)) + }, k.log); err != nil { return fmt.Errorf("creating join-config ConfigMap: %w", err) } k.log.Debugf("Created %q ConfigMap in namespace %q", constants.JoinConfigMap, constants.ConstellationNamespace) @@ -224,7 +252,10 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At joinConfig.Data[constants.AttestationConfigFilename+"_backup"] = joinConfig.Data[constants.AttestationConfigFilename] joinConfig.Data[constants.AttestationConfigFilename] = string(newConfigJSON) k.log.Debugf("Triggering attestation config update now") - if _, err = k.kubectl.UpdateConfigMap(ctx, joinConfig); err != nil { + if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error { + _, err = k.kubectl.UpdateConfigMap(ctx, joinConfig) + return err + }, k.log); err != nil { return fmt.Errorf("setting new attestation config: %w", err) } @@ -335,7 +366,7 @@ func (k *KubeCmd) applyComponentsCM(ctx context.Context, components *corev1.Conf func (k *KubeCmd) applyNodeVersion(ctx context.Context, nodeVersion updatev1alpha1.NodeVersion) (updatev1alpha1.NodeVersion, error) { k.log.Debugf("Triggering NodeVersion upgrade now") var updatedNodeVersion updatev1alpha1.NodeVersion - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { newNode, err := k.getConstellationVersion(ctx) if err != nil { return fmt.Errorf("retrieving current NodeVersion: %w", err) @@ -500,7 +531,25 @@ func (k *KubeCmd) RemoveHelmKeepAnnotation(ctx context.Context) error { return nil } -// kubectlInterface is provides access to the Kubernetes API. +type kubeDoer struct { + action func(ctx context.Context) error +} + +func (k *kubeDoer) Do(ctx context.Context) error { + return k.action(ctx) +} + +func retryAction(ctx context.Context, retryInterval time.Duration, maxRetries int, action func(ctx context.Context) error, log debugLog) error { + ctr := 0 + retrier := conretry.NewIntervalRetrier(&kubeDoer{action: action}, retryInterval, func(err error) bool { + ctr++ + log.Debugf("Action failed (attempt %d/%d): %s", ctr, maxRetries, err) + return ctr <= maxRetries + }) + return retrier.Do(ctx) +} + +// kubectlInterface provides access to the Kubernetes API. type kubectlInterface interface { GetNodes(ctx context.Context) ([]corev1.Node, error) GetConfigMap(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error) @@ -514,7 +563,6 @@ type kubectlInterface interface { type debugLog interface { Debugf(format string, args ...any) - Sync() } // imageFetcher gets an image reference from the versionsapi. diff --git a/cli/internal/kubecmd/kubecmd_test.go b/cli/internal/kubecmd/kubecmd_test.go index 73ec1d60e..312d2c833 100644 --- a/cli/internal/kubecmd/kubecmd_test.go +++ b/cli/internal/kubecmd/kubecmd_test.go @@ -12,6 +12,7 @@ import ( "errors" "io" "testing" + "time" "github.com/edgelesssys/constellation/v2/internal/attestation/measurements" "github.com/edgelesssys/constellation/v2/internal/attestation/variant" @@ -471,16 +472,28 @@ func newJoinConfigMap(data string) *corev1.ConfigMap { } } -func TestApplyAttestationConfig(t *testing.T) { +func TestApplyJoinConfig(t *testing.T) { mustMarshal := func(cfg config.AttestationCfg) string { data, err := json.Marshal(cfg) require.NoError(t, err) return string(data) } + // repeatedErrors returns the given error multiple times + // This is needed in tests, since the retry logic will retry multiple times + // If the retry limit is raised in [KubeCmd.ApplyJoinConfig], it should also + // be updated here + repeatedErrors := func(err error) []error { + var errs []error + for i := 0; i < 20; i++ { + errs = append(errs, err) + } + return errs + } testCases := map[string]struct { newAttestationCfg config.AttestationCfg - kubectl *stubKubectl + kubectl *fakeConfigMapClient + wantUpdate bool wantErr bool }{ "success": { @@ -489,7 +502,7 @@ func TestApplyAttestationConfig(t *testing.T) { 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), }, }, - kubectl: &stubKubectl{ + kubectl: &fakeConfigMapClient{ configMaps: map[string]*corev1.ConfigMap{ constants.JoinConfigMap: newJoinConfigMap(mustMarshal(&config.QEMUVTPM{ Measurements: measurements.M{ @@ -498,6 +511,7 @@ func TestApplyAttestationConfig(t *testing.T) { })), }, }, + wantUpdate: true, }, "Get ConfigMap error": { newAttestationCfg: &config.QEMUVTPM{ @@ -505,19 +519,70 @@ func TestApplyAttestationConfig(t *testing.T) { 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), }, }, - kubectl: &stubKubectl{ - getCMErr: assert.AnError, + kubectl: &fakeConfigMapClient{ + getErrs: repeatedErrors(assert.AnError), }, wantErr: true, }, + "Get ConfigMap fails then returns ConfigMap": { + newAttestationCfg: &config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + }, + kubectl: &fakeConfigMapClient{ + configMaps: map[string]*corev1.ConfigMap{ + constants.JoinConfigMap: newJoinConfigMap(mustMarshal(&config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0xFF, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + })), + }, + getErrs: []error{assert.AnError, assert.AnError}, + }, + wantUpdate: true, + }, + "Get ConfigMap fails then fails with NotFound": { + newAttestationCfg: &config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + }, + kubectl: &fakeConfigMapClient{ + getErrs: []error{assert.AnError, assert.AnError, k8serrors.NewNotFound(schema.GroupResource{}, "")}, + }, + }, "ConfigMap does not exist yet": { newAttestationCfg: &config.QEMUVTPM{ Measurements: measurements.M{ 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), }, }, - kubectl: &stubKubectl{ - getCMErr: k8serrors.NewNotFound(schema.GroupResource{}, ""), + kubectl: &fakeConfigMapClient{ + getErrs: repeatedErrors(k8serrors.NewNotFound(schema.GroupResource{}, "")), + }, + }, + "Create ConfigMap fails": { + newAttestationCfg: &config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + }, + kubectl: &fakeConfigMapClient{ + getErrs: repeatedErrors(k8serrors.NewNotFound(schema.GroupResource{}, "")), + createErrs: repeatedErrors(assert.AnError), + }, + wantErr: true, + }, + "Create ConfigMap fails then succeeds": { + newAttestationCfg: &config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + }, + kubectl: &fakeConfigMapClient{ + getErrs: repeatedErrors(k8serrors.NewNotFound(schema.GroupResource{}, "")), + createErrs: []error{assert.AnError, assert.AnError}, }, }, "Update ConfigMap error": { @@ -526,7 +591,7 @@ func TestApplyAttestationConfig(t *testing.T) { 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), }, }, - kubectl: &stubKubectl{ + kubectl: &fakeConfigMapClient{ configMaps: map[string]*corev1.ConfigMap{ constants.JoinConfigMap: newJoinConfigMap(mustMarshal(&config.QEMUVTPM{ Measurements: measurements.M{ @@ -534,10 +599,28 @@ func TestApplyAttestationConfig(t *testing.T) { }, })), }, - updateCMErr: assert.AnError, + updateErrs: repeatedErrors(assert.AnError), }, wantErr: true, }, + "Update ConfigMap fails then succeeds": { + newAttestationCfg: &config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0x00, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + }, + kubectl: &fakeConfigMapClient{ + configMaps: map[string]*corev1.ConfigMap{ + constants.JoinConfigMap: newJoinConfigMap(mustMarshal(&config.QEMUVTPM{ + Measurements: measurements.M{ + 0: measurements.WithAllBytes(0xFF, measurements.WarnOnly, measurements.PCRMeasurementLength), + }, + })), + }, + updateErrs: []error{assert.AnError, assert.AnError}, + }, + wantUpdate: true, + }, } for name, tc := range testCases { @@ -546,9 +629,10 @@ func TestApplyAttestationConfig(t *testing.T) { require := require.New(t) cmd := &KubeCmd{ - kubectl: tc.kubectl, - log: logger.NewTest(t), - outWriter: io.Discard, + kubectl: tc.kubectl, + log: logger.NewTest(t), + retryInterval: time.Millisecond, + outWriter: io.Discard, } err := cmd.ApplyJoinConfig(context.Background(), tc.newAttestationCfg, []byte{0x11}) @@ -557,7 +641,14 @@ func TestApplyAttestationConfig(t *testing.T) { return } assert.NoError(err) - cfg, ok := tc.kubectl.configMaps[constants.JoinConfigMap] + + var cfg *corev1.ConfigMap + var ok bool + if tc.wantUpdate { + cfg, ok = tc.kubectl.updatedConfigMaps[constants.JoinConfigMap] + } else { + cfg, ok = tc.kubectl.configMaps[constants.JoinConfigMap] + } require.True(ok) assert.Equal(mustMarshal(tc.newAttestationCfg), cfg.Data[constants.AttestationConfigFilename]) }) @@ -669,3 +760,61 @@ func unstructedObjectWithGeneration(nodeVersion updatev1alpha1.NodeVersion, gene object.SetGeneration(generation) return object } + +type fakeConfigMapClient struct { + getErrs []error + updatedConfigMaps map[string]*corev1.ConfigMap + updateErrs []error + configMaps map[string]*corev1.ConfigMap + createErrs []error + kubectlInterface +} + +func (f *fakeConfigMapClient) GetConfigMap(_ context.Context, _, name string) (*corev1.ConfigMap, error) { + if len(f.getErrs) > 0 { + err := f.getErrs[0] + if len(f.getErrs) > 1 { + f.getErrs = f.getErrs[1:] + } else { + f.getErrs = nil + } + return nil, err + } + return f.configMaps[name], nil +} + +func (f *fakeConfigMapClient) UpdateConfigMap(_ context.Context, configMap *corev1.ConfigMap) (*corev1.ConfigMap, error) { + if len(f.updateErrs) > 0 { + err := f.updateErrs[0] + if len(f.updateErrs) > 1 { + f.updateErrs = f.updateErrs[1:] + } else { + f.updateErrs = nil + } + return nil, err + } + + if f.updatedConfigMaps == nil { + f.updatedConfigMaps = map[string]*corev1.ConfigMap{} + } + f.updatedConfigMaps[configMap.ObjectMeta.Name] = configMap + return f.updatedConfigMaps[configMap.ObjectMeta.Name], nil +} + +func (f *fakeConfigMapClient) CreateConfigMap(_ context.Context, configMap *corev1.ConfigMap) error { + if len(f.createErrs) > 0 { + err := f.createErrs[0] + if len(f.createErrs) > 1 { + f.createErrs = f.createErrs[1:] + } else { + f.createErrs = nil + } + return err + } + + if f.configMaps == nil { + f.configMaps = map[string]*corev1.ConfigMap{} + } + f.configMaps[configMap.ObjectMeta.Name] = configMap + return nil +}