diff --git a/internal/constellation/kubecmd/backup.go b/internal/constellation/kubecmd/backup.go index 28763a146..e4ad27633 100644 --- a/internal/constellation/kubecmd/backup.go +++ b/internal/constellation/kubecmd/backup.go @@ -39,7 +39,7 @@ func (k *KubeCmd) BackupCRDs(ctx context.Context, fileHandler file.Handler, upgr for i := range crds { path := filepath.Join(crdBackupFolder, crds[i].Name+".yaml") - k.log.Debug(fmt.Sprintf("Creating CRD backup: %q", path)) + k.log.Debug("Creating CRD backup", "path", path) // We have to manually set kind/apiversion because of a long-standing limitation of the API: // https://github.com/kubernetes/kubernetes/issues/3030#issuecomment-67543738 @@ -64,7 +64,7 @@ func (k *KubeCmd) BackupCRDs(ctx context.Context, fileHandler file.Handler, upgr func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds []apiextensionsv1.CustomResourceDefinition, upgradeDir string) error { k.log.Debug("Starting CR backup") for _, crd := range crds { - k.log.Debug(fmt.Sprintf("Creating backup for resource type: %q", crd.Name)) + k.log.Debug("Creating backup", "crdName", crd.Name) // Iterate over all versions of the CRD // TODO(daniel-weisse): Consider iterating over crd.Status.StoredVersions instead @@ -72,7 +72,7 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds // a version that is not installed in the cluster. // With the StoredVersions field, we could only iterate over the installed versions. for _, version := range crd.Spec.Versions { - k.log.Debug(fmt.Sprintf("Creating backup of CRs for %q at version %q", crd.Name, version.Name)) + k.log.Debug("Starting CustomResource backup", "crdName", crd.Name, "version", version.Name) gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version.Name, Resource: crd.Spec.Names.Plural} crs, err := k.kubectl.ListCRs(ctx, gvr) @@ -80,7 +80,7 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds if !k8serrors.IsNotFound(err) { return fmt.Errorf("retrieving CR %s: %w", crd.Name, err) } - k.log.Debug(fmt.Sprintf("No CRs found for %q at version %q, skipping...", crd.Name, version.Name)) + k.log.Debug("No CustomResources found. Skipping...", "crdName", crd.Name, "version", version.Name) continue } @@ -101,9 +101,9 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds } } - k.log.Debug(fmt.Sprintf("Backup for resource type %q complete", crd.Name)) + k.log.Debug("CustomResource backup complete", "crdName", crd.Name) } - k.log.Debug("CR backup complete") + k.log.Debug("All CustomResource backups completed") return nil } diff --git a/internal/constellation/kubecmd/kubecmd.go b/internal/constellation/kubecmd/kubecmd.go index ad6b83c77..1ebf99265 100644 --- a/internal/constellation/kubecmd/kubecmd.go +++ b/internal/constellation/kubecmd/kubecmd.go @@ -47,10 +47,6 @@ import ( "sigs.k8s.io/yaml" ) -const ( - maxRetryAttempts = 20 -) - // ErrInProgress signals that an upgrade is in progress inside the cluster. var ErrInProgress = errors.New("upgrade in progress") @@ -69,6 +65,7 @@ func (e *applyError) Error() string { type KubeCmd struct { kubectl kubectlInterface retryInterval time.Duration + maxAttempts int log debugLog } @@ -82,6 +79,7 @@ func New(kubeConfig []byte, log debugLog) (*KubeCmd, error) { return &KubeCmd{ kubectl: client, retryInterval: time.Second * 5, + maxAttempts: 20, log: log, }, nil } @@ -103,7 +101,7 @@ func (k *KubeCmd) UpgradeNodeImage(ctx context.Context, imageVersion semver.Semv return fmt.Errorf("updating image version: %w", err) } - k.log.Debug(fmt.Sprintf("Updating local copy of nodeVersion image version from %q to %q", nodeVersion.Spec.ImageVersion, imageVersion.String())) + k.log.Debug("Updating local copy of nodeVersion image version", "oldVersion", nodeVersion.Spec.ImageVersion, "newVersion", imageVersion.String()) nodeVersion.Spec.ImageReference = imageReference nodeVersion.Spec.ImageVersion = imageVersion.String() @@ -121,41 +119,31 @@ func (k *KubeCmd) UpgradeKubernetesVersion(ctx context.Context, kubernetesVersio return err } - var upgradeErr *compatibility.InvalidUpgradeError // We have to allow users to specify outdated k8s patch versions. // Therefore, this code has to skip k8s updates if a user configures an outdated (i.e. invalid) k8s version. - var components *corev1.ConfigMap - _, err = versions.NewValidK8sVersion(string(kubernetesVersion), true) - if err != nil { - err = compatibility.NewInvalidUpgradeError( + if _, err := versions.NewValidK8sVersion(string(kubernetesVersion), true); err != nil { + return fmt.Errorf("skipping Kubernetes upgrade: %w", compatibility.NewInvalidUpgradeError( nodeVersion.Spec.KubernetesClusterVersion, string(kubernetesVersion), - fmt.Errorf("unsupported Kubernetes version, supported versions are %s", strings.Join(versions.SupportedK8sVersions(), ", ")), + fmt.Errorf("unsupported Kubernetes version, supported versions are %s", strings.Join(versions.SupportedK8sVersions(), ", "))), ) - } else { - versionConfig, ok := versions.VersionConfigs[kubernetesVersion] - if !ok { - err = compatibility.NewInvalidUpgradeError( - nodeVersion.Spec.KubernetesClusterVersion, - string(kubernetesVersion), - fmt.Errorf("no version config matching K8s %s", kubernetesVersion), - ) - } else { - components, err = k.prepareUpdateK8s(&nodeVersion, versionConfig.ClusterVersion, - versionConfig.KubernetesComponents, force) - } } - switch { - case err == nil: - err := k.applyComponentsCM(ctx, components) - if err != nil { - return fmt.Errorf("applying k8s components ConfigMap: %w", err) - } - case errors.As(err, &upgradeErr): - return fmt.Errorf("skipping Kubernetes upgrade: %w", err) - default: - return fmt.Errorf("updating Kubernetes version: %w", err) + versionConfig, ok := versions.VersionConfigs[kubernetesVersion] + if !ok { + return fmt.Errorf("skipping Kubernetes upgrade: %w", compatibility.NewInvalidUpgradeError( + nodeVersion.Spec.KubernetesClusterVersion, + string(kubernetesVersion), + fmt.Errorf("no version config matching K8s %s", kubernetesVersion), + )) + } + components, err := k.prepareUpdateK8s(&nodeVersion, versionConfig.ClusterVersion, versionConfig.KubernetesComponents, force) + if err != nil { + return err + } + + if err := k.applyComponentsCM(ctx, components); err != nil { + return fmt.Errorf("applying k8s components ConfigMap: %w", err) } updatedNodeVersion, err := k.applyNodeVersion(ctx, nodeVersion) @@ -167,8 +155,13 @@ func (k *KubeCmd) UpgradeKubernetesVersion(ctx context.Context, kubernetesVersio // ClusterStatus returns a map from node name to NodeStatus. func (k *KubeCmd) ClusterStatus(ctx context.Context) (map[string]NodeStatus, error) { - nodes, err := k.kubectl.GetNodes(ctx) - if err != nil { + var nodes []corev1.Node + + if err := k.retryAction(ctx, func(ctx context.Context) error { + var err error + nodes, err = k.kubectl.GetNodes(ctx) + return err + }); err != nil { return nil, fmt.Errorf("getting nodes: %w", err) } @@ -183,7 +176,7 @@ func (k *KubeCmd) ClusterStatus(ctx context.Context) (map[string]NodeStatus, err // GetClusterAttestationConfig fetches the join-config configmap from the cluster, // and returns the attestation config. func (k *KubeCmd) GetClusterAttestationConfig(ctx context.Context, variant variant.Variant) (config.AttestationCfg, error) { - existingConf, err := retryGetJoinConfig(ctx, k.kubectl, k.retryInterval, k.log) + existingConf, err := k.retryGetJoinConfig(ctx) if err != nil { return nil, fmt.Errorf("retrieving current attestation config: %w", err) } @@ -208,19 +201,19 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At return fmt.Errorf("marshaling attestation config: %w", err) } - joinConfig, err := retryGetJoinConfig(ctx, k.kubectl, k.retryInterval, k.log) + joinConfig, err := k.retryGetJoinConfig(ctx) if err != nil { if !k8serrors.IsNotFound(err) { return fmt.Errorf("getting %s ConfigMap: %w", constants.JoinConfigMap, err) } - k.log.Debug(fmt.Sprintf("ConfigMap %q does not exist in namespace %q, creating it now", constants.JoinConfigMap, constants.ConstellationNamespace)) - if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error { + k.log.Debug("ConfigMap does not exist, creating it now", "name", constants.JoinConfigMap, "namespace", constants.ConstellationNamespace) + if err := k.retryAction(ctx, func(ctx context.Context) error { return k.kubectl.CreateConfigMap(ctx, joinConfigMap(newConfigJSON, measurementSalt)) - }, k.log); err != nil { + }); err != nil { return fmt.Errorf("creating join-config ConfigMap: %w", err) } - k.log.Debug(fmt.Sprintf("Created %q ConfigMap in namespace %q", constants.JoinConfigMap, constants.ConstellationNamespace)) + k.log.Debug("Created ConfigMap", "name", constants.JoinConfigMap, "namespace", constants.ConstellationNamespace) return nil } @@ -228,10 +221,10 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At joinConfig.Data[constants.AttestationConfigFilename+"_backup"] = joinConfig.Data[constants.AttestationConfigFilename] joinConfig.Data[constants.AttestationConfigFilename] = string(newConfigJSON) k.log.Debug("Triggering attestation config update now") - if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error { + if err := k.retryAction(ctx, func(ctx context.Context) error { _, err = k.kubectl.UpdateConfigMap(ctx, joinConfig) return err - }, k.log); err != nil { + }); err != nil { return fmt.Errorf("setting new attestation config: %w", err) } @@ -266,7 +259,7 @@ func (k *KubeCmd) ExtendClusterConfigCertSANs(ctx context.Context, alternativeNa k.log.Debug("No new SANs to add to the cluster's apiserver SAN field") return nil } - k.log.Debug(fmt.Sprintf("Extending the cluster's apiserver SAN field with the following SANs: %q", strings.Join(missingSANs, ", "))) + k.log.Debug("Extending the cluster's apiserver SAN field", "certSANs", strings.Join(missingSANs, ", ")) clusterConfiguration.APIServer.CertSANs = append(clusterConfiguration.APIServer.CertSANs, missingSANs...) sort.Strings(clusterConfiguration.APIServer.CertSANs) @@ -278,7 +271,10 @@ func (k *KubeCmd) ExtendClusterConfigCertSANs(ctx context.Context, alternativeNa kubeadmConfig.Data[constants.ClusterConfigurationKey] = string(newConfigYAML) k.log.Debug("Triggering kubeadm config update now") - if _, err = k.kubectl.UpdateConfigMap(ctx, kubeadmConfig); err != nil { + if err = k.retryAction(ctx, func(ctx context.Context) error { + _, err := k.kubectl.UpdateConfigMap(ctx, kubeadmConfig) + return err + }); err != nil { return fmt.Errorf("setting new kubeadm config: %w", err) } @@ -299,14 +295,19 @@ func (k *KubeCmd) GetConstellationVersion(ctx context.Context) (NodeVersion, err // getConstellationVersion returns the NodeVersion object of a Constellation cluster. func (k *KubeCmd) getConstellationVersion(ctx context.Context) (updatev1alpha1.NodeVersion, error) { - raw, err := k.kubectl.GetCR(ctx, schema.GroupVersionResource{ - Group: "update.edgeless.systems", - Version: "v1alpha1", - Resource: "nodeversions", - }, constants.NodeVersionResourceName) - if err != nil { + var raw *unstructured.Unstructured + if err := k.retryAction(ctx, func(ctx context.Context) error { + var err error + raw, err = k.kubectl.GetCR(ctx, schema.GroupVersionResource{ + Group: "update.edgeless.systems", + Version: "v1alpha1", + Resource: "nodeversions", + }, constants.NodeVersionResourceName) + return err + }); err != nil { return updatev1alpha1.NodeVersion{}, err } + var nodeVersion updatev1alpha1.NodeVersion if err := runtime.DefaultUnstructuredConverter.FromUnstructured(raw.UnstructuredContent(), &nodeVersion); err != nil { return updatev1alpha1.NodeVersion{}, fmt.Errorf("converting unstructured to NodeVersion: %w", err) @@ -318,10 +319,15 @@ func (k *KubeCmd) getConstellationVersion(ctx context.Context) (updatev1alpha1.N // getClusterConfiguration fetches the kubeadm-config configmap from the cluster, extracts the config // and returns both the full configmap and the ClusterConfiguration. func (k *KubeCmd) getClusterConfiguration(ctx context.Context) (kubeadmv1beta3.ClusterConfiguration, *corev1.ConfigMap, error) { - existingConf, err := k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.KubeadmConfigMap) - if err != nil { + var existingConf *corev1.ConfigMap + if err := k.retryAction(ctx, func(ctx context.Context) error { + var err error + existingConf, err = k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.KubeadmConfigMap) + return err + }); err != nil { return kubeadmv1beta3.ClusterConfiguration{}, nil, fmt.Errorf("retrieving current kubeadm-config: %w", err) } + clusterConf, ok := existingConf.Data[constants.ClusterConfigurationKey] if !ok { return kubeadmv1beta3.ClusterConfiguration{}, nil, errors.New("ClusterConfiguration missing from kubeadm-config") @@ -337,9 +343,16 @@ func (k *KubeCmd) getClusterConfiguration(ctx context.Context) (kubeadmv1beta3.C // applyComponentsCM applies the k8s components ConfigMap to the cluster. func (k *KubeCmd) applyComponentsCM(ctx context.Context, components *corev1.ConfigMap) error { - // If the map already exists we can use that map and assume it has the same content as 'configMap'. - if err := k.kubectl.CreateConfigMap(ctx, components); err != nil && !k8serrors.IsAlreadyExists(err) { - return fmt.Errorf("creating k8s-components ConfigMap: %w. %T", err, err) + if err := k.retryAction(ctx, func(ctx context.Context) error { + // If the components ConfigMap already exists we assume it is up to date, + // since its name is derived from a hash of its contents. + err := k.kubectl.CreateConfigMap(ctx, components) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return err + } + return nil + }); err != nil { + return fmt.Errorf("creating k8s-components ConfigMap: %w", err) } return nil } @@ -347,31 +360,35 @@ func (k *KubeCmd) applyComponentsCM(ctx context.Context, components *corev1.Conf func (k *KubeCmd) applyNodeVersion(ctx context.Context, nodeVersion updatev1alpha1.NodeVersion) (updatev1alpha1.NodeVersion, error) { k.log.Debug("Triggering NodeVersion upgrade now") var updatedNodeVersion updatev1alpha1.NodeVersion - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - newNode, err := k.getConstellationVersion(ctx) - if err != nil { - return fmt.Errorf("retrieving current NodeVersion: %w", err) - } - updateNodeVersions(nodeVersion, &newNode) + // Retry the entire "retry-on-conflict" block to retry if the block fails, e.g. due to etcd timeouts. + err := k.retryAction(ctx, func(ctx context.Context) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + newNode, err := k.getConstellationVersion(ctx) + if err != nil { + return fmt.Errorf("retrieving current NodeVersion: %w", err) + } - raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&newNode) - if err != nil { - return fmt.Errorf("converting nodeVersion to unstructured: %w", err) - } - updated, err := k.kubectl.UpdateCR(ctx, schema.GroupVersionResource{ - Group: "update.edgeless.systems", - Version: "v1alpha1", - Resource: "nodeversions", - }, &unstructured.Unstructured{Object: raw}) - if err != nil { - return err - } + updateNodeVersions(nodeVersion, &newNode) - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &updatedNodeVersion); err != nil { - return fmt.Errorf("converting unstructured to NodeVersion: %w", err) - } - return nil + raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&newNode) + if err != nil { + return fmt.Errorf("converting nodeVersion to unstructured: %w", err) + } + updated, err := k.kubectl.UpdateCR(ctx, schema.GroupVersionResource{ + Group: "update.edgeless.systems", + Version: "v1alpha1", + Resource: "nodeversions", + }, &unstructured.Unstructured{Object: raw}) + if err != nil { + return err + } + + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &updatedNodeVersion); err != nil { + return fmt.Errorf("converting unstructured to NodeVersion: %w", err) + } + return nil + }) }) return updatedNodeVersion, err @@ -405,17 +422,52 @@ func (k *KubeCmd) prepareUpdateK8s(nodeVersion *updatev1alpha1.NodeVersion, newC } if !force { if err := compatibility.IsValidUpgrade(nodeVersion.Spec.KubernetesClusterVersion, newClusterVersion); err != nil { - return nil, err + return nil, fmt.Errorf("skipping Kubernetes upgrade: %w", err) } } - k.log.Debug(fmt.Sprintf("Updating local copy of nodeVersion Kubernetes version from %q to %q", nodeVersion.Spec.KubernetesClusterVersion, newClusterVersion)) + k.log.Debug("Updating local copy of nodeVersion Kubernetes version", "oldVersion", nodeVersion.Spec.KubernetesClusterVersion, "newVersion", newClusterVersion) nodeVersion.Spec.KubernetesComponentsReference = configMap.ObjectMeta.Name nodeVersion.Spec.KubernetesClusterVersion = newClusterVersion return &configMap, nil } +func (k *KubeCmd) retryGetJoinConfig(ctx context.Context) (*corev1.ConfigMap, error) { + var ctr int + retrieable := func(err error) bool { + if k8serrors.IsNotFound(err) { + return false + } + ctr++ + k.log.Debug("Getting join-config ConfigMap failed", "attempt", ctr, "maxAttempts", k.maxAttempts, "error", err) + return ctr < k.maxAttempts + } + + var joinConfig *corev1.ConfigMap + var err error + doer := &kubeDoer{ + action: func(ctx context.Context) error { + joinConfig, err = k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap) + return err + }, + } + retrier := conretry.NewIntervalRetrier(doer, k.retryInterval, retrieable) + + err = retrier.Do(ctx) + return joinConfig, err +} + +func (k *KubeCmd) retryAction(ctx context.Context, action func(ctx context.Context) error) error { + ctr := 0 + retrier := conretry.NewIntervalRetrier(&kubeDoer{action: action}, k.retryInterval, func(err error) bool { + ctr++ + k.log.Debug("Action failed", "attempt", ctr, "maxAttempts", k.maxAttempts, "error", err) + return ctr < k.maxAttempts + }) + return retrier.Do(ctx) +} + func checkForApplyError(expected, actual updatev1alpha1.NodeVersion) error { var err error switch { @@ -454,41 +506,6 @@ func (k *kubeDoer) Do(ctx context.Context) error { return k.action(ctx) } -func retryGetJoinConfig(ctx context.Context, kubectl kubectlInterface, retryInterval time.Duration, log debugLog) (*corev1.ConfigMap, error) { - var retries int - retrieable := func(err error) bool { - if k8serrors.IsNotFound(err) { - return false - } - retries++ - log.Debug(fmt.Sprintf("Getting join-config ConfigMap failed (attempt %d/%d): %q", retries, maxRetryAttempts, err)) - return retries < maxRetryAttempts - } - - var joinConfig *corev1.ConfigMap - var err error - doer := &kubeDoer{ - action: func(ctx context.Context) error { - joinConfig, err = kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap) - return err - }, - } - retrier := conretry.NewIntervalRetrier(doer, retryInterval, retrieable) - - err = retrier.Do(ctx) - return joinConfig, err -} - -func retryAction(ctx context.Context, retryInterval time.Duration, maxRetries int, action func(ctx context.Context) error, log debugLog) error { - ctr := 0 - retrier := conretry.NewIntervalRetrier(&kubeDoer{action: action}, retryInterval, func(err error) bool { - ctr++ - log.Debug(fmt.Sprintf("Action failed (attempt %d/%d): %q", ctr, maxRetries, err)) - return ctr < maxRetries - }) - return retrier.Do(ctx) -} - // kubectlInterface provides access to the Kubernetes API. type kubectlInterface interface { GetNodes(ctx context.Context) ([]corev1.Node, error) diff --git a/internal/constellation/kubecmd/kubecmd_test.go b/internal/constellation/kubecmd/kubecmd_test.go index cdaf99921..74e9562c1 100644 --- a/internal/constellation/kubecmd/kubecmd_test.go +++ b/internal/constellation/kubecmd/kubecmd_test.go @@ -174,8 +174,10 @@ func TestUpgradeNodeImage(t *testing.T) { } upgrader := KubeCmd{ - kubectl: kubectl, - log: logger.NewTest(t), + kubectl: kubectl, + retryInterval: time.Millisecond, + maxAttempts: 5, + log: logger.NewTest(t), } err = upgrader.UpgradeNodeImage(context.Background(), tc.newImageVersion, fmt.Sprintf("/path/to/image:%s", tc.newImageVersion.String()), tc.force) @@ -285,8 +287,10 @@ func TestUpgradeKubernetesVersion(t *testing.T) { } upgrader := KubeCmd{ - kubectl: kubectl, - log: logger.NewTest(t), + kubectl: kubectl, + retryInterval: time.Millisecond, + maxAttempts: 5, + log: logger.NewTest(t), } err = upgrader.UpgradeKubernetesVersion(context.Background(), tc.newKubernetesVersion, tc.force) @@ -341,7 +345,9 @@ func TestIsValidImageUpgrade(t *testing.T) { assert := assert.New(t) upgrader := &KubeCmd{ - log: logger.NewTest(t), + retryInterval: time.Millisecond, + maxAttempts: 5, + log: logger.NewTest(t), } nodeVersion := updatev1alpha1.NodeVersion{ @@ -392,7 +398,9 @@ func TestUpdateK8s(t *testing.T) { assert := assert.New(t) upgrader := &KubeCmd{ - log: logger.NewTest(t), + retryInterval: time.Millisecond, + maxAttempts: 5, + log: logger.NewTest(t), } nodeVersion := updatev1alpha1.NodeVersion{ @@ -589,6 +597,7 @@ func TestApplyJoinConfig(t *testing.T) { kubectl: tc.kubectl, log: logger.NewTest(t), retryInterval: time.Millisecond, + maxAttempts: 5, } err := cmd.ApplyJoinConfig(context.Background(), tc.newAttestationCfg, []byte{0x11}) @@ -611,6 +620,62 @@ func TestApplyJoinConfig(t *testing.T) { } } +func TestRetryAction(t *testing.T) { + maxAttempts := 3 + + testCases := map[string]struct { + failures int + wantErr bool + }{ + "no failures": { + failures: 0, + }, + "fail once": { + failures: 1, + }, + "fail equal to maxAttempts": { + failures: maxAttempts, + wantErr: true, + }, + "fail more than maxAttempts": { + failures: maxAttempts + 5, + wantErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + k := &KubeCmd{ + retryInterval: time.Millisecond, + maxAttempts: maxAttempts, + log: logger.NewTest(t), + } + + errs := map[int]error{} + for idx := range tc.failures { + errs[idx] = assert.AnError + } + + assert := assert.New(t) + + failureCtr := 0 + action := func(context.Context) error { + defer func() { failureCtr++ }() + return errs[failureCtr] + } + + err := k.retryAction(context.Background(), action) + if tc.wantErr { + assert.Error(err) + assert.Equal(min(tc.failures, maxAttempts), failureCtr) + return + } + assert.NoError(err) + assert.Equal(tc.failures, failureCtr-1) + }) + } +} + type fakeUnstructuredClient struct { mock.Mock }