operator: reconcile kubernetesClusterVersion

This commit is contained in:
Leonard Cohnen 2023-01-06 12:08:25 +01:00 committed by 3u13r
parent 8c5e41b865
commit 2700d5182b
19 changed files with 521 additions and 33 deletions

View file

@ -8,18 +8,23 @@ package controllers
import (
"context"
"encoding/json"
"errors"
"reflect"
"strings"
"time"
mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/versions/components"
nodeutil "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/node"
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/patch"
"golang.org/x/mod/semver"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/version"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
@ -55,17 +60,21 @@ const (
type NodeVersionReconciler struct {
nodeReplacer
etcdRemover
clusterUpgrader
kubernetesServerVersionGetter
client.Client
Scheme *runtime.Scheme
}
// NewNodeVersionReconciler creates a new NodeVersionReconciler.
func NewNodeVersionReconciler(nodeReplacer nodeReplacer, etcdRemover etcdRemover, client client.Client, scheme *runtime.Scheme) *NodeVersionReconciler {
func NewNodeVersionReconciler(nodeReplacer nodeReplacer, etcdRemover etcdRemover, clusterUpgrader clusterUpgrader, k8sVerGetter kubernetesServerVersionGetter, client client.Client, scheme *runtime.Scheme) *NodeVersionReconciler {
return &NodeVersionReconciler{
nodeReplacer: nodeReplacer,
etcdRemover: etcdRemover,
Client: client,
Scheme: scheme,
nodeReplacer: nodeReplacer,
etcdRemover: etcdRemover,
clusterUpgrader: clusterUpgrader,
kubernetesServerVersionGetter: k8sVerGetter,
Client: client,
Scheme: scheme,
}
}
@ -75,6 +84,7 @@ func NewNodeVersionReconciler(nodeReplacer nodeReplacer, etcdRemover etcdRemover
//+kubebuilder:rbac:groups=nodemaintenance.medik8s.io,resources=nodemaintenances,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=nodes/status,verbs=get
//+kubebuilder:rbac:groups="",resources=configmaps,verbs=list;get
// Reconcile replaces outdated nodes (using an old image) with new nodes (using a new image) as specified in the NodeVersion spec.
func (r *NodeVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@ -85,6 +95,17 @@ func (r *NodeVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err := r.Get(ctx, req.NamespacedName, &desiredNodeVersion); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Check if we need to upgrade the cluster version.
serverVer, err := r.ServerVersion()
if err != nil {
return ctrl.Result{}, err
}
// GitVersion is the semantic version of the Kubernetes server e.g. "v1.24.9"
if semver.Compare(serverVer.GitVersion, desiredNodeVersion.Spec.KubernetesClusterVersion) != 0 {
r.tryStartClusterVersionUpgrade(ctx, req.NamespacedName)
}
// get list of autoscaling strategies
// there should be exactly one autoscaling strategy but we do not specify its name.
// if there is no autoscaling strategy, it is assumed that autoscaling is disabled.
@ -140,7 +161,7 @@ func (r *NodeVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// - being created (joining)
// - being destroyed (leaving)
// - heirs to outdated nodes
extraNodes := len(groups.Heirs) + len(pendingNodeList.Items)
extraNodes := len(groups.Heirs) + len(groups.AwaitingAnnotation) + len(pendingNodeList.Items)
// newNodesBudget is the maximum number of new nodes that can be created in this Reconcile call.
var newNodesBudget int
if extraNodes < nodeOverprovisionLimit {
@ -153,7 +174,7 @@ func (r *NodeVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
logr.Error(err, "Updating status")
}
allNodesUpToDate := len(groups.Outdated)+len(groups.Heirs)+len(pendingNodeList.Items)+len(groups.Obsolete) == 0
allNodesUpToDate := len(groups.Outdated)+len(groups.Heirs)+len(groups.AwaitingAnnotation)+len(pendingNodeList.Items)+len(groups.Obsolete) == 0
if err := r.ensureAutoscaling(ctx, autoscalingEnabled, allNodesUpToDate); err != nil {
logr.Error(err, "Ensure autoscaling", "autoscalingEnabledIs", autoscalingEnabled, "autoscalingEnabledWant", allNodesUpToDate)
return ctrl.Result{}, err
@ -235,6 +256,11 @@ func (r *NodeVersionReconciler) SetupWithManager(mgr ctrl.Manager) error {
handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions),
builder.WithPredicates(nodeMaintenanceSucceededPredicate()),
).
Watches(
&source.Kind{Type: &updatev1alpha1.JoiningNode{}},
handler.EnqueueRequestsFromMapFunc(r.findAllNodeVersions),
builder.WithPredicates(joiningNodeDeletedPredicate()),
).
Owns(&updatev1alpha1.PendingNode{}).
Complete(r)
}
@ -284,6 +310,84 @@ func (r *NodeVersionReconciler) annotateNodes(ctx context.Context, nodes []corev
return annotatedNodes, invalidNodes
}
func (r *NodeVersionReconciler) tryStartClusterVersionUpgrade(ctx context.Context, nodeVersionName types.NamespacedName) {
// try to set the cluster version upgrade status to "in progress"
// lock the node version for cluster upgrades
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeVersion := &updatev1alpha1.NodeVersion{}
if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to get node version")
return err
}
if nodeVersion.Status.ActiveClusterVersionUpgrade {
return errors.New("cluster version upgrade already in progress")
}
nodeVersion.Status.ActiveClusterVersionUpgrade = true
if err := r.Status().Update(ctx, nodeVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to update node version status")
return err
}
return nil
}); err != nil {
return
}
// get clusterKubernetesVersion from nodeVersion
nodeVersion := &updatev1alpha1.NodeVersion{}
if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to get node version")
return
}
// get components configmap
componentsConfigMap := &corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Name: nodeVersion.Spec.KubernetesComponentsReference, Namespace: "kube-system"}, componentsConfigMap); err != nil {
log.FromContext(ctx).Error(err, "Unable to get components configmap")
return
}
// unmarshal components from configmap
componentsRaw := componentsConfigMap.Data[mainconstants.ComponentsListKey]
var clusterComponents components.Components
if err := json.Unmarshal([]byte(componentsRaw), &clusterComponents); err != nil {
log.FromContext(ctx).Error(err, "Unable to unmarshal components")
return
}
log.FromContext(ctx).Info("Starting cluster upgrade", "clusterVersion", nodeVersion.Spec.KubernetesClusterVersion)
kubeadm, err := clusterComponents.GetKubeadmComponent()
if err != nil {
log.FromContext(ctx).Error(err, "Unable to get kubeadm component")
return
}
// talk to the upgrade-agent to start the upgrade
if err := r.Upgrade(ctx, kubeadm.URL, kubeadm.Hash, nodeVersion.Spec.KubernetesClusterVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to upgrade cluster")
return
}
// set the cluster version upgrade status to "completed"
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeVersion := &updatev1alpha1.NodeVersion{}
if err := r.Get(ctx, nodeVersionName, nodeVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to get node version")
return err
}
nodeVersion.Status.ActiveClusterVersionUpgrade = false
if err := r.Status().Update(ctx, nodeVersion); err != nil {
log.FromContext(ctx).Error(err, "Unable to update node version status")
return err
}
return nil
}); err != nil {
log.FromContext(ctx).Error(err, "Unable to set cluster version upgrade status to completed")
return
}
}
// pairDonorsAndHeirs takes a list of outdated nodes (that do not yet have a heir node) and a list of mint nodes (nodes using the latest image) and pairs matching nodes to become donor and heir.
// outdatedNodes is also updated with heir annotations.
func (r *NodeVersionReconciler) pairDonorsAndHeirs(ctx context.Context, controller metav1.Object, outdatedNodes []corev1.Node, mintNodes []mintNode) []replacementPair {
@ -690,6 +794,13 @@ func nodeVersionStatus(scheme *runtime.Scheme, groups nodeGroups, pendingNodes [
}
status.Heirs = append(status.Heirs, *nodeRef)
}
for _, node := range groups.AwaitingAnnotation {
nodeRef, err := ref.GetReference(scheme, &node)
if err != nil {
continue
}
status.AwaitingAnnotation = append(status.Heirs, *nodeRef)
}
for _, node := range groups.Obsolete {
nodeRef, err := ref.GetReference(scheme, &node)
if err != nil {
@ -756,6 +867,9 @@ type nodeGroups struct {
// use the most recent version AND
// are paired up with an outdated donor node
Heirs,
// AwaitingAnnotation nodes are nodes that
// are missing annotations.
AwaitingAnnotation,
// Obsolete nodes are nodes that
// were created by the operator as replacements (heirs)
// but could not get paired up with a donor node.
@ -776,6 +890,10 @@ func groupNodes(nodes []corev1.Node, pendingNodes []updatev1alpha1.PendingNode,
groups.Obsolete = append(groups.Obsolete, node)
continue
}
if node.Annotations[nodeImageAnnotation] == "" || node.Annotations[mainconstants.NodeKubernetesComponentsAnnotationKey] == "" {
groups.AwaitingAnnotation = append(groups.AwaitingAnnotation, node)
continue
}
if !strings.EqualFold(node.Annotations[nodeImageAnnotation], latestImageReference) ||
!strings.EqualFold(node.Annotations[mainconstants.NodeKubernetesComponentsAnnotationKey], latestK8sComponentsReference) {
if heir := node.Annotations[heirAnnotation]; heir != "" {
@ -785,7 +903,7 @@ func groupNodes(nodes []corev1.Node, pendingNodes []updatev1alpha1.PendingNode,
}
continue
}
if donor := node.Annotations[donorAnnotation]; donor != "" {
if node.Annotations[donorAnnotation] != "" {
groups.Heirs = append(groups.Heirs, node)
continue
}
@ -817,6 +935,15 @@ type etcdRemover interface {
RemoveEtcdMemberFromCluster(ctx context.Context, vpcIP string) error
}
type clusterUpgrader interface {
// UpgradeCluster upgrades the cluster to the specified version.
Upgrade(ctx context.Context, KubeadmURL, KubeadmHash, WantedKubernetesVersion string) error
}
type kubernetesServerVersionGetter interface {
ServerVersion() (*version.Info, error)
}
type newNodeConfig struct {
desiredNodeVersion updatev1alpha1.NodeVersion
outdatedNodes []corev1.Node