Mirror of https://github.com/edgelesssys/constellation.git
[node operator] NodeImage controller impl
Signed-off-by: Malte Poll <mp@edgeless.systems>
This commit is contained in:
parent 7b6205e900
commit 12ce267bac

3 changed files with 1137 additions and 11 deletions
@@ -3,38 +3,183 @@ package controllers
import (
    "context"
    "reflect"
    "time"

    nodeutil "github.com/edgelesssys/constellation/operators/constellation-node-operator/internal/node"
    "github.com/edgelesssys/constellation/operators/constellation-node-operator/internal/patch"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ref "k8s.io/client-go/tools/reference"
    "k8s.io/client-go/util/retry"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/source"

    updatev1alpha1 "github.com/edgelesssys/constellation/operators/constellation-node-operator/api/v1alpha1"
    nodemaintenancev1beta1 "github.com/medik8s/node-maintenance-operator/api/v1beta1"
)

const (
    // nodeOverprovisionLimit is the maximum number of extra nodes created during the update procedure at any point in time.
    nodeOverprovisionLimit = 2
    // nodeJoinTimeout is the time limit pending nodes have to join the cluster before being terminated.
    nodeJoinTimeout = time.Minute * 15
    // nodeLeaveTimeout is the time limit pending nodes have to leave the cluster and be terminated.
    nodeLeaveTimeout = time.Minute

    donorAnnotation        = "constellation.edgeless.systems/donor"
    heirAnnotation         = "constellation.edgeless.systems/heir"
    scalingGroupAnnotation = "constellation.edgeless.systems/scaling-group-id"
    nodeImageAnnotation    = "constellation.edgeless.systems/node-image"
    obsoleteAnnotation     = "constellation.edgeless.systems/obsolete"

    conditionNodeImageUpToDateReason   = "NodeImagesUpToDate"
    conditionNodeImageUpToDateMessage  = "Node image of every node is up to date"
    conditionNodeImageOutOfDateReason  = "NodeImagesOutOfDate"
    conditionNodeImageOutOfDateMessage = "Some node images are out of date"
)

// NodeImageReconciler reconciles a NodeImage object.
type NodeImageReconciler struct {
    nodeReplacer
    client.Client
    Scheme *runtime.Scheme
}

// NewNodeImageReconciler creates a new NodeImageReconciler.
func NewNodeImageReconciler(nodeReplacer nodeReplacer, client client.Client, scheme *runtime.Scheme) *NodeImageReconciler {
    return &NodeImageReconciler{
        nodeReplacer: nodeReplacer,
        Client:       client,
        Scheme:       scheme,
    }
}
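
For orientation, a minimal wiring sketch of how this constructor might be registered with a controller-runtime manager. This is not part of the commit; `scheme`, `setupLog`, and `cloudClient` are hypothetical placeholders, with `cloudClient` standing in for any concrete nodeReplacer implementation:

    // Hypothetical wiring sketch (not part of this commit).
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }
    // cloudClient must satisfy the nodeReplacer interface defined below.
    if err := controllers.NewNodeImageReconciler(
        cloudClient, mgr.GetClient(), mgr.GetScheme(),
    ).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "NodeImage")
        os.Exit(1)
    }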

//+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeimages,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeimages/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=update.edgeless.systems,resources=nodeimages/finalizers,verbs=update

// Reconcile replaces outdated nodes (using an old image) with new nodes (using a new image) as specified in the NodeImage spec.
func (r *NodeImageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logr := log.FromContext(ctx)
    logr.Info("Reconciling NodeImage")

    var desiredNodeImage updatev1alpha1.NodeImage
    if err := r.Get(ctx, req.NamespacedName, &desiredNodeImage); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    // Get the list of autoscaling strategies.
    // There should be exactly one autoscaling strategy, but we do not specify its name.
    // If there is no autoscaling strategy, it is assumed that autoscaling is disabled.
    var autoscalingStrategiesList updatev1alpha1.AutoscalingStrategyList
    if err := r.List(ctx, &autoscalingStrategiesList); err != nil {
        return ctrl.Result{}, err
    }
    var autoscalingEnabled bool
    for _, autoscalingStrategy := range autoscalingStrategiesList.Items {
        if autoscalingStrategy.Status.Enabled {
            autoscalingEnabled = true
            break
        }
    }

    // get list of all nodes
    var nodeList corev1.NodeList
    if err := r.List(ctx, &nodeList); err != nil {
        logr.Error(err, "Unable to list nodes")
        return ctrl.Result{}, err
    }
    // get list of all pending nodes
    var pendingNodeList updatev1alpha1.PendingNodeList
    if err := r.List(ctx, &pendingNodeList, client.InNamespace(req.Namespace)); err != nil {
        logr.Error(err, "Unable to list pending nodes")
        return ctrl.Result{}, err
    }
    // get list of all scaling groups
    var scalingGroupList updatev1alpha1.ScalingGroupList
    if err := r.List(ctx, &scalingGroupList, client.InNamespace(req.Namespace)); err != nil {
        logr.Error(err, "Unable to list scaling groups")
        return ctrl.Result{}, err
    }
    scalingGroupByID := make(map[string]updatev1alpha1.ScalingGroup, len(scalingGroupList.Items))
    for _, scalingGroup := range scalingGroupList.Items {
        scalingGroupByID[scalingGroup.Spec.GroupID] = scalingGroup
    }
    annotatedNodes, invalidNodes := r.annotateNodes(ctx, nodeList.Items)
    groups := groupNodes(annotatedNodes, pendingNodeList.Items, desiredNodeImage.Spec.ImageReference)

    logr.Info("Grouped nodes",
        "outdatedNodes", len(groups.Outdated),
        "upToDateNodes", len(groups.UpToDate),
        "donorNodes", len(groups.Donors),
        "heirNodes", len(groups.Heirs),
        "mintNodes", len(groups.Mint),
        "pendingNodes", len(pendingNodeList.Items),
        "obsoleteNodes", len(groups.Obsolete),
        "invalidNodes", len(invalidNodes))

    // extraNodes are nodes that exist in the scaling group but cannot be used for regular workloads.
    // This consists of nodes that are
    // - being created (joining)
    // - being destroyed (leaving)
    // - heirs to outdated nodes
    extraNodes := len(groups.Heirs) + len(pendingNodeList.Items)
    // newNodesBudget is the maximum number of new nodes that can be created in this Reconcile call.
    var newNodesBudget int
    if extraNodes < nodeOverprovisionLimit {
        newNodesBudget = nodeOverprovisionLimit - extraNodes
    }
    logr.Info("Budget for new nodes", "newNodesBudget", newNodesBudget)

    status := nodeImageStatus(r.Scheme, groups, pendingNodeList.Items, invalidNodes, newNodesBudget)
    if err := r.tryUpdateStatus(ctx, req.NamespacedName, status); err != nil {
        logr.Error(err, "Updating status")
    }

    allNodesUpToDate := len(groups.Outdated)+len(groups.Heirs)+len(pendingNodeList.Items)+len(groups.Obsolete) == 0
    if err := r.ensureAutoscaling(ctx, autoscalingEnabled, allNodesUpToDate); err != nil {
        logr.Error(err, "Ensure autoscaling", "autoscalingEnabledIs", autoscalingEnabled, "autoscalingEnabledWant", allNodesUpToDate)
        return ctrl.Result{}, err
    }

    if allNodesUpToDate {
        logr.Info("All node images up to date")
        return ctrl.Result{}, nil
    }

    // find pairs of mint nodes and outdated nodes in the same scaling group to become donor & heir
    replacementPairs := r.pairDonorsAndHeirs(ctx, &desiredNodeImage, groups.Outdated, groups.Mint)
    // extend replacement pairs to include existing pairs of donors and heirs
    replacementPairs = r.matchDonorsAndHeirs(ctx, replacementPairs, groups.Donors, groups.Heirs)
    // replace donor nodes by their heirs
    for _, pair := range replacementPairs {
        logr.Info("Replacing node", "donorNode", pair.donor.Name, "heirNode", pair.heir.Name)
        if err := r.replaceNode(ctx, &desiredNodeImage, pair); err != nil {
            logr.Error(err, "Replacing node")
            return ctrl.Result{}, err
        }
    }

    // Only create new nodes if the autoscaler is disabled;
    // otherwise, new nodes will also be created by the autoscaler.
    if autoscalingEnabled {
        return ctrl.Result{}, nil
    }

    if err := r.createNewNodes(ctx, desiredNodeImage, groups.Outdated, pendingNodeList.Items, scalingGroupByID, newNodesBudget); err != nil {
        return ctrl.Result{}, err
    }
    // clean up obsolete nodes
    for _, node := range groups.Obsolete {
        if _, err := r.deleteNode(ctx, &desiredNodeImage, node); err != nil {
            logr.Error(err, "Unable to remove obsolete node")
        }
    }

    return ctrl.Result{}, nil
}
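
A worked example of the overprovisioning budget computed above, using hypothetical counts:

    // With nodeOverprovisionLimit = 2, one heir node plus one pending node
    // already count as two extra nodes, so no budget remains this pass.
    extraNodes := 1 /* heirs */ + 1 /* pending */
    var newNodesBudget int
    if extraNodes < nodeOverprovisionLimit { // 2 < 2 is false
        newNodesBudget = nodeOverprovisionLimit - extraNodes
    }
    // newNodesBudget == 0: createNewNodes returns early until capacity frees up.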
@@ -43,5 +188,590 @@ func (r *NodeImageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
func (r *NodeImageReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&updatev1alpha1.NodeImage{}).
        Watches(
            &source.Kind{Type: &updatev1alpha1.ScalingGroup{}},
            handler.EnqueueRequestsFromMapFunc(r.findObjectsForScalingGroup),
            builder.WithPredicates(scalingGroupImageChangedPredicate()),
        ).
        Watches(
            &source.Kind{Type: &updatev1alpha1.AutoscalingStrategy{}},
            handler.EnqueueRequestsFromMapFunc(r.findAllNodeImages),
            builder.WithPredicates(autoscalerEnabledStatusChangedPredicate()),
        ).
        Watches(
            &source.Kind{Type: &corev1.Node{}},
            handler.EnqueueRequestsFromMapFunc(r.findAllNodeImages),
            builder.WithPredicates(nodeReadyPredicate()),
        ).
        Watches(
            &source.Kind{Type: &nodemaintenancev1beta1.NodeMaintenance{}},
            handler.EnqueueRequestsFromMapFunc(r.findAllNodeImages),
            builder.WithPredicates(nodeMaintenanceSucceededPredicate()),
        ).
        Owns(&updatev1alpha1.PendingNode{}).
        Complete(r)
}

// annotateNodes takes all nodes of the cluster and annotates them with the scaling group they are in and the image they are using.
func (r *NodeImageReconciler) annotateNodes(ctx context.Context, nodes []corev1.Node) (annotatedNodes, invalidNodes []corev1.Node) {
    logr := log.FromContext(ctx)
    for _, node := range nodes {
        annotations := make(map[string]string)
        if node.Spec.ProviderID == "" {
            logr.Info("Node is missing providerID", "invalidNode", node.Name)
            invalidNodes = append(invalidNodes, node)
            continue
        }
        if _, ok := node.Annotations[scalingGroupAnnotation]; !ok {
            scalingGroupID, err := r.nodeReplacer.GetScalingGroupID(ctx, node.Spec.ProviderID)
            if err != nil {
                logr.Error(err, "Unable to get node scaling group")
                invalidNodes = append(invalidNodes, node)
                continue
            }
            annotations[scalingGroupAnnotation] = scalingGroupID
        }
        if _, ok := node.Annotations[nodeImageAnnotation]; !ok {
            nodeImage, err := r.nodeReplacer.GetNodeImage(ctx, node.Spec.ProviderID)
            if err != nil {
                logr.Error(err, "Unable to get node image")
                invalidNodes = append(invalidNodes, node)
                continue
            }
            annotations[nodeImageAnnotation] = nodeImage
        }
        if len(annotations) > 0 {
            if err := r.patchNodeAnnotations(ctx, node.Name, annotations); err != nil {
                logr.Error(err, "Unable to patch node annotations")
                invalidNodes = append(invalidNodes, node)
                continue
            }
            if err := r.Get(ctx, types.NamespacedName{Name: node.Name}, &node); err != nil {
                logr.Error(err, "Unable to get patched node")
                invalidNodes = append(invalidNodes, node)
                continue
            }
        }
        annotatedNodes = append(annotatedNodes, node)
    }
    return annotatedNodes, invalidNodes
}
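
As an illustration with hypothetical values: a node that passed annotateNodes carries both operator annotations, e.g.

    node := corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "worker-0",
            Annotations: map[string]string{
                scalingGroupAnnotation: "scaling-group-id",
                nodeImageAnnotation:    "image-reference-v1",
            },
        },
        // a non-empty providerID is required; nodes without one are invalid
        Spec: corev1.NodeSpec{ProviderID: "provider://worker-0"},
    }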

// pairDonorsAndHeirs takes a list of outdated nodes (that do not yet have an heir node) and a list of mint nodes (nodes using the latest image) and pairs matching nodes to become donor and heir.
// outdatedNodes is also updated with heir annotations.
func (r *NodeImageReconciler) pairDonorsAndHeirs(ctx context.Context, controller metav1.Object, outdatedNodes []corev1.Node, mintNodes []mintNode) []replacementPair {
    logr := log.FromContext(ctx)
    var pairs []replacementPair
    for _, mintNode := range mintNodes {
        var foundReplacement bool
        // find an outdated node in the same scaling group
        for i := range outdatedNodes {
            outdatedNode := &outdatedNodes[i]
            if outdatedNode.Annotations[scalingGroupAnnotation] != mintNode.pendingNode.Spec.ScalingGroupID || len(outdatedNode.Annotations[heirAnnotation]) != 0 {
                continue
            }
            // mark as donor <-> heir pair and delete the "pending node" resource
            if err := r.patchNodeAnnotations(ctx, mintNode.node.Name, map[string]string{donorAnnotation: outdatedNode.Name}); err != nil {
                logr.Error(err, "Unable to update mint node donor annotation", "mintNode", mintNode.node.Name)
                break
            }
            if mintNode.node.Annotations == nil {
                mintNode.node.Annotations = make(map[string]string)
            }
            mintNode.node.Annotations[donorAnnotation] = outdatedNode.Name
            if err := r.patchNodeAnnotations(ctx, outdatedNode.Name, map[string]string{heirAnnotation: mintNode.node.Name}); err != nil {
                logr.Error(err, "Unable to update outdated node heir annotation", "outdatedNode", outdatedNode.Name)
                break
            }
            outdatedNode.Annotations[heirAnnotation] = mintNode.node.Name
            if err := r.Delete(ctx, &mintNode.pendingNode); err != nil {
                logr.Error(err, "Unable to delete pending node resource", "pendingNode", mintNode.pendingNode.Name)
                break
            }
            pairs = append(pairs, replacementPair{
                donor: *outdatedNode,
                heir:  mintNode.node,
            })
            logr.Info("New matched up pair", "donorNode", outdatedNode.Name, "heirNode", mintNode.node.Name)
            foundReplacement = true
            break
        }
        if !foundReplacement {
            // mint node was not needed as an heir. Clean up obsolete resources.
            if err := r.Delete(ctx, &mintNode.pendingNode); err != nil {
                logr.Error(err, "Unable to delete pending node resource", "pendingNode", mintNode.pendingNode.Name)
                break
            }
            if err := r.patchNodeAnnotations(ctx, mintNode.node.Name, map[string]string{obsoleteAnnotation: "true"}); err != nil {
                logr.Error(err, "Unable to update mint node obsolete annotation", "mintNode", mintNode.node.Name)
                break
            }
            if _, err := r.deleteNode(ctx, controller, mintNode.node); err != nil {
                logr.Error(err, "Unable to delete obsolete node", "obsoleteNode", mintNode.node.Name)
                break
            }
        }
    }
    return pairs
}

// matchDonorsAndHeirs takes separate lists of donors and heirs and matches each heir to its previously chosen donor.
// A list of replacement pairs is returned.
// Donors and heirs with invalid pair references are cleaned up (the donor/heir annotation gets removed).
func (r *NodeImageReconciler) matchDonorsAndHeirs(ctx context.Context, pairs []replacementPair, donors, heirs []corev1.Node) []replacementPair {
    logr := log.FromContext(ctx)
    for _, heir := range heirs {
        var foundPair bool
        for _, donor := range donors {
            if heir.Annotations[donorAnnotation] == donor.Name {
                pairs = append(pairs, replacementPair{
                    donor: donor,
                    heir:  heir,
                })
                foundPair = true
                break
            }
        }
        if !foundPair {
            // remove donor annotation from heir
            if err := r.patchUnsetNodeAnnotations(ctx, heir.Name, []string{donorAnnotation}); err != nil {
                logr.Error(err, "Unable to remove donor annotation from heir", "heirNode", heir.Name)
            }
            delete(heir.Annotations, donorAnnotation)
        }
    }
    // iterate over all donors and remove the heir annotation from nodes that are not in a pair
    // (cleanup)
    for _, donor := range donors {
        var foundPair bool
        for _, pair := range pairs {
            if pair.donor.Name == donor.Name {
                foundPair = true
                break
            }
        }
        if !foundPair {
            // remove heir annotation from donor
            if err := r.patchUnsetNodeAnnotations(ctx, donor.Name, []string{heirAnnotation}); err != nil {
                logr.Error(err, "Unable to remove heir annotation from donor", "donorNode", donor.Name)
            }
            delete(donor.Annotations, heirAnnotation)
        }
    }
    return pairs
}

// ensureAutoscaling will ensure that autoscaling is enabled or disabled as needed.
func (r *NodeImageReconciler) ensureAutoscaling(ctx context.Context, autoscalingEnabled bool, wantAutoscalingEnabled bool) error {
    if autoscalingEnabled == wantAutoscalingEnabled {
        return nil
    }
    var autoscalingStrategiesList updatev1alpha1.AutoscalingStrategyList
    if err := r.List(ctx, &autoscalingStrategiesList); err != nil {
        return err
    }
    for i := range autoscalingStrategiesList.Items {
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            var autoscalingStrategy updatev1alpha1.AutoscalingStrategy
            if err := r.Get(ctx, types.NamespacedName{Name: autoscalingStrategiesList.Items[i].Name}, &autoscalingStrategy); err != nil {
                return err
            }
            autoscalingStrategy.Spec.Enabled = wantAutoscalingEnabled
            return r.Client.Update(ctx, &autoscalingStrategy)
        }); err != nil {
            return err
        }
    }
    return nil
}

// replaceNode takes a donor and an heir node and replaces the donor node by the heir node.
//
// Replacing nodes involves the following steps:
// Labels are copied from the donor node to the heir node.
// Readiness of the heir node is awaited.
// Deletion of the donor node is scheduled.
func (r *NodeImageReconciler) replaceNode(ctx context.Context, controller metav1.Object, pair replacementPair) error {
    logr := log.FromContext(ctx)
    if !reflect.DeepEqual(nodeutil.FilterLabels(pair.donor.Labels), nodeutil.FilterLabels(pair.heir.Labels)) {
        if err := r.copyNodeLabels(ctx, pair.donor.Name, pair.heir.Name); err != nil {
            logr.Error(err, "Copy node labels")
            return err
        }
    }
    heirReady := nodeutil.Ready(&pair.heir)
    if !heirReady {
        return nil
    }
    _, err := r.deleteNode(ctx, controller, pair.donor)
    return err
}

// deleteNode safely removes a node from the cluster and issues termination of the node by the CSP.
func (r *NodeImageReconciler) deleteNode(ctx context.Context, controller metav1.Object, node corev1.Node) (bool, error) {
    logr := log.FromContext(ctx)
    // cordon & drain the node using the node-maintenance-operator
    var foundNodeMaintenance nodemaintenancev1beta1.NodeMaintenance
    err := r.Get(ctx, types.NamespacedName{Name: node.Name}, &foundNodeMaintenance)
    if client.IgnoreNotFound(err) != nil {
        // unexpected error occurred
        return false, err
    }
    if err != nil {
        // NodeMaintenance resource does not exist yet
        nodeMaintenance := nodemaintenancev1beta1.NodeMaintenance{
            ObjectMeta: metav1.ObjectMeta{
                Name: node.Name,
            },
            Spec: nodemaintenancev1beta1.NodeMaintenanceSpec{
                NodeName: node.Name,
                Reason:   "node is replaced due to OS image update",
            },
        }
        return false, r.Create(ctx, &nodeMaintenance)
    }

    // NodeMaintenance resource already exists. Check cordon & drain status.
    if foundNodeMaintenance.Status.Phase != nodemaintenancev1beta1.MaintenanceSucceeded {
        logr.Info("Cordon & drain in progress", "maintenanceNode", node.Name, "nodeMaintenanceStatus", foundNodeMaintenance.Status.Phase)
        return false, nil
    }

    // node is unused & ready to be replaced
    if err := r.Delete(ctx, &node); err != nil {
        logr.Error(err, "Deleting node")
        return false, err
    }

    logr.Info("Deleted node", "deletedNode", node.Name)
    // schedule deletion of the node with the CSP
    if err := r.DeleteNode(ctx, node.Spec.ProviderID); err != nil {
        logr.Error(err, "Scheduling CSP node deletion", "providerID", node.Spec.ProviderID)
    }
    deadline := metav1.NewTime(time.Now().Add(nodeLeaveTimeout))
    pendingNode := updatev1alpha1.PendingNode{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: controller.GetNamespace(),
            Name:      node.Name,
        },
        Spec: updatev1alpha1.PendingNodeSpec{
            ProviderID:     node.Spec.ProviderID,
            ScalingGroupID: node.Annotations[scalingGroupAnnotation],
            NodeName:       node.Name,
            Goal:           updatev1alpha1.NodeGoalLeave,
            Deadline:       &deadline,
        },
    }
    if err := ctrl.SetControllerReference(controller, &pendingNode, r.Scheme); err != nil {
        return false, err
    }
    if err := r.Create(ctx, &pendingNode); err != nil {
        logr.Error(err, "Tracking CSP node deletion")
    }
    return true, nil
}
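
A usage sketch (hypothetical caller code) of the two-phase behavior: deleteNode typically needs several Reconcile passes per node. The first call creates the NodeMaintenance resource and reports false; once cordon & drain has succeeded, a later call deletes the node and reports true.

    deleted, err := r.deleteNode(ctx, &desiredNodeImage, node)
    if err != nil {
        return err
    }
    if !deleted {
        // not drained yet; the NodeMaintenance watch triggers another Reconcile
        return nil
    }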

// createNewNodes creates new nodes using up-to-date images as replacements for outdated nodes.
func (r *NodeImageReconciler) createNewNodes(
    ctx context.Context, desiredNodeImage updatev1alpha1.NodeImage,
    outdatedNodes []corev1.Node, pendingNodes []updatev1alpha1.PendingNode,
    scalingGroupByID map[string]updatev1alpha1.ScalingGroup, newNodesBudget int,
) error {
    logr := log.FromContext(ctx)
    if newNodesBudget < 1 || len(outdatedNodes) == 0 {
        return nil
    }
    outdatedNodesPerScalingGroup := make(map[string]int)
    for _, node := range outdatedNodes {
        // skip outdated nodes that got assigned an heir in this Reconcile call
        if len(node.Annotations[heirAnnotation]) != 0 {
            continue
        }
        outdatedNodesPerScalingGroup[node.Annotations[scalingGroupAnnotation]]++
    }
    pendingJoiningNodesPerScalingGroup := make(map[string]int)
    for _, pendingNode := range pendingNodes {
        // skip pending nodes that are not joining
        if pendingNode.Spec.Goal != updatev1alpha1.NodeGoalJoin {
            continue
        }
        pendingJoiningNodesPerScalingGroup[pendingNode.Spec.ScalingGroupID]++
    }
    requiredNodesPerScalingGroup := make(map[string]int, len(outdatedNodesPerScalingGroup))
    for scalingGroupID := range outdatedNodesPerScalingGroup {
        if pendingJoiningNodesPerScalingGroup[scalingGroupID] < outdatedNodesPerScalingGroup[scalingGroupID] {
            requiredNodesPerScalingGroup[scalingGroupID] = outdatedNodesPerScalingGroup[scalingGroupID] - pendingJoiningNodesPerScalingGroup[scalingGroupID]
        }
    }
    for scalingGroupID := range requiredNodesPerScalingGroup {
        scalingGroup, ok := scalingGroupByID[scalingGroupID]
        if !ok {
            logr.Info("Scaling group does not have matching resource", "scalingGroup", scalingGroupID)
            continue
        }
        if scalingGroup.Status.ImageReference != desiredNodeImage.Spec.ImageReference {
            logr.Info("Scaling group does not use latest image", "scalingGroup", scalingGroupID, "usedImage", scalingGroup.Status.ImageReference, "wantedImage", desiredNodeImage.Spec.ImageReference)
            continue
        }
        if requiredNodesPerScalingGroup[scalingGroupID] == 0 {
            continue
        }
        for {
            if newNodesBudget == 0 {
                return nil
            }
            if requiredNodesPerScalingGroup[scalingGroupID] == 0 {
                break
            }
            logr.Info("Creating new node", "scalingGroup", scalingGroupID)
            nodeName, providerID, err := r.CreateNode(ctx, scalingGroupID)
            if err != nil {
                return err
            }
            deadline := metav1.NewTime(time.Now().Add(nodeJoinTimeout))
            pendingNode := &updatev1alpha1.PendingNode{
                ObjectMeta: metav1.ObjectMeta{Name: nodeName},
                Spec: updatev1alpha1.PendingNodeSpec{
                    ProviderID:     providerID,
                    ScalingGroupID: scalingGroupID,
                    NodeName:       nodeName,
                    Goal:           updatev1alpha1.NodeGoalJoin,
                    Deadline:       &deadline,
                },
            }
            if err := ctrl.SetControllerReference(&desiredNodeImage, pendingNode, r.Scheme); err != nil {
                return err
            }
            if err := r.Create(ctx, pendingNode); err != nil {
                return err
            }
            logr.Info("Created new node", "createdNode", nodeName, "scalingGroup", scalingGroupID)
            requiredNodesPerScalingGroup[scalingGroupID]--
            newNodesBudget--
        }
    }
    return nil
}

// patchNodeAnnotations attempts to patch node annotations in a retry loop.
func (r *NodeImageReconciler) patchNodeAnnotations(ctx context.Context, nodeName string, annotations map[string]string) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var node corev1.Node
        if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
            return err
        }
        patchedNode := node.DeepCopy()
        patch := patch.SetAnnotations(&node, patchedNode, annotations)
        return r.Client.Patch(ctx, patchedNode, patch)
    })
}

// patchUnsetNodeAnnotations attempts to remove node annotations using a patch in a retry loop.
func (r *NodeImageReconciler) patchUnsetNodeAnnotations(ctx context.Context, nodeName string, annotationKeys []string) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var node corev1.Node
        if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
            return err
        }
        patchedNode := node.DeepCopy()
        patch := patch.UnsetAnnotations(&node, patchedNode, annotationKeys)
        return r.Client.Patch(ctx, patchedNode, patch)
    })
}

// copyNodeLabels attempts to copy all node labels (except for reserved labels) from one node to another in a retry loop.
func (r *NodeImageReconciler) copyNodeLabels(ctx context.Context, oldNodeName, newNodeName string) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var oldNode corev1.Node
        if err := r.Get(ctx, types.NamespacedName{Name: oldNodeName}, &oldNode); err != nil {
            return err
        }
        var newNode corev1.Node
        if err := r.Get(ctx, types.NamespacedName{Name: newNodeName}, &newNode); err != nil {
            return err
        }
        patchedNode := newNode.DeepCopy()
        patch := patch.SetLabels(&newNode, patchedNode, nodeutil.FilterLabels(oldNode.GetLabels()))
        return r.Client.Patch(ctx, patchedNode, patch)
    })
}

// tryUpdateStatus attempts to update the NodeImage status field in a retry loop.
func (r *NodeImageReconciler) tryUpdateStatus(ctx context.Context, name types.NamespacedName, status updatev1alpha1.NodeImageStatus) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var nodeImage updatev1alpha1.NodeImage
        if err := r.Get(ctx, name, &nodeImage); err != nil {
            return err
        }
        nodeImage.Status = *status.DeepCopy()
        if err := r.Status().Update(ctx, &nodeImage); err != nil {
            return err
        }
        return nil
    })
}

// nodeImageStatus generates the NodeImage.Status field given node groups and the budget for new nodes.
func nodeImageStatus(scheme *runtime.Scheme, groups nodeGroups, pendingNodes []updatev1alpha1.PendingNode, invalidNodes []corev1.Node, newNodesBudget int) updatev1alpha1.NodeImageStatus {
    var status updatev1alpha1.NodeImageStatus
    outdatedCondition := metav1.Condition{
        Type: updatev1alpha1.ConditionOutdated,
    }
    if len(groups.Outdated)+len(groups.Heirs)+len(pendingNodes)+len(groups.Obsolete) == 0 {
        outdatedCondition.Status = metav1.ConditionFalse
        outdatedCondition.Reason = conditionNodeImageUpToDateReason
        outdatedCondition.Message = conditionNodeImageUpToDateMessage
    } else {
        outdatedCondition.Status = metav1.ConditionTrue
        outdatedCondition.Reason = conditionNodeImageOutOfDateReason
        outdatedCondition.Message = conditionNodeImageOutOfDateMessage
    }
    meta.SetStatusCondition(&status.Conditions, outdatedCondition)
    for _, node := range groups.Outdated {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.Outdated = append(status.Outdated, *nodeRef)
    }
    for _, node := range groups.UpToDate {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.UpToDate = append(status.UpToDate, *nodeRef)
    }
    for _, node := range groups.Donors {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.Donors = append(status.Donors, *nodeRef)
    }
    for _, node := range groups.Heirs {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.Heirs = append(status.Heirs, *nodeRef)
    }
    for _, node := range groups.Obsolete {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.Obsolete = append(status.Obsolete, *nodeRef)
    }
    for _, node := range invalidNodes {
        nodeRef, err := ref.GetReference(scheme, &node)
        if err != nil {
            continue
        }
        status.Invalid = append(status.Invalid, *nodeRef)
    }
    for _, mintN := range groups.Mint {
        nodeRef, err := ref.GetReference(scheme, &mintN.node)
        if err != nil {
            continue
        }
        status.Mints = append(status.Mints, *nodeRef)
    }
    for _, pending := range pendingNodes {
        pendingRef, err := ref.GetReference(scheme, &pending)
        if err != nil {
            continue
        }
        status.Pending = append(status.Pending, *pendingRef)
    }
    status.Budget = uint32(newNodesBudget)
    return status
}

// mintNode is a pair of a freshly joined kubernetes node
// and the corresponding (leftover) pending node resource.
type mintNode struct {
    node        corev1.Node
    pendingNode updatev1alpha1.PendingNode
}

// replacementPair is a pair of a donor (outdated node that should be replaced)
// and an heir (up-to-date node that inherits the node labels).
type replacementPair struct {
    donor corev1.Node
    heir  corev1.Node
}

// nodeGroups is a collection of disjoint sets of nodes.
// Every properly annotated kubernetes node can be placed in exactly one of the sets.
type nodeGroups struct {
    // Outdated nodes are nodes that
    // do not use the most recent image AND
    // are not yet a donor to an up to date heir node
    Outdated,
    // UpToDate nodes are nodes that
    // use the most recent image,
    // are not an heir to an outdated donor node AND
    // are not mint nodes
    UpToDate,
    // Donors are nodes that
    // do not use the most recent image AND
    // are paired up with an up to date heir node
    Donors,
    // Heirs are nodes that
    // use the most recent image AND
    // are paired up with an outdated donor node
    Heirs,
    // Obsolete nodes are nodes that
    // were created by the operator as replacements (heirs)
    // but could not get paired up with a donor node.
    // They will be cleaned up by the operator.
    Obsolete []corev1.Node
    // Mint nodes are nodes that
    // use the most recent image AND
    // were created by the operator as replacements (heirs)
    // and are awaiting pairing up with a donor node.
    Mint []mintNode
}

// groupNodes classifies nodes by placing each into exactly one group.
func groupNodes(nodes []corev1.Node, pendingNodes []updatev1alpha1.PendingNode, latestImageReference string) nodeGroups {
    groups := nodeGroups{}
    for _, node := range nodes {
        if node.Annotations[obsoleteAnnotation] == "true" {
            groups.Obsolete = append(groups.Obsolete, node)
            continue
        }
        if node.Annotations[nodeImageAnnotation] != latestImageReference {
            if heir := node.Annotations[heirAnnotation]; heir != "" {
                groups.Donors = append(groups.Donors, node)
            } else {
                groups.Outdated = append(groups.Outdated, node)
            }
            continue
        }
        if donor := node.Annotations[donorAnnotation]; donor != "" {
            groups.Heirs = append(groups.Heirs, node)
            continue
        }
        if pendingNode := nodeutil.FindPending(pendingNodes, &node); pendingNode != nil {
            groups.Mint = append(groups.Mint, mintNode{
                node:        node,
                pendingNode: *pendingNode,
            })
            continue
        }
        groups.UpToDate = append(groups.UpToDate, node)
    }
    return groups
}
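
A minimal sketch (hypothetical inputs) of how this classification plays out: a node whose image annotation differs from the latest reference and that has no heir annotation is classified as outdated.

    outdated := corev1.Node{ObjectMeta: metav1.ObjectMeta{
        Name:        "worker-0",
        Annotations: map[string]string{nodeImageAnnotation: "image-v1"},
    }}
    groups := groupNodes([]corev1.Node{outdated}, nil, "image-v2")
    // len(groups.Outdated) == 1; Donors, Heirs, Mint, Obsolete, UpToDate are empty.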

type nodeReplacer interface {
    // GetNodeImage retrieves the image currently used by a node.
    GetNodeImage(ctx context.Context, providerID string) (string, error)
    // GetScalingGroupID retrieves the scaling group that a node is part of.
    GetScalingGroupID(ctx context.Context, providerID string) (string, error)
    // CreateNode creates a new node inside a specified scaling group at the CSP and returns its future name and provider id.
    CreateNode(ctx context.Context, scalingGroupID string) (nodeName, providerID string, err error)
    // DeleteNode starts the termination of the node at the CSP.
    DeleteNode(ctx context.Context, providerID string) error
}
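
A minimal stub implementation of this interface (not part of this commit; a sketch that may be useful for tests or local experiments without a real CSP):

    // stubNodeReplacer is a hypothetical no-op nodeReplacer implementation.
    type stubNodeReplacer struct{}

    func (s *stubNodeReplacer) GetNodeImage(ctx context.Context, providerID string) (string, error) {
        return "stub-image", nil
    }

    func (s *stubNodeReplacer) GetScalingGroupID(ctx context.Context, providerID string) (string, error) {
        return "stub-scaling-group", nil
    }

    func (s *stubNodeReplacer) CreateNode(ctx context.Context, scalingGroupID string) (string, string, error) {
        return "stub-node", "stub-provider-id", nil
    }

    func (s *stubNodeReplacer) DeleteNode(ctx context.Context, providerID string) error {
        return nil
    }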
@@ -0,0 +1,112 @@
package controllers

import (
    "context"

    node "github.com/edgelesssys/constellation/operators/constellation-node-operator/internal/node"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    updatev1alpha1 "github.com/edgelesssys/constellation/operators/constellation-node-operator/api/v1alpha1"
    nodemaintenancev1beta1 "github.com/medik8s/node-maintenance-operator/api/v1beta1"
)

// scalingGroupImageChangedPredicate checks if a scaling group has adopted a new node image for future nodes.
func scalingGroupImageChangedPredicate() predicate.Predicate {
    return predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            oldScalingGroup, ok := e.ObjectOld.(*updatev1alpha1.ScalingGroup)
            if !ok {
                return false
            }
            newScalingGroup, ok := e.ObjectNew.(*updatev1alpha1.ScalingGroup)
            if !ok {
                return false
            }
            return oldScalingGroup.Status.ImageReference != newScalingGroup.Status.ImageReference
        },
    }
}

// autoscalerEnabledStatusChangedPredicate checks if the autoscaler was either enabled or disabled.
func autoscalerEnabledStatusChangedPredicate() predicate.Predicate {
    return predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            oldAutoscalingStrat, ok := e.ObjectOld.(*updatev1alpha1.AutoscalingStrategy)
            if !ok {
                return false
            }
            newAutoscalingStrat, ok := e.ObjectNew.(*updatev1alpha1.AutoscalingStrategy)
            if !ok {
                return false
            }
            return oldAutoscalingStrat.Status.Enabled != newAutoscalingStrat.Status.Enabled
        },
    }
}

// nodeReadyPredicate checks if a node became ready or acquired a providerID.
func nodeReadyPredicate() predicate.Predicate {
    return predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            oldNode, ok := e.ObjectOld.(*corev1.Node)
            if !ok {
                return false
            }
            newNode, ok := e.ObjectNew.(*corev1.Node)
            if !ok {
                return false
            }
            becameReady := !node.Ready(oldNode) && node.Ready(newNode)
            receivedProviderID := len(oldNode.Spec.ProviderID) == 0 && len(newNode.Spec.ProviderID) != 0
            return becameReady || receivedProviderID
        },
    }
}

// nodeMaintenanceSucceededPredicate checks if a node maintenance resource switched its status to "maintenance succeeded".
func nodeMaintenanceSucceededPredicate() predicate.Predicate {
    return predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            oldNode, ok := e.ObjectOld.(*nodemaintenancev1beta1.NodeMaintenance)
            if !ok {
                return false
            }
            newNode, ok := e.ObjectNew.(*nodemaintenancev1beta1.NodeMaintenance)
            if !ok {
                return false
            }
            succeeded := oldNode.Status.Phase != nodemaintenancev1beta1.MaintenanceSucceeded &&
                newNode.Status.Phase == nodemaintenancev1beta1.MaintenanceSucceeded
            return succeeded
        },
    }
}

// findObjectsForScalingGroup requests a reconcile call for the node image referenced by a scaling group.
func (r *NodeImageReconciler) findObjectsForScalingGroup(rawScalingGroup client.Object) []reconcile.Request {
    scalingGroup := rawScalingGroup.(*updatev1alpha1.ScalingGroup)
    return []reconcile.Request{
        {NamespacedName: types.NamespacedName{Name: scalingGroup.Spec.NodeImage}},
    }
}

// findAllNodeImages requests a reconcile call for all node images.
func (r *NodeImageReconciler) findAllNodeImages(_ client.Object) []reconcile.Request {
    var nodeImageList updatev1alpha1.NodeImageList
    err := r.List(context.TODO(), &nodeImageList)
    if err != nil {
        return []reconcile.Request{}
    }
    requests := make([]reconcile.Request, len(nodeImageList.Items))
    for i, item := range nodeImageList.Items {
        requests[i] = reconcile.Request{
            NamespacedName: types.NamespacedName{Name: item.GetName()},
        }
    }
    return requests
}
@@ -0,0 +1,284 @@
package controllers

import (
    "errors"
    "testing"

    "github.com/stretchr/testify/assert"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    updatev1alpha1 "github.com/edgelesssys/constellation/operators/constellation-node-operator/api/v1alpha1"
    nodemaintenancev1beta1 "github.com/medik8s/node-maintenance-operator/api/v1beta1"
)

func TestScalingGroupImageChangedPredicate(t *testing.T) {
    testCases := map[string]struct {
        event          event.UpdateEvent
        wantProcessing bool
    }{
        "old object is not a scaling group": {
            event: event.UpdateEvent{
                ObjectNew: &updatev1alpha1.ScalingGroup{},
            },
        },
        "new object is not a scaling group": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.ScalingGroup{},
            },
        },
        "image reference is unchanged": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.ScalingGroup{
                    Status: updatev1alpha1.ScalingGroupStatus{ImageReference: "image-reference"},
                },
                ObjectNew: &updatev1alpha1.ScalingGroup{
                    Status: updatev1alpha1.ScalingGroupStatus{ImageReference: "image-reference"},
                },
            },
        },
        "image reference has changed": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.ScalingGroup{
                    Status: updatev1alpha1.ScalingGroupStatus{ImageReference: "old-image-reference"},
                },
                ObjectNew: &updatev1alpha1.ScalingGroup{
                    Status: updatev1alpha1.ScalingGroupStatus{ImageReference: "new-image-reference"},
                },
            },
            wantProcessing: true,
        },
    }

    for name, tc := range testCases {
        t.Run(name, func(t *testing.T) {
            assert := assert.New(t)
            predicate := scalingGroupImageChangedPredicate()
            assert.Equal(tc.wantProcessing, predicate.Update(tc.event))
        })
    }
}

func TestAutoscalerEnabledStatusChangedPredicate(t *testing.T) {
    testCases := map[string]struct {
        event          event.UpdateEvent
        wantProcessing bool
    }{
        "old object is not an autoscaling strategy": {
            event: event.UpdateEvent{
                ObjectNew: &updatev1alpha1.AutoscalingStrategy{},
            },
        },
        "new object is not an autoscaling strategy": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.AutoscalingStrategy{},
            },
        },
        "status is unchanged": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.AutoscalingStrategy{
                    Status: updatev1alpha1.AutoscalingStrategyStatus{Enabled: true},
                },
                ObjectNew: &updatev1alpha1.AutoscalingStrategy{
                    Status: updatev1alpha1.AutoscalingStrategyStatus{Enabled: true},
                },
            },
        },
        "status has changed": {
            event: event.UpdateEvent{
                ObjectOld: &updatev1alpha1.AutoscalingStrategy{
                    Status: updatev1alpha1.AutoscalingStrategyStatus{},
                },
                ObjectNew: &updatev1alpha1.AutoscalingStrategy{
                    Status: updatev1alpha1.AutoscalingStrategyStatus{Enabled: true},
                },
            },
            wantProcessing: true,
        },
    }

    for name, tc := range testCases {
        t.Run(name, func(t *testing.T) {
            assert := assert.New(t)
            predicate := autoscalerEnabledStatusChangedPredicate()
            assert.Equal(tc.wantProcessing, predicate.Update(tc.event))
        })
    }
}

func TestNodeReadyPredicate(t *testing.T) {
    testCases := map[string]struct {
        event          event.UpdateEvent
        wantProcessing bool
    }{
        "old object is not a node": {
            event: event.UpdateEvent{
                ObjectNew: &corev1.Node{},
            },
        },
        "new object is not a node": {
            event: event.UpdateEvent{
                ObjectOld: &corev1.Node{},
            },
        },
        "status is unchanged": {
            event: event.UpdateEvent{
                ObjectOld: &corev1.Node{},
                ObjectNew: &corev1.Node{},
            },
        },
        "node became ready": {
            event: event.UpdateEvent{
                ObjectOld: &corev1.Node{
                    Status: corev1.NodeStatus{
                        Conditions: []corev1.NodeCondition{
                            {Type: corev1.NodeReady, Status: corev1.ConditionFalse},
                        },
                    },
                },
                ObjectNew: &corev1.Node{
                    Status: corev1.NodeStatus{
                        Conditions: []corev1.NodeCondition{
                            {Type: corev1.NodeReady, Status: corev1.ConditionTrue},
                        },
                    },
                },
            },
            wantProcessing: true,
        },
        "node acquired provider id": {
            event: event.UpdateEvent{
                ObjectOld: &corev1.Node{},
                ObjectNew: &corev1.Node{
                    Spec: corev1.NodeSpec{
                        ProviderID: "provider-id",
                    },
                },
            },
            wantProcessing: true,
        },
    }

    for name, tc := range testCases {
        t.Run(name, func(t *testing.T) {
            assert := assert.New(t)
            predicate := nodeReadyPredicate()
            assert.Equal(tc.wantProcessing, predicate.Update(tc.event))
        })
    }
}

func TestNodeMaintenanceSucceededPredicate(t *testing.T) {
    testCases := map[string]struct {
        event          event.UpdateEvent
        wantProcessing bool
    }{
        "old object is not a node maintenance resource": {
            event: event.UpdateEvent{
                ObjectNew: &nodemaintenancev1beta1.NodeMaintenance{},
            },
        },
        "new object is not a node maintenance resource": {
            event: event.UpdateEvent{
                ObjectOld: &nodemaintenancev1beta1.NodeMaintenance{},
            },
        },
        "status is unchanged": {
            event: event.UpdateEvent{
                ObjectOld: &nodemaintenancev1beta1.NodeMaintenance{
                    Status: nodemaintenancev1beta1.NodeMaintenanceStatus{
                        Phase: nodemaintenancev1beta1.MaintenanceRunning,
                    },
                },
                ObjectNew: &nodemaintenancev1beta1.NodeMaintenance{
                    Status: nodemaintenancev1beta1.NodeMaintenanceStatus{
                        Phase: nodemaintenancev1beta1.MaintenanceRunning,
                    },
                },
            },
        },
        "status has changed": {
            event: event.UpdateEvent{
                ObjectOld: &nodemaintenancev1beta1.NodeMaintenance{
                    Status: nodemaintenancev1beta1.NodeMaintenanceStatus{
                        Phase: nodemaintenancev1beta1.MaintenanceRunning,
                    },
                },
                ObjectNew: &nodemaintenancev1beta1.NodeMaintenance{
                    Status: nodemaintenancev1beta1.NodeMaintenanceStatus{
                        Phase: nodemaintenancev1beta1.MaintenanceSucceeded,
                    },
                },
            },
            wantProcessing: true,
        },
    }

    for name, tc := range testCases {
        t.Run(name, func(t *testing.T) {
            assert := assert.New(t)
            predicate := nodeMaintenanceSucceededPredicate()
            assert.Equal(tc.wantProcessing, predicate.Update(tc.event))
        })
    }
}

func TestFindObjectsForScalingGroup(t *testing.T) {
    scalingGroup := updatev1alpha1.ScalingGroup{
        Spec: updatev1alpha1.ScalingGroupSpec{
            NodeImage: "nodeimage",
        },
    }
    wantRequests := []reconcile.Request{
        {
            NamespacedName: types.NamespacedName{
                Name: "nodeimage",
            },
        },
    }
    assert := assert.New(t)
    reconciler := NodeImageReconciler{}
    requests := reconciler.findObjectsForScalingGroup(&scalingGroup)
    assert.ElementsMatch(wantRequests, requests)
}

func TestFindAllNodeImages(t *testing.T) {
    testCases := map[string]struct {
        nodeImage         client.Object
        listNodeImagesErr error
        wantRequests      []reconcile.Request
    }{
        "getting the corresponding node images fails": {
            listNodeImagesErr: errors.New("get-node-images-err"),
        },
        "node image reconcile request is returned": {
            nodeImage: &updatev1alpha1.NodeImage{
                ObjectMeta: metav1.ObjectMeta{Name: "nodeimage"},
            },
            wantRequests: []reconcile.Request{
                {
                    NamespacedName: types.NamespacedName{
                        Name: "nodeimage",
                    },
                },
            },
        },
    }

    for name, tc := range testCases {
        t.Run(name, func(t *testing.T) {
            assert := assert.New(t)

            reconciler := NodeImageReconciler{
                Client: newStubReaderClient(t, []runtime.Object{tc.nodeImage}, nil, tc.listNodeImagesErr),
            }
            requests := reconciler.findAllNodeImages(nil)
            assert.ElementsMatch(tc.wantRequests, requests)
        })
    }
}