/* Copyright (c) Edgeless Systems GmbH SPDX-License-Identifier: AGPL-3.0-only */ package controllers import ( "context" "encoding/json" "errors" "reflect" "strings" "time" mainconstants "github.com/edgelesssys/constellation/v2/internal/constants" "github.com/edgelesssys/constellation/v2/internal/versions/components" nodeutil "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/node" "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/patch" "golang.org/x/mod/semver" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/version" ref "k8s.io/client-go/tools/reference" "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/source" updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1" nodemaintenancev1beta1 "github.com/medik8s/node-maintenance-operator/api/v1beta1" ) const ( // nodeOverprovisionLimit is the maximum number of extra nodes created during the update procedure at any point in time. nodeOverprovisionLimit = 1 // nodeJoinTimeout is the time limit pending nodes have to join the cluster before being terminated. nodeJoinTimeout = time.Minute * 30 // nodeLeaveTimeout is the time limit pending nodes have to leave the cluster and being terminated. nodeLeaveTimeout = time.Minute donorAnnotation = "constellation.edgeless.systems/donor" heirAnnotation = "constellation.edgeless.systems/heir" scalingGroupAnnotation = "constellation.edgeless.systems/scaling-group-id" nodeImageAnnotation = "constellation.edgeless.systems/node-image" obsoleteAnnotation = "constellation.edgeless.systems/obsolete" conditionNodeVersionUpToDateReason = "NodeVersionsUpToDate" conditionNodeVersionUpToDateMessage = "Node version of every node is up to date" conditionNodeVersionOutOfDateReason = "NodeVersionsOutOfDate" conditionNodeVersionOutOfDateMessage = "Some node versions are out of date" ) // NodeVersionReconciler reconciles a NodeVersion object. type NodeVersionReconciler struct { nodeReplacer etcdRemover clusterUpgrader kubernetesServerVersionGetter client.Client Scheme *runtime.Scheme } // NewNodeVersionReconciler creates a new NodeVersionReconciler. func NewNodeVersionReconciler(nodeReplacer nodeReplacer, etcdRemover etcdRemover, clusterUpgrader clusterUpgrader, k8sVerGetter kubernetesServerVersionGetter, client client.Client, scheme *runtime.Scheme) *NodeVersionReconciler { return &NodeVersionReconciler{ nodeReplacer: nodeReplacer, etcdRemover: etcdRemover, clusterUpgrader: clusterUpgrader, kubernetesServerVersionGetter: k8sVerGetter, Client: client, Scheme: scheme, } } //+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeversions,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeversions/status,verbs=get;update;patch //+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeversions/finalizers,verbs=update //+kubebuilder:rbac:groups=nodemaintenance.medik8s.io,resources=nodemaintenances,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=nodes/status,verbs=get //+kubebuilder:rbac:groups="",resources=configmaps,verbs=list;get // Reconcile replaces outdated nodes with new nodes as specified in the NodeVersion spec. func (r *NodeVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logr := log.FromContext(ctx) logr.Info("Reconciling NodeVersion") var desiredNodeVersion updatev1alpha1.NodeVersion if err := r.Get(ctx, req.NamespacedName, &desiredNodeVersion); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } // Check if we need to upgrade the cluster version. serverVer, err := r.ServerVersion() if err != nil { return ctrl.Result{}, err } // GitVersion is the semantic version of the Kubernetes server e.g. "v1.24.9" if semver.Compare(serverVer.GitVersion, desiredNodeVersion.Spec.KubernetesClusterVersion) != 0 { r.tryStartClusterVersionUpgrade(ctx, req.NamespacedName) } // get list of autoscaling strategies // there should be exactly one autoscaling strategy but we do not specify its name. // if there is no autoscaling strategy, it is assumed that autoscaling is disabled. var autoscalingStrategiesList updatev1alpha1.AutoscalingStrategyList if err := r.List(ctx, &autoscalingStrategiesList); err != nil { return ctrl.Result{}, err } var autoscalingEnabled bool for _, autoscalingStrategy := range autoscalingStrategiesList.Items { if autoscalingStrategy.Status.Enabled { autoscalingEnabled = true break } } // get list of all nodes var nodeList corev1.NodeList if err := r.List(ctx, &nodeList); err != nil { logr.Error(err, "Unable to list nodes") return ctrl.Result{}, err } // get list of all pending nodes var pendingNodeList updatev1alpha1.PendingNodeList if err := r.List(ctx, &pendingNodeList, client.InNamespace(req.Namespace)); err != nil { logr.Error(err, "Unable to list pending nodes") return ctrl.Result{}, err } // get list of all scaling groups var scalingGroupList updatev1alpha1.ScalingGroupList if err := r.List(ctx, &scalingGroupList, client.InNamespace(req.Namespace)); err != nil { logr.Error(err, "Unable to list scaling groups") return ctrl.Result{}, err } scalingGroupByID := make(map[string]updatev1alpha1.ScalingGroup, len(scalingGroupList.Items)) for _, scalingGroup := range scalingGroupList.Items { scalingGroupByID[strings.ToLower(scalingGroup.Spec.GroupID)] = scalingGroup } annotatedNodes, invalidNodes := r.annotateNodes(ctx, nodeList.Items) groups := groupNodes(annotatedNodes, pendingNodeList.Items, desiredNodeVersion.Spec.ImageReference, desiredNodeVersion.Spec.KubernetesComponentsReference) logr.Info("Grouped nodes", "outdatedNodes", len(groups.Outdated), "upToDateNodes", len(groups.UpToDate), "donorNodes", len(groups.Donors), "heirNodes", len(groups.Heirs), "mintNodes", len(groups.Mint), "pendingNodes", len(pendingNodeList.Items), "awaitingAnnotationNodes", len(groups.AwaitingAnnotation), "obsoleteNodes", len(groups.Obsolete), "invalidNodes", len(invalidNodes)) // extraNodes are nodes that exist in the scaling group which cannot be used for regular workloads. // consists of nodes that are // - being created (joining) // - being destroyed (leaving) // - heirs to outdated nodes extraNodes := len(groups.Heirs) + len(groups.AwaitingAnnotation) + len(pendingNodeList.Items) // newNodesBudget is the maximum number of new nodes that can be created in this Reconcile call. var newNodesBudget int if extraNodes < nodeOverprovisionLimit { newNodesBudget = nodeOverprovisionLimit - extraNodes } logr.Info("Budget for new nodes", "newNodesBudget", newNodesBudget) status := nodeVersionStatus(r.Scheme, groups, pendingNodeList.Items, invalidNodes, newNodesBudget) if err := r.tryUpdateStatus(ctx, req.NamespacedName, status); err != nil { logr.Error(err, "Updating status") } allNodesUpToDate := len(groups.Outdated)+len(groups.Heirs)+len(groups.AwaitingAnnotation)+len(pendingNodeList.Items)+len(groups.Obsolete) == 0 if err := r.ensureAutoscaling(ctx, autoscalingEnabled, allNodesUpToDate); err != nil { logr.Error(err, "Ensure autoscaling", "autoscalingEnabledIs", autoscalingEnabled, "autoscalingEnabledWant", allNodesUpToDate) return ctrl.Result{}, err } if allNodesUpToDate { logr.Info("All node versions up to date") return ctrl.Result{}, nil } // should requeue is set if a node is deleted var shouldRequeue bool // find pairs of mint nodes and outdated nodes in the same scaling group to become donor & heir replacementPairs := r.pairDonorsAndHeirs(ctx, &desiredNodeVersion, groups.Outdated, groups.Mint) // extend replacement pairs to include existing pairs of donors and heirs replacementPairs = r.matchDonorsAndHeirs(ctx, replacementPairs, groups.Donors, groups.Heirs) // replace donor nodes by heirs for _, pair := range replacementPairs { logr.Info("Replacing node", "donorNode", pair.donor.Name, "heirNode", pair.heir.Name) done, err := r.replaceNode(ctx, &desiredNodeVersion, pair) if err != nil { logr.Error(err, "Replacing node") return ctrl.Result{}, err } if done { shouldRequeue = true // remove donor annotation from heir if err := r.patchUnsetNodeAnnotations(ctx, pair.heir.Name, []string{donorAnnotation}); err != nil { logr.Error(err, "Unable to remove donor annotation from heir", "heirNode", pair.heir.Name) } } } // only create new nodes if the autoscaler is disabled. // otherwise, new nodes will also be created by the autoscaler if autoscalingEnabled { return ctrl.Result{Requeue: shouldRequeue}, nil } newNodeConfig := newNodeConfig{desiredNodeVersion, groups.Outdated, pendingNodeList.Items, scalingGroupByID, newNodesBudget} if err := r.createNewNodes(ctx, newNodeConfig); err != nil { return ctrl.Result{Requeue: shouldRequeue}, nil } // cleanup obsolete nodes for _, node := range groups.Obsolete { done, err := r.deleteNode(ctx, &desiredNodeVersion, node) if err != nil { logr.Error(err, "Unable to remove obsolete node") } if done { shouldRequeue = true } } return ctrl.Result{Requeue: shouldRequeue}, nil } // SetupWithManager sets up the controller with the Manager. func (r *NodeVersionReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&updatev1alpha1.NodeVersion{}). Watches( &source.Kind{Type: &updatev1alpha1.ScalingGroup{}}, handler.EnqueueRequestsFromMapFunc(r.findObjectsForScalingGroup), builder.WithPredicates(scalingGroupImageChangedPredicate()), ). Watches( &source.Kind{Type: &updatev1alpha1.AutoscalingStrategy{}}, handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions), builder.WithPredicates(autoscalerEnabledStatusChangedPredicate()), ). Watches( &source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions), builder.WithPredicates(nodeReadyPredicate()), ). Watches( &source.Kind{Type: &nodemaintenancev1beta1.NodeMaintenance{}}, handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions), builder.WithPredicates(nodeMaintenanceSucceededPredicate()), ). Watches( &source.Kind{Type: &updatev1alpha1.JoiningNode{}}, handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions), builder.WithPredicates(joiningNodeDeletedPredicate()), ). Owns(&updatev1alpha1.PendingNode{}). Complete(r) } // annotateNodes takes all nodes of the cluster and annotates them with the scaling group they are in and the image they are using. func (r *NodeVersionReconciler) annotateNodes(ctx context.Context, nodes []corev1.Node) (annotatedNodes, invalidNodes []corev1.Node) { logr := log.FromContext(ctx) for _, node := range nodes { annotations := make(map[string]string) if node.Spec.ProviderID == "" { logr.Info("Node is missing providerID", "invalidNode", node.Name) invalidNodes = append(invalidNodes, node) continue } if _, ok := node.Annotations[scalingGroupAnnotation]; !ok { scalingGroupID, err := r.nodeReplacer.GetScalingGroupID(ctx, node.Spec.ProviderID) if err != nil { logr.Error(err, "Unable to get node scaling group") invalidNodes = append(invalidNodes, node) continue } annotations[scalingGroupAnnotation] = scalingGroupID } if _, ok := node.Annotations[nodeImageAnnotation]; !ok { nodeImage, err := r.nodeReplacer.GetNodeImage(ctx, node.Spec.ProviderID) if err != nil { logr.Error(err, "Unable to get node image") invalidNodes = append(invalidNodes, node) continue } annotations[nodeImageAnnotation] = nodeImage } if len(annotations) > 0 { if err := r.patchNodeAnnotations(ctx, node.Name, annotations); err != nil { logr.Error(err, "Unable to patch node annotations") invalidNodes = append(invalidNodes, node) continue } if err := r.Get(ctx, types.NamespacedName{Name: node.Name}, &node); err != nil { logr.Error(err, "Unable to get patched node") invalidNodes = append(invalidNodes, node) continue } } annotatedNodes = append(annotatedNodes, node) } return annotatedNodes, invalidNodes } func (r *NodeVersionReconciler) tryStartClusterVersionUpgrade(ctx context.Context, nodeVersionName types.NamespacedName) { // try to set the cluster version upgrade status to "in progress" // lock the node version for cluster upgrades if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { nodeVersion := &updatev1alpha1.NodeVersion{} if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to get node version") return err } if nodeVersion.Status.ActiveClusterVersionUpgrade { return errors.New("cluster version upgrade already in progress") } nodeVersion.Status.ActiveClusterVersionUpgrade = true if err := r.Status().Update(ctx, nodeVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to update node version status") return err } return nil }); err != nil { return } // get clusterKubernetesVersion from nodeVersion nodeVersion := &updatev1alpha1.NodeVersion{} if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to get node version") return } // get components configmap componentsConfigMap := &corev1.ConfigMap{} if err := r.Get(ctx, types.NamespacedName{Name: nodeVersion.Spec.KubernetesComponentsReference, Namespace: "kube-system"}, componentsConfigMap); err != nil { log.FromContext(ctx).Error(err, "Unable to get components configmap") return } // unmarshal components from configmap componentsRaw := componentsConfigMap.Data[mainconstants.ComponentsListKey] var clusterComponents components.Components if err := json.Unmarshal([]byte(componentsRaw), &clusterComponents); err != nil { log.FromContext(ctx).Error(err, "Unable to unmarshal components") return } log.FromContext(ctx).Info("Starting cluster upgrade", "clusterVersion", nodeVersion.Spec.KubernetesClusterVersion) kubeadm, err := clusterComponents.GetKubeadmComponent() if err != nil { log.FromContext(ctx).Error(err, "Unable to get kubeadm component") return } // talk to the upgrade-agent to start the upgrade if err := r.Upgrade(ctx, kubeadm.URL, kubeadm.Hash, nodeVersion.Spec.KubernetesClusterVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to upgrade cluster") return } // set the cluster version upgrade status to "completed" if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { nodeVersion := &updatev1alpha1.NodeVersion{} if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to get node version") return err } nodeVersion.Status.ActiveClusterVersionUpgrade = false if err := r.Status().Update(ctx, nodeVersion); err != nil { log.FromContext(ctx).Error(err, "Unable to update node version status") return err } return nil }); err != nil { log.FromContext(ctx).Error(err, "Unable to set cluster version upgrade status to completed") return } } // pairDonorsAndHeirs takes a list of outdated nodes (that do not yet have a heir node) and a list of mint nodes (nodes using the latest image) and pairs matching nodes to become donor and heir. // outdatedNodes is also updated with heir annotations. func (r *NodeVersionReconciler) pairDonorsAndHeirs(ctx context.Context, controller metav1.Object, outdatedNodes []corev1.Node, mintNodes []mintNode) []replacementPair { logr := log.FromContext(ctx) var pairs []replacementPair for _, mintNode := range mintNodes { var foundReplacement bool // find outdated node in the same group for i := range outdatedNodes { outdatedNode := &outdatedNodes[i] if !strings.EqualFold(outdatedNode.Annotations[scalingGroupAnnotation], mintNode.pendingNode.Spec.ScalingGroupID) || len(outdatedNode.Annotations[heirAnnotation]) != 0 { continue } // mark as donor <-> heir pair and delete "pending node" resource if err := r.patchNodeAnnotations(ctx, mintNode.node.Name, map[string]string{donorAnnotation: outdatedNode.Name}); err != nil { logr.Error(err, "Unable to update mint node donor annotation", "mintNode", mintNode.node.Name) break } if mintNode.node.Annotations == nil { mintNode.node.Annotations = make(map[string]string) } mintNode.node.Annotations[donorAnnotation] = outdatedNode.Name if err := r.patchNodeAnnotations(ctx, outdatedNode.Name, map[string]string{heirAnnotation: mintNode.node.Name}); err != nil { logr.Error(err, "Unable to update outdated node heir annotation", "outdatedNode", outdatedNode.Name) break } outdatedNode.Annotations[heirAnnotation] = mintNode.node.Name if err := r.Delete(ctx, &mintNode.pendingNode); err != nil { logr.Error(err, "Unable to delete pending node resource", "pendingNode", mintNode.pendingNode.Name) break } pairs = append(pairs, replacementPair{ donor: *outdatedNode, heir: mintNode.node, }) logr.Info("New matched up pair", "donorNode", outdatedNode.Name, "heirNode", mintNode.node.Name) foundReplacement = true break } if !foundReplacement { logr.Info("No replacement found for mint node. Marking as outdated.", "mintNode", mintNode.node.Name, "scalingGroupID", mintNode.pendingNode.Spec.ScalingGroupID) // mint node was not needed as heir. Cleanup obsolete resources. if err := r.Delete(ctx, &mintNode.pendingNode); err != nil { logr.Error(err, "Unable to delete pending node resource", "pendingNode", mintNode.pendingNode.Name) break } if err := r.patchNodeAnnotations(ctx, mintNode.node.Name, map[string]string{obsoleteAnnotation: "true"}); err != nil { logr.Error(err, "Unable to update mint node obsolete annotation", "mintNode", mintNode.node.Name) break } if _, err := r.deleteNode(ctx, controller, mintNode.node); err != nil { logr.Error(err, "Unable to delete obsolete node", "obsoleteNode", mintNode.node.Name) break } } } return pairs } // matchDonorsAndHeirs takes separate lists of donors and heirs and matches each heir to its previously chosen donor. // a list of replacement pairs is returned. // donors and heirs with invalid pair references are cleaned up (the donor/heir annotations gets removed). func (r *NodeVersionReconciler) matchDonorsAndHeirs(ctx context.Context, pairs []replacementPair, donors, heirs []corev1.Node) []replacementPair { logr := log.FromContext(ctx) for _, heir := range heirs { var foundPair bool for _, donor := range donors { if heir.Annotations[donorAnnotation] == donor.Name { pairs = append(pairs, replacementPair{ donor: donor, heir: heir, }) foundPair = true break } } if !foundPair { // remove donor annotation from heir if err := r.patchUnsetNodeAnnotations(ctx, heir.Name, []string{donorAnnotation}); err != nil { logr.Error(err, "Unable to remove donor annotation from heir", "heirNode", heir.Name) } delete(heir.Annotations, donorAnnotation) } } // iterate over all donors and remove donor annotation from nodes that are not in a pair // (cleanup) for _, donor := range donors { var foundPair bool for _, pair := range pairs { if pair.donor.Name == donor.Name { foundPair = true break } } if !foundPair { // remove heir annotation from donor if err := r.patchUnsetNodeAnnotations(ctx, donor.Name, []string{heirAnnotation}); err != nil { logr.Error(err, "Unable to remove heir annotation from donor", "donorNode", donor.Name) } delete(donor.Annotations, heirAnnotation) } } return pairs } // ensureAutoscaling will ensure that the autoscaling is enabled or disabled as needed. func (r *NodeVersionReconciler) ensureAutoscaling(ctx context.Context, autoscalingEnabled bool, wantAutoscalingEnabled bool) error { if autoscalingEnabled == wantAutoscalingEnabled { return nil } var autoscalingStrategiesList updatev1alpha1.AutoscalingStrategyList if err := r.List(ctx, &autoscalingStrategiesList); err != nil { return err } for i := range autoscalingStrategiesList.Items { if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { var autoscalingStrategy updatev1alpha1.AutoscalingStrategy if err := r.Get(ctx, types.NamespacedName{Name: autoscalingStrategiesList.Items[i].Name}, &autoscalingStrategy); err != nil { return err } autoscalingStrategy.Spec.Enabled = wantAutoscalingEnabled return r.Client.Update(ctx, &autoscalingStrategy) }); err != nil { return err } } return nil } // replaceNode take a donor and a heir node and then replaces the donor node by the heir node. // // Replacing nodes involves the following steps: // Labels are copied from the donor node to the heir node. // Readiness of the heir node is awaited. // Deletion of the donor node is scheduled. func (r *NodeVersionReconciler) replaceNode(ctx context.Context, controller metav1.Object, pair replacementPair) (bool, error) { logr := log.FromContext(ctx) if !reflect.DeepEqual(nodeutil.FilterLabels(pair.donor.Labels), nodeutil.FilterLabels(pair.heir.Labels)) { if err := r.copyNodeLabels(ctx, pair.donor.Name, pair.heir.Name); err != nil { logr.Error(err, "Copy node labels") return false, err } } heirReady := nodeutil.Ready(&pair.heir) if !heirReady { return false, nil } return r.deleteNode(ctx, controller, pair.donor) } // deleteNode safely removes a node from the cluster and issues termination of the node by the CSP. func (r *NodeVersionReconciler) deleteNode(ctx context.Context, controller metav1.Object, node corev1.Node) (bool, error) { logr := log.FromContext(ctx) // cordon & drain node using node-maintenance-operator var foundNodeMaintenance nodemaintenancev1beta1.NodeMaintenance err := r.Get(ctx, types.NamespacedName{Name: node.Name}, &foundNodeMaintenance) if client.IgnoreNotFound(err) != nil { // unexpected error occurred return false, err } if err != nil { // NodeMaintenance resource does not exist yet nodeMaintenance := nodemaintenancev1beta1.NodeMaintenance{ ObjectMeta: metav1.ObjectMeta{ Name: node.Name, }, Spec: nodemaintenancev1beta1.NodeMaintenanceSpec{ NodeName: node.Name, Reason: "node is replaced due to OS image update", }, } return false, r.Create(ctx, &nodeMaintenance) } // NodeMaintenance resource already exists. Check cordon & drain status. if foundNodeMaintenance.Status.Phase != nodemaintenancev1beta1.MaintenanceSucceeded { logr.Info("Cordon & drain in progress", "maintenanceNode", node.Name, "nodeMaintenanceStatus", foundNodeMaintenance.Status.Phase) return false, nil } // node is unused & ready to be replaced if nodeutil.IsControlPlaneNode(&node) { nodeVPCIP, err := nodeutil.VPCIP(&node) if err != nil { logr.Error(err, "Unable to get node VPC IP") return false, err } if err := r.RemoveEtcdMemberFromCluster(ctx, nodeVPCIP); err != nil { logr.Error(err, "Unable to remove etcd member from cluster") return false, err } } if err := r.Delete(ctx, &node); err != nil { logr.Error(err, "Deleting node") return false, err } logr.Info("Deleted node", "deletedNode", node.Name) // schedule deletion of the node with the CSP if err := r.DeleteNode(ctx, node.Spec.ProviderID); err != nil { logr.Error(err, "Scheduling CSP node deletion", "providerID", node.Spec.ProviderID) } deadline := metav1.NewTime(time.Now().Add(nodeLeaveTimeout)) pendingNode := updatev1alpha1.PendingNode{ ObjectMeta: metav1.ObjectMeta{ Namespace: controller.GetNamespace(), Name: node.Name, }, Spec: updatev1alpha1.PendingNodeSpec{ ProviderID: node.Spec.ProviderID, ScalingGroupID: node.Annotations[scalingGroupAnnotation], NodeName: node.Name, Goal: updatev1alpha1.NodeGoalLeave, Deadline: &deadline, }, } if err := ctrl.SetControllerReference(controller, &pendingNode, r.Scheme); err != nil { return false, err } if err := r.Create(ctx, &pendingNode); err != nil { logr.Error(err, "Tracking CSP node deletion") } return true, nil } // createNewNodes creates new nodes using up to date images as replacement for outdated nodes. func (r *NodeVersionReconciler) createNewNodes(ctx context.Context, config newNodeConfig) error { logr := log.FromContext(ctx) if config.newNodesBudget < 1 || len(config.outdatedNodes) == 0 { return nil } outdatedNodesPerScalingGroup := make(map[string]int) for _, node := range config.outdatedNodes { // skip outdated nodes that got assigned an heir in this Reconcile call if len(node.Annotations[heirAnnotation]) != 0 { continue } outdatedNodesPerScalingGroup[strings.ToLower(node.Annotations[scalingGroupAnnotation])]++ } pendingJoiningNodesPerScalingGroup := make(map[string]int) for _, pendingNode := range config.pendingNodes { // skip pending nodes that are not joining if pendingNode.Spec.Goal != updatev1alpha1.NodeGoalJoin { continue } pendingJoiningNodesPerScalingGroup[strings.ToLower(pendingNode.Spec.ScalingGroupID)]++ } requiredNodesPerScalingGroup := make(map[string]int, len(outdatedNodesPerScalingGroup)) for scalingGroupID := range outdatedNodesPerScalingGroup { scalingGroupID := strings.ToLower(scalingGroupID) if pendingJoiningNodesPerScalingGroup[scalingGroupID] < outdatedNodesPerScalingGroup[scalingGroupID] { requiredNodesPerScalingGroup[scalingGroupID] = outdatedNodesPerScalingGroup[scalingGroupID] - pendingJoiningNodesPerScalingGroup[scalingGroupID] } } for scalingGroupID := range requiredNodesPerScalingGroup { scalingGroup, ok := config.scalingGroupByID[scalingGroupID] if !ok { logr.Info("Scaling group does not have matching resource", "scalingGroup", scalingGroupID, "scalingGroups", config.scalingGroupByID) continue } if !strings.EqualFold(scalingGroup.Status.ImageReference, config.desiredNodeVersion.Spec.ImageReference) { logr.Info("Scaling group does not use latest image", "scalingGroup", scalingGroupID, "usedImage", scalingGroup.Status.ImageReference, "wantedImage", config.desiredNodeVersion.Spec.ImageReference) continue } if requiredNodesPerScalingGroup[scalingGroupID] == 0 { continue } for { if config.newNodesBudget == 0 { return nil } if requiredNodesPerScalingGroup[scalingGroupID] == 0 { break } logr.Info("Creating new node", "scalingGroup", scalingGroupID) nodeName, providerID, err := r.CreateNode(ctx, scalingGroup.Spec.GroupID) if err != nil { return err } deadline := metav1.NewTime(time.Now().Add(nodeJoinTimeout)) pendingNode := &updatev1alpha1.PendingNode{ ObjectMeta: metav1.ObjectMeta{Name: nodeName}, Spec: updatev1alpha1.PendingNodeSpec{ ProviderID: providerID, ScalingGroupID: scalingGroup.Spec.GroupID, NodeName: nodeName, Goal: updatev1alpha1.NodeGoalJoin, Deadline: &deadline, }, } if err := ctrl.SetControllerReference(&config.desiredNodeVersion, pendingNode, r.Scheme); err != nil { return err } if err := r.Create(ctx, pendingNode); err != nil { return err } logr.Info("Created new node", "createdNode", nodeName, "scalingGroup", scalingGroupID) requiredNodesPerScalingGroup[scalingGroupID]-- config.newNodesBudget-- } } return nil } // patchNodeAnnotations attempts to patch node annotations in a retry loop. func (r *NodeVersionReconciler) patchNodeAnnotations(ctx context.Context, nodeName string, annotations map[string]string) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { var node corev1.Node if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil { return err } patchedNode := node.DeepCopy() patch := patch.SetAnnotations(&node, patchedNode, annotations) return r.Client.Patch(ctx, patchedNode, patch) }) } // patchNodeAnnotations attempts to remove node annotations using a patch in a retry loop. func (r *NodeVersionReconciler) patchUnsetNodeAnnotations(ctx context.Context, nodeName string, annotationKeys []string) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { var node corev1.Node if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil { return err } patchedNode := node.DeepCopy() patch := patch.UnsetAnnotations(&node, patchedNode, annotationKeys) return r.Client.Patch(ctx, patchedNode, patch) }) } // copyNodeLabels attempts to copy all node labels (except for reserved labels) from one node to another in a retry loop. func (r *NodeVersionReconciler) copyNodeLabels(ctx context.Context, oldNodeName, newNodeName string) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { var oldNode corev1.Node if err := r.Get(ctx, types.NamespacedName{Name: oldNodeName}, &oldNode); err != nil { return err } var newNode corev1.Node if err := r.Get(ctx, types.NamespacedName{Name: newNodeName}, &newNode); err != nil { return err } patchedNode := newNode.DeepCopy() patch := patch.SetLabels(&newNode, patchedNode, nodeutil.FilterLabels(oldNode.GetLabels())) return r.Client.Patch(ctx, patchedNode, patch) }) } // tryUpdateStatus attempts to update the NodeVersion status field in a retry loop. func (r *NodeVersionReconciler) tryUpdateStatus(ctx context.Context, name types.NamespacedName, status updatev1alpha1.NodeVersionStatus) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { var nodeVersion updatev1alpha1.NodeVersion if err := r.Get(ctx, name, &nodeVersion); err != nil { return err } nodeVersion.Status = *status.DeepCopy() return r.Status().Update(ctx, &nodeVersion) }) } // nodeVersionStatus generates the NodeVersion.Status field given node groups and the budget for new nodes. func nodeVersionStatus(scheme *runtime.Scheme, groups nodeGroups, pendingNodes []updatev1alpha1.PendingNode, invalidNodes []corev1.Node, newNodesBudget int) updatev1alpha1.NodeVersionStatus { var status updatev1alpha1.NodeVersionStatus outdatedCondition := metav1.Condition{ Type: updatev1alpha1.ConditionOutdated, } if len(groups.Outdated)+len(groups.Heirs)+len(pendingNodes)+len(groups.Obsolete) == 0 { outdatedCondition.Status = metav1.ConditionFalse outdatedCondition.Reason = conditionNodeVersionUpToDateReason outdatedCondition.Message = conditionNodeVersionUpToDateMessage } else { outdatedCondition.Status = metav1.ConditionTrue outdatedCondition.Reason = conditionNodeVersionOutOfDateReason outdatedCondition.Message = conditionNodeVersionOutOfDateMessage } meta.SetStatusCondition(&status.Conditions, outdatedCondition) for _, node := range groups.Outdated { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.Outdated = append(status.Outdated, *nodeRef) } for _, node := range groups.UpToDate { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.UpToDate = append(status.UpToDate, *nodeRef) } for _, node := range groups.Donors { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.Donors = append(status.Donors, *nodeRef) } for _, node := range groups.Heirs { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.Heirs = append(status.Heirs, *nodeRef) } for _, node := range groups.AwaitingAnnotation { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.AwaitingAnnotation = append(status.Heirs, *nodeRef) } for _, node := range groups.Obsolete { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.Obsolete = append(status.Obsolete, *nodeRef) } for _, node := range invalidNodes { nodeRef, err := ref.GetReference(scheme, &node) if err != nil { continue } status.Invalid = append(status.Invalid, *nodeRef) } for _, mintN := range groups.Mint { nodeRef, err := ref.GetReference(scheme, &mintN.node) if err != nil { continue } status.Mints = append(status.Mints, *nodeRef) } for _, pending := range pendingNodes { pendingRef, err := ref.GetReference(scheme, &pending) if err != nil { continue } status.Pending = append(status.Pending, *pendingRef) } status.Budget = uint32(newNodesBudget) return status } // mintNode is a pair of a freshly joined kubernetes nodes // and the corresponding (left over) pending node resource. type mintNode struct { node corev1.Node pendingNode updatev1alpha1.PendingNode } // replacementPair is a pair of a donor (outdated node that should be replaced) // and a heir (up to date node that inherits node labels). type replacementPair struct { donor corev1.Node heir corev1.Node } // nodeGroups is a collection of disjoint sets of nodes. // every properly annotated kubernetes node can be placed in exactly one of the sets. type nodeGroups struct { // Outdated nodes are nodes that // do not use the most recent version AND // are not yet a donor to an up to date heir node Outdated, // UpToDate nodes are nodes that // use the most recent version, // are not an heir to an outdated donor node AND // are not mint nodes UpToDate, // Donors are nodes that // do not use the most recent version AND // are paired up with an up to date heir node Donors, // Heirs are nodes that // use the most recent version AND // are paired up with an outdated donor node Heirs, // AwaitingAnnotation nodes are nodes that // are missing annotations. AwaitingAnnotation, // Obsolete nodes are nodes that // were created by the operator as replacements (heirs) // but could not get paired up with a donor node. // They will be cleaned up by the operator. Obsolete []corev1.Node // Mint nodes are nodes that // use the most recent version AND // were created by the operator as replacements (heirs) // and are awaiting pairing up with a donor node. Mint []mintNode } // groupNodes classifies nodes by placing each into exactly one group. func groupNodes(nodes []corev1.Node, pendingNodes []updatev1alpha1.PendingNode, latestImageReference string, latestK8sComponentsReference string) nodeGroups { groups := nodeGroups{} for _, node := range nodes { if node.Annotations[obsoleteAnnotation] == "true" { groups.Obsolete = append(groups.Obsolete, node) continue } if node.Annotations[nodeImageAnnotation] == "" || node.Annotations[mainconstants.NodeKubernetesComponentsAnnotationKey] == "" { groups.AwaitingAnnotation = append(groups.AwaitingAnnotation, node) continue } if !strings.EqualFold(node.Annotations[nodeImageAnnotation], latestImageReference) || !strings.EqualFold(node.Annotations[mainconstants.NodeKubernetesComponentsAnnotationKey], latestK8sComponentsReference) { if heir := node.Annotations[heirAnnotation]; heir != "" { groups.Donors = append(groups.Donors, node) } else { groups.Outdated = append(groups.Outdated, node) } continue } if node.Annotations[donorAnnotation] != "" { groups.Heirs = append(groups.Heirs, node) continue } if pendingNode := nodeutil.FindPending(pendingNodes, &node); pendingNode != nil { groups.Mint = append(groups.Mint, mintNode{ node: node, pendingNode: *pendingNode, }) continue } groups.UpToDate = append(groups.UpToDate, node) } return groups } type nodeReplacer interface { // GetNodeImage retrieves the image currently used by a node. GetNodeImage(ctx context.Context, providerID string) (string, error) // GetScalingGroupID retrieves the scaling group that a node is part of. GetScalingGroupID(ctx context.Context, providerID string) (string, error) // CreateNode creates a new node inside a specified scaling group at the CSP and returns its future name and provider id. CreateNode(ctx context.Context, scalingGroupID string) (nodeName, providerID string, err error) // DeleteNode starts the termination of the node at the CSP. DeleteNode(ctx context.Context, providerID string) error } type etcdRemover interface { // RemoveEtcdMemberFromCluster removes an etcd member from the cluster. RemoveEtcdMemberFromCluster(ctx context.Context, vpcIP string) error } type clusterUpgrader interface { // UpgradeCluster upgrades the cluster to the specified version. Upgrade(ctx context.Context, KubeadmURL, KubeadmHash, WantedKubernetesVersion string) error } type kubernetesServerVersionGetter interface { ServerVersion() (*version.Info, error) } type newNodeConfig struct { desiredNodeVersion updatev1alpha1.NodeVersion outdatedNodes []corev1.Node pendingNodes []updatev1alpha1.PendingNode scalingGroupByID map[string]updatev1alpha1.ScalingGroup newNodesBudget int }