mirror of
https://github.com/edgelesssys/constellation.git
synced 2024-12-28 17:09:30 -05:00
kubecmd: retry any k8s errors in CLI and Terraform (#3028)
* Retry any k8s errors in CLI and Terraform * Use structured logging in `kubecmd` package --------- Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
parent
f189aa186f
commit
485ebb151e
@ -39,7 +39,7 @@ func (k *KubeCmd) BackupCRDs(ctx context.Context, fileHandler file.Handler, upgr
|
||||
for i := range crds {
|
||||
path := filepath.Join(crdBackupFolder, crds[i].Name+".yaml")
|
||||
|
||||
k.log.Debug(fmt.Sprintf("Creating CRD backup: %q", path))
|
||||
k.log.Debug("Creating CRD backup", "path", path)
|
||||
|
||||
// We have to manually set kind/apiversion because of a long-standing limitation of the API:
|
||||
// https://github.com/kubernetes/kubernetes/issues/3030#issuecomment-67543738
|
||||
@ -64,7 +64,7 @@ func (k *KubeCmd) BackupCRDs(ctx context.Context, fileHandler file.Handler, upgr
|
||||
func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds []apiextensionsv1.CustomResourceDefinition, upgradeDir string) error {
|
||||
k.log.Debug("Starting CR backup")
|
||||
for _, crd := range crds {
|
||||
k.log.Debug(fmt.Sprintf("Creating backup for resource type: %q", crd.Name))
|
||||
k.log.Debug("Creating backup", "crdName", crd.Name)
|
||||
|
||||
// Iterate over all versions of the CRD
|
||||
// TODO(daniel-weisse): Consider iterating over crd.Status.StoredVersions instead
|
||||
@ -72,7 +72,7 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds
|
||||
// a version that is not installed in the cluster.
|
||||
// With the StoredVersions field, we could only iterate over the installed versions.
|
||||
for _, version := range crd.Spec.Versions {
|
||||
k.log.Debug(fmt.Sprintf("Creating backup of CRs for %q at version %q", crd.Name, version.Name))
|
||||
k.log.Debug("Starting CustomResource backup", "crdName", crd.Name, "version", version.Name)
|
||||
|
||||
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version.Name, Resource: crd.Spec.Names.Plural}
|
||||
crs, err := k.kubectl.ListCRs(ctx, gvr)
|
||||
@ -80,7 +80,7 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds
|
||||
if !k8serrors.IsNotFound(err) {
|
||||
return fmt.Errorf("retrieving CR %s: %w", crd.Name, err)
|
||||
}
|
||||
k.log.Debug(fmt.Sprintf("No CRs found for %q at version %q, skipping...", crd.Name, version.Name))
|
||||
k.log.Debug("No CustomResources found. Skipping...", "crdName", crd.Name, "version", version.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -101,9 +101,9 @@ func (k *KubeCmd) BackupCRs(ctx context.Context, fileHandler file.Handler, crds
|
||||
}
|
||||
}
|
||||
|
||||
k.log.Debug(fmt.Sprintf("Backup for resource type %q complete", crd.Name))
|
||||
k.log.Debug("CustomResource backup complete", "crdName", crd.Name)
|
||||
}
|
||||
k.log.Debug("CR backup complete")
|
||||
k.log.Debug("All CustomResource backups completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -47,10 +47,6 @@ import (
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetryAttempts = 20
|
||||
)
|
||||
|
||||
// ErrInProgress signals that an upgrade is in progress inside the cluster.
|
||||
var ErrInProgress = errors.New("upgrade in progress")
|
||||
|
||||
@ -69,6 +65,7 @@ func (e *applyError) Error() string {
|
||||
type KubeCmd struct {
|
||||
kubectl kubectlInterface
|
||||
retryInterval time.Duration
|
||||
maxAttempts int
|
||||
log debugLog
|
||||
}
|
||||
|
||||
@ -82,6 +79,7 @@ func New(kubeConfig []byte, log debugLog) (*KubeCmd, error) {
|
||||
return &KubeCmd{
|
||||
kubectl: client,
|
||||
retryInterval: time.Second * 5,
|
||||
maxAttempts: 20,
|
||||
log: log,
|
||||
}, nil
|
||||
}
|
||||
@ -103,7 +101,7 @@ func (k *KubeCmd) UpgradeNodeImage(ctx context.Context, imageVersion semver.Semv
|
||||
return fmt.Errorf("updating image version: %w", err)
|
||||
}
|
||||
|
||||
k.log.Debug(fmt.Sprintf("Updating local copy of nodeVersion image version from %q to %q", nodeVersion.Spec.ImageVersion, imageVersion.String()))
|
||||
k.log.Debug("Updating local copy of nodeVersion image version", "oldVersion", nodeVersion.Spec.ImageVersion, "newVersion", imageVersion.String())
|
||||
nodeVersion.Spec.ImageReference = imageReference
|
||||
nodeVersion.Spec.ImageVersion = imageVersion.String()
|
||||
|
||||
@ -121,41 +119,31 @@ func (k *KubeCmd) UpgradeKubernetesVersion(ctx context.Context, kubernetesVersio
|
||||
return err
|
||||
}
|
||||
|
||||
var upgradeErr *compatibility.InvalidUpgradeError
|
||||
// We have to allow users to specify outdated k8s patch versions.
|
||||
// Therefore, this code has to skip k8s updates if a user configures an outdated (i.e. invalid) k8s version.
|
||||
var components *corev1.ConfigMap
|
||||
_, err = versions.NewValidK8sVersion(string(kubernetesVersion), true)
|
||||
if err != nil {
|
||||
err = compatibility.NewInvalidUpgradeError(
|
||||
if _, err := versions.NewValidK8sVersion(string(kubernetesVersion), true); err != nil {
|
||||
return fmt.Errorf("skipping Kubernetes upgrade: %w", compatibility.NewInvalidUpgradeError(
|
||||
nodeVersion.Spec.KubernetesClusterVersion,
|
||||
string(kubernetesVersion),
|
||||
fmt.Errorf("unsupported Kubernetes version, supported versions are %s", strings.Join(versions.SupportedK8sVersions(), ", ")),
|
||||
fmt.Errorf("unsupported Kubernetes version, supported versions are %s", strings.Join(versions.SupportedK8sVersions(), ", "))),
|
||||
)
|
||||
} else {
|
||||
versionConfig, ok := versions.VersionConfigs[kubernetesVersion]
|
||||
if !ok {
|
||||
err = compatibility.NewInvalidUpgradeError(
|
||||
nodeVersion.Spec.KubernetesClusterVersion,
|
||||
string(kubernetesVersion),
|
||||
fmt.Errorf("no version config matching K8s %s", kubernetesVersion),
|
||||
)
|
||||
} else {
|
||||
components, err = k.prepareUpdateK8s(&nodeVersion, versionConfig.ClusterVersion,
|
||||
versionConfig.KubernetesComponents, force)
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
err := k.applyComponentsCM(ctx, components)
|
||||
if err != nil {
|
||||
return fmt.Errorf("applying k8s components ConfigMap: %w", err)
|
||||
}
|
||||
case errors.As(err, &upgradeErr):
|
||||
return fmt.Errorf("skipping Kubernetes upgrade: %w", err)
|
||||
default:
|
||||
return fmt.Errorf("updating Kubernetes version: %w", err)
|
||||
versionConfig, ok := versions.VersionConfigs[kubernetesVersion]
|
||||
if !ok {
|
||||
return fmt.Errorf("skipping Kubernetes upgrade: %w", compatibility.NewInvalidUpgradeError(
|
||||
nodeVersion.Spec.KubernetesClusterVersion,
|
||||
string(kubernetesVersion),
|
||||
fmt.Errorf("no version config matching K8s %s", kubernetesVersion),
|
||||
))
|
||||
}
|
||||
components, err := k.prepareUpdateK8s(&nodeVersion, versionConfig.ClusterVersion, versionConfig.KubernetesComponents, force)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := k.applyComponentsCM(ctx, components); err != nil {
|
||||
return fmt.Errorf("applying k8s components ConfigMap: %w", err)
|
||||
}
|
||||
|
||||
updatedNodeVersion, err := k.applyNodeVersion(ctx, nodeVersion)
|
||||
@ -167,8 +155,13 @@ func (k *KubeCmd) UpgradeKubernetesVersion(ctx context.Context, kubernetesVersio
|
||||
|
||||
// ClusterStatus returns a map from node name to NodeStatus.
|
||||
func (k *KubeCmd) ClusterStatus(ctx context.Context) (map[string]NodeStatus, error) {
|
||||
nodes, err := k.kubectl.GetNodes(ctx)
|
||||
if err != nil {
|
||||
var nodes []corev1.Node
|
||||
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
var err error
|
||||
nodes, err = k.kubectl.GetNodes(ctx)
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("getting nodes: %w", err)
|
||||
}
|
||||
|
||||
@ -183,7 +176,7 @@ func (k *KubeCmd) ClusterStatus(ctx context.Context) (map[string]NodeStatus, err
|
||||
// GetClusterAttestationConfig fetches the join-config configmap from the cluster,
|
||||
// and returns the attestation config.
|
||||
func (k *KubeCmd) GetClusterAttestationConfig(ctx context.Context, variant variant.Variant) (config.AttestationCfg, error) {
|
||||
existingConf, err := retryGetJoinConfig(ctx, k.kubectl, k.retryInterval, k.log)
|
||||
existingConf, err := k.retryGetJoinConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("retrieving current attestation config: %w", err)
|
||||
}
|
||||
@ -208,19 +201,19 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At
|
||||
return fmt.Errorf("marshaling attestation config: %w", err)
|
||||
}
|
||||
|
||||
joinConfig, err := retryGetJoinConfig(ctx, k.kubectl, k.retryInterval, k.log)
|
||||
joinConfig, err := k.retryGetJoinConfig(ctx)
|
||||
if err != nil {
|
||||
if !k8serrors.IsNotFound(err) {
|
||||
return fmt.Errorf("getting %s ConfigMap: %w", constants.JoinConfigMap, err)
|
||||
}
|
||||
|
||||
k.log.Debug(fmt.Sprintf("ConfigMap %q does not exist in namespace %q, creating it now", constants.JoinConfigMap, constants.ConstellationNamespace))
|
||||
if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error {
|
||||
k.log.Debug("ConfigMap does not exist, creating it now", "name", constants.JoinConfigMap, "namespace", constants.ConstellationNamespace)
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
return k.kubectl.CreateConfigMap(ctx, joinConfigMap(newConfigJSON, measurementSalt))
|
||||
}, k.log); err != nil {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("creating join-config ConfigMap: %w", err)
|
||||
}
|
||||
k.log.Debug(fmt.Sprintf("Created %q ConfigMap in namespace %q", constants.JoinConfigMap, constants.ConstellationNamespace))
|
||||
k.log.Debug("Created ConfigMap", "name", constants.JoinConfigMap, "namespace", constants.ConstellationNamespace)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -228,10 +221,10 @@ func (k *KubeCmd) ApplyJoinConfig(ctx context.Context, newAttestConfig config.At
|
||||
joinConfig.Data[constants.AttestationConfigFilename+"_backup"] = joinConfig.Data[constants.AttestationConfigFilename]
|
||||
joinConfig.Data[constants.AttestationConfigFilename] = string(newConfigJSON)
|
||||
k.log.Debug("Triggering attestation config update now")
|
||||
if err := retryAction(ctx, k.retryInterval, maxRetryAttempts, func(ctx context.Context) error {
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
_, err = k.kubectl.UpdateConfigMap(ctx, joinConfig)
|
||||
return err
|
||||
}, k.log); err != nil {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("setting new attestation config: %w", err)
|
||||
}
|
||||
|
||||
@ -266,7 +259,7 @@ func (k *KubeCmd) ExtendClusterConfigCertSANs(ctx context.Context, alternativeNa
|
||||
k.log.Debug("No new SANs to add to the cluster's apiserver SAN field")
|
||||
return nil
|
||||
}
|
||||
k.log.Debug(fmt.Sprintf("Extending the cluster's apiserver SAN field with the following SANs: %q", strings.Join(missingSANs, ", ")))
|
||||
k.log.Debug("Extending the cluster's apiserver SAN field", "certSANs", strings.Join(missingSANs, ", "))
|
||||
|
||||
clusterConfiguration.APIServer.CertSANs = append(clusterConfiguration.APIServer.CertSANs, missingSANs...)
|
||||
sort.Strings(clusterConfiguration.APIServer.CertSANs)
|
||||
@ -278,7 +271,10 @@ func (k *KubeCmd) ExtendClusterConfigCertSANs(ctx context.Context, alternativeNa
|
||||
|
||||
kubeadmConfig.Data[constants.ClusterConfigurationKey] = string(newConfigYAML)
|
||||
k.log.Debug("Triggering kubeadm config update now")
|
||||
if _, err = k.kubectl.UpdateConfigMap(ctx, kubeadmConfig); err != nil {
|
||||
if err = k.retryAction(ctx, func(ctx context.Context) error {
|
||||
_, err := k.kubectl.UpdateConfigMap(ctx, kubeadmConfig)
|
||||
return err
|
||||
}); err != nil {
|
||||
return fmt.Errorf("setting new kubeadm config: %w", err)
|
||||
}
|
||||
|
||||
@ -299,14 +295,19 @@ func (k *KubeCmd) GetConstellationVersion(ctx context.Context) (NodeVersion, err
|
||||
|
||||
// getConstellationVersion returns the NodeVersion object of a Constellation cluster.
|
||||
func (k *KubeCmd) getConstellationVersion(ctx context.Context) (updatev1alpha1.NodeVersion, error) {
|
||||
raw, err := k.kubectl.GetCR(ctx, schema.GroupVersionResource{
|
||||
Group: "update.edgeless.systems",
|
||||
Version: "v1alpha1",
|
||||
Resource: "nodeversions",
|
||||
}, constants.NodeVersionResourceName)
|
||||
if err != nil {
|
||||
var raw *unstructured.Unstructured
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
var err error
|
||||
raw, err = k.kubectl.GetCR(ctx, schema.GroupVersionResource{
|
||||
Group: "update.edgeless.systems",
|
||||
Version: "v1alpha1",
|
||||
Resource: "nodeversions",
|
||||
}, constants.NodeVersionResourceName)
|
||||
return err
|
||||
}); err != nil {
|
||||
return updatev1alpha1.NodeVersion{}, err
|
||||
}
|
||||
|
||||
var nodeVersion updatev1alpha1.NodeVersion
|
||||
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(raw.UnstructuredContent(), &nodeVersion); err != nil {
|
||||
return updatev1alpha1.NodeVersion{}, fmt.Errorf("converting unstructured to NodeVersion: %w", err)
|
||||
@ -318,10 +319,15 @@ func (k *KubeCmd) getConstellationVersion(ctx context.Context) (updatev1alpha1.N
|
||||
// getClusterConfiguration fetches the kubeadm-config configmap from the cluster, extracts the config
|
||||
// and returns both the full configmap and the ClusterConfiguration.
|
||||
func (k *KubeCmd) getClusterConfiguration(ctx context.Context) (kubeadmv1beta3.ClusterConfiguration, *corev1.ConfigMap, error) {
|
||||
existingConf, err := k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.KubeadmConfigMap)
|
||||
if err != nil {
|
||||
var existingConf *corev1.ConfigMap
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
var err error
|
||||
existingConf, err = k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.KubeadmConfigMap)
|
||||
return err
|
||||
}); err != nil {
|
||||
return kubeadmv1beta3.ClusterConfiguration{}, nil, fmt.Errorf("retrieving current kubeadm-config: %w", err)
|
||||
}
|
||||
|
||||
clusterConf, ok := existingConf.Data[constants.ClusterConfigurationKey]
|
||||
if !ok {
|
||||
return kubeadmv1beta3.ClusterConfiguration{}, nil, errors.New("ClusterConfiguration missing from kubeadm-config")
|
||||
@ -337,9 +343,16 @@ func (k *KubeCmd) getClusterConfiguration(ctx context.Context) (kubeadmv1beta3.C
|
||||
|
||||
// applyComponentsCM applies the k8s components ConfigMap to the cluster.
|
||||
func (k *KubeCmd) applyComponentsCM(ctx context.Context, components *corev1.ConfigMap) error {
|
||||
// If the map already exists we can use that map and assume it has the same content as 'configMap'.
|
||||
if err := k.kubectl.CreateConfigMap(ctx, components); err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return fmt.Errorf("creating k8s-components ConfigMap: %w. %T", err, err)
|
||||
if err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
// If the components ConfigMap already exists we assume it is up to date,
|
||||
// since its name is derived from a hash of its contents.
|
||||
err := k.kubectl.CreateConfigMap(ctx, components)
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("creating k8s-components ConfigMap: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -347,31 +360,35 @@ func (k *KubeCmd) applyComponentsCM(ctx context.Context, components *corev1.Conf
|
||||
func (k *KubeCmd) applyNodeVersion(ctx context.Context, nodeVersion updatev1alpha1.NodeVersion) (updatev1alpha1.NodeVersion, error) {
|
||||
k.log.Debug("Triggering NodeVersion upgrade now")
|
||||
var updatedNodeVersion updatev1alpha1.NodeVersion
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
newNode, err := k.getConstellationVersion(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieving current NodeVersion: %w", err)
|
||||
}
|
||||
|
||||
updateNodeVersions(nodeVersion, &newNode)
|
||||
// Retry the entire "retry-on-conflict" block to retry if the block fails, e.g. due to etcd timeouts.
|
||||
err := k.retryAction(ctx, func(ctx context.Context) error {
|
||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
newNode, err := k.getConstellationVersion(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieving current NodeVersion: %w", err)
|
||||
}
|
||||
|
||||
raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&newNode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("converting nodeVersion to unstructured: %w", err)
|
||||
}
|
||||
updated, err := k.kubectl.UpdateCR(ctx, schema.GroupVersionResource{
|
||||
Group: "update.edgeless.systems",
|
||||
Version: "v1alpha1",
|
||||
Resource: "nodeversions",
|
||||
}, &unstructured.Unstructured{Object: raw})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
updateNodeVersions(nodeVersion, &newNode)
|
||||
|
||||
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &updatedNodeVersion); err != nil {
|
||||
return fmt.Errorf("converting unstructured to NodeVersion: %w", err)
|
||||
}
|
||||
return nil
|
||||
raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&newNode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("converting nodeVersion to unstructured: %w", err)
|
||||
}
|
||||
updated, err := k.kubectl.UpdateCR(ctx, schema.GroupVersionResource{
|
||||
Group: "update.edgeless.systems",
|
||||
Version: "v1alpha1",
|
||||
Resource: "nodeversions",
|
||||
}, &unstructured.Unstructured{Object: raw})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updated.UnstructuredContent(), &updatedNodeVersion); err != nil {
|
||||
return fmt.Errorf("converting unstructured to NodeVersion: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
return updatedNodeVersion, err
|
||||
@ -405,17 +422,52 @@ func (k *KubeCmd) prepareUpdateK8s(nodeVersion *updatev1alpha1.NodeVersion, newC
|
||||
}
|
||||
if !force {
|
||||
if err := compatibility.IsValidUpgrade(nodeVersion.Spec.KubernetesClusterVersion, newClusterVersion); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("skipping Kubernetes upgrade: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
k.log.Debug(fmt.Sprintf("Updating local copy of nodeVersion Kubernetes version from %q to %q", nodeVersion.Spec.KubernetesClusterVersion, newClusterVersion))
|
||||
k.log.Debug("Updating local copy of nodeVersion Kubernetes version", "oldVersion", nodeVersion.Spec.KubernetesClusterVersion, "newVersion", newClusterVersion)
|
||||
nodeVersion.Spec.KubernetesComponentsReference = configMap.ObjectMeta.Name
|
||||
nodeVersion.Spec.KubernetesClusterVersion = newClusterVersion
|
||||
|
||||
return &configMap, nil
|
||||
}
|
||||
|
||||
func (k *KubeCmd) retryGetJoinConfig(ctx context.Context) (*corev1.ConfigMap, error) {
|
||||
var ctr int
|
||||
retrieable := func(err error) bool {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
return false
|
||||
}
|
||||
ctr++
|
||||
k.log.Debug("Getting join-config ConfigMap failed", "attempt", ctr, "maxAttempts", k.maxAttempts, "error", err)
|
||||
return ctr < k.maxAttempts
|
||||
}
|
||||
|
||||
var joinConfig *corev1.ConfigMap
|
||||
var err error
|
||||
doer := &kubeDoer{
|
||||
action: func(ctx context.Context) error {
|
||||
joinConfig, err = k.kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap)
|
||||
return err
|
||||
},
|
||||
}
|
||||
retrier := conretry.NewIntervalRetrier(doer, k.retryInterval, retrieable)
|
||||
|
||||
err = retrier.Do(ctx)
|
||||
return joinConfig, err
|
||||
}
|
||||
|
||||
func (k *KubeCmd) retryAction(ctx context.Context, action func(ctx context.Context) error) error {
|
||||
ctr := 0
|
||||
retrier := conretry.NewIntervalRetrier(&kubeDoer{action: action}, k.retryInterval, func(err error) bool {
|
||||
ctr++
|
||||
k.log.Debug("Action failed", "attempt", ctr, "maxAttempts", k.maxAttempts, "error", err)
|
||||
return ctr < k.maxAttempts
|
||||
})
|
||||
return retrier.Do(ctx)
|
||||
}
|
||||
|
||||
func checkForApplyError(expected, actual updatev1alpha1.NodeVersion) error {
|
||||
var err error
|
||||
switch {
|
||||
@ -454,41 +506,6 @@ func (k *kubeDoer) Do(ctx context.Context) error {
|
||||
return k.action(ctx)
|
||||
}
|
||||
|
||||
func retryGetJoinConfig(ctx context.Context, kubectl kubectlInterface, retryInterval time.Duration, log debugLog) (*corev1.ConfigMap, error) {
|
||||
var retries int
|
||||
retrieable := func(err error) bool {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
return false
|
||||
}
|
||||
retries++
|
||||
log.Debug(fmt.Sprintf("Getting join-config ConfigMap failed (attempt %d/%d): %q", retries, maxRetryAttempts, err))
|
||||
return retries < maxRetryAttempts
|
||||
}
|
||||
|
||||
var joinConfig *corev1.ConfigMap
|
||||
var err error
|
||||
doer := &kubeDoer{
|
||||
action: func(ctx context.Context) error {
|
||||
joinConfig, err = kubectl.GetConfigMap(ctx, constants.ConstellationNamespace, constants.JoinConfigMap)
|
||||
return err
|
||||
},
|
||||
}
|
||||
retrier := conretry.NewIntervalRetrier(doer, retryInterval, retrieable)
|
||||
|
||||
err = retrier.Do(ctx)
|
||||
return joinConfig, err
|
||||
}
|
||||
|
||||
func retryAction(ctx context.Context, retryInterval time.Duration, maxRetries int, action func(ctx context.Context) error, log debugLog) error {
|
||||
ctr := 0
|
||||
retrier := conretry.NewIntervalRetrier(&kubeDoer{action: action}, retryInterval, func(err error) bool {
|
||||
ctr++
|
||||
log.Debug(fmt.Sprintf("Action failed (attempt %d/%d): %q", ctr, maxRetries, err))
|
||||
return ctr < maxRetries
|
||||
})
|
||||
return retrier.Do(ctx)
|
||||
}
|
||||
|
||||
// kubectlInterface provides access to the Kubernetes API.
|
||||
type kubectlInterface interface {
|
||||
GetNodes(ctx context.Context) ([]corev1.Node, error)
|
||||
|
@ -174,8 +174,10 @@ func TestUpgradeNodeImage(t *testing.T) {
|
||||
}
|
||||
|
||||
upgrader := KubeCmd{
|
||||
kubectl: kubectl,
|
||||
log: logger.NewTest(t),
|
||||
kubectl: kubectl,
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: 5,
|
||||
log: logger.NewTest(t),
|
||||
}
|
||||
|
||||
err = upgrader.UpgradeNodeImage(context.Background(), tc.newImageVersion, fmt.Sprintf("/path/to/image:%s", tc.newImageVersion.String()), tc.force)
|
||||
@ -285,8 +287,10 @@ func TestUpgradeKubernetesVersion(t *testing.T) {
|
||||
}
|
||||
|
||||
upgrader := KubeCmd{
|
||||
kubectl: kubectl,
|
||||
log: logger.NewTest(t),
|
||||
kubectl: kubectl,
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: 5,
|
||||
log: logger.NewTest(t),
|
||||
}
|
||||
|
||||
err = upgrader.UpgradeKubernetesVersion(context.Background(), tc.newKubernetesVersion, tc.force)
|
||||
@ -341,7 +345,9 @@ func TestIsValidImageUpgrade(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
upgrader := &KubeCmd{
|
||||
log: logger.NewTest(t),
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: 5,
|
||||
log: logger.NewTest(t),
|
||||
}
|
||||
|
||||
nodeVersion := updatev1alpha1.NodeVersion{
|
||||
@ -392,7 +398,9 @@ func TestUpdateK8s(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
upgrader := &KubeCmd{
|
||||
log: logger.NewTest(t),
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: 5,
|
||||
log: logger.NewTest(t),
|
||||
}
|
||||
|
||||
nodeVersion := updatev1alpha1.NodeVersion{
|
||||
@ -589,6 +597,7 @@ func TestApplyJoinConfig(t *testing.T) {
|
||||
kubectl: tc.kubectl,
|
||||
log: logger.NewTest(t),
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: 5,
|
||||
}
|
||||
|
||||
err := cmd.ApplyJoinConfig(context.Background(), tc.newAttestationCfg, []byte{0x11})
|
||||
@ -611,6 +620,62 @@ func TestApplyJoinConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryAction(t *testing.T) {
|
||||
maxAttempts := 3
|
||||
|
||||
testCases := map[string]struct {
|
||||
failures int
|
||||
wantErr bool
|
||||
}{
|
||||
"no failures": {
|
||||
failures: 0,
|
||||
},
|
||||
"fail once": {
|
||||
failures: 1,
|
||||
},
|
||||
"fail equal to maxAttempts": {
|
||||
failures: maxAttempts,
|
||||
wantErr: true,
|
||||
},
|
||||
"fail more than maxAttempts": {
|
||||
failures: maxAttempts + 5,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
k := &KubeCmd{
|
||||
retryInterval: time.Millisecond,
|
||||
maxAttempts: maxAttempts,
|
||||
log: logger.NewTest(t),
|
||||
}
|
||||
|
||||
errs := map[int]error{}
|
||||
for idx := range tc.failures {
|
||||
errs[idx] = assert.AnError
|
||||
}
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
failureCtr := 0
|
||||
action := func(context.Context) error {
|
||||
defer func() { failureCtr++ }()
|
||||
return errs[failureCtr]
|
||||
}
|
||||
|
||||
err := k.retryAction(context.Background(), action)
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
assert.Equal(min(tc.failures, maxAttempts), failureCtr)
|
||||
return
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.Equal(tc.failures, failureCtr-1)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeUnstructuredClient struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user