join: synchronize control plane joining (#776)

* join: synchronize control plane joining
3u13r 2022-12-09 18:30:20 +01:00 committed by GitHub
parent 012f739c67
commit c993cd6800
34 changed files with 1166 additions and 61 deletions

View file

@@ -18,6 +18,7 @@ testbin/*
# We hold the charts in the cli/internal/helm directory
chart/
bundle/
!vendor/**/zz_generated.*

View file

@@ -16,6 +16,10 @@ type JoiningNodeSpec struct {
Name string `json:"name,omitempty"`
// ComponentsHash is the hash of the components that were sent to the node by the join service.
ComponentsHash string `json:"componentshash,omitempty"`
// IsControlPlane is true if the node is a control plane node.
IsControlPlane bool `json:"iscontrolplane,omitempty"`
// Deadline is the time after which the joining node is considered to have failed.
Deadline *metav1.Time `json:"deadline,omitempty"`
}
// JoiningNodeStatus defines the observed state of JoiningNode.
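
For illustration only: a minimal Go sketch of a JoiningNode object that uses the two new fields. The helper name, the five-minute deadline, and the main function are assumptions for this example; the API group, kind, and spec fields are taken from the hunk above.

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	updatev1alpha1 "github.com/edgelesssys/constellation/operators/constellation-node-operator/v2/api/v1alpha1"
)

// newJoiningNode builds a JoiningNode carrying the new IsControlPlane and
// Deadline fields. Hypothetical helper; only the API group, kind, and spec
// fields come from the commit.
func newJoiningNode(nodeName, componentsHash string, isControlPlane bool) *updatev1alpha1.JoiningNode {
	// Assumed grace period for the example; the real deadline is chosen by
	// whoever creates the object.
	deadline := metav1.NewTime(time.Now().Add(5 * time.Minute))
	return &updatev1alpha1.JoiningNode{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "update.edgeless.systems/v1alpha1",
			Kind:       "JoiningNode",
		},
		ObjectMeta: metav1.ObjectMeta{Name: nodeName},
		Spec: updatev1alpha1.JoiningNodeSpec{
			Name:           nodeName,
			ComponentsHash: componentsHash,
			IsControlPlane: isControlPlane,
			Deadline:       &deadline,
		},
	}
}

func main() {
	jn := newJoiningNode("control-plane-1", "example-components-hash", true)
	fmt.Println(jn.Spec.IsControlPlane, jn.Spec.Deadline)
}
```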

View file

@@ -112,7 +112,7 @@ func (in *JoiningNode) DeepCopyInto(out *JoiningNode) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
in.Spec.DeepCopyInto(&out.Spec)
out.Status = in.Status
}
@@ -169,6 +169,10 @@ func (in *JoiningNodeList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JoiningNodeSpec) DeepCopyInto(out *JoiningNodeSpec) {
*out = *in
if in.Deadline != nil {
in, out := &in.Deadline, &out.Deadline
*out = (*in).DeepCopy()
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JoiningNodeSpec.

View file

@@ -6,7 +6,7 @@ LABEL operators.operatorframework.io.bundle.manifests.v1=manifests/
LABEL operators.operatorframework.io.bundle.metadata.v1=metadata/
LABEL operators.operatorframework.io.bundle.package.v1=node-operator
LABEL operators.operatorframework.io.bundle.channels.v1=alpha
LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.22.1
LABEL operators.operatorframework.io.metrics.builder=operator-sdk-v1.25.3
LABEL operators.operatorframework.io.metrics.mediatype.v1=metrics+v1
LABEL operators.operatorframework.io.metrics.project_layout=go.kubebuilder.io/v3

View file

@@ -40,6 +40,15 @@ spec:
description: ComponentsHash is the hash of the components that were
sent to the node by the join service.
type: string
deadline:
description: Deadline is the time after which the joining node is
considered to have failed.
format: date-time
type: string
iscontrolplane:
description: IsControlPlane is true if the node is a control plane
node.
type: boolean
name:
description: Name of the node expected to join.
type: string

View file

@@ -11,22 +11,22 @@ spec:
customresourcedefinitions:
owned:
- description: AutoscalingStrategy is the Schema for the autoscalingstrategies
API
API.
displayName: Autoscaling Strategy
kind: AutoscalingStrategy
name: autoscalingstrategies.update.edgeless.systems
version: v1alpha1
- description: NodeImage is the Schema for the nodeimages API
- description: NodeImage is the Schema for the nodeimages API.
displayName: Node Image
kind: NodeImage
name: nodeimages.update.edgeless.systems
version: v1alpha1
- description: PendingNode is the Schema for the pendingnodes API
- description: PendingNode is the Schema for the pendingnodes API.
displayName: Pending Node
kind: PendingNode
name: pendingnodes.update.edgeless.systems
version: v1alpha1
- description: ScalingGroup is the Schema for the scalinggroups API
- description: ScalingGroup is the Schema for the scalinggroups API.
displayName: Scaling Group
kind: ScalingGroup
name: scalinggroups.update.edgeless.systems

View file

@@ -8,12 +8,15 @@ package controllers
import (
"context"
"time"
updatev1alpha1 "github.com/edgelesssys/constellation/operators/constellation-node-operator/v2/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -33,6 +36,7 @@ const (
type JoiningNodesReconciler struct {
client.Client
Scheme *runtime.Scheme
clock.Clock
}
// NewJoiningNodesReconciler creates a new JoiningNodesReconciler.
@@ -40,6 +44,7 @@ func NewJoiningNodesReconciler(client client.Client, scheme *runtime.Scheme) *Jo
return &JoiningNodesReconciler{
Client: client,
Scheme: scheme,
Clock: clock.RealClock{},
}
}
@@ -54,14 +59,16 @@ func (r *JoiningNodesReconciler) Reconcile(ctx context.Context, req ctrl.Request
var joiningNode updatev1alpha1.JoiningNode
if err := r.Get(ctx, req.NamespacedName, &joiningNode); err != nil {
logr.Error(err, "unable to fetch JoiningNodes")
if !errors.IsNotFound(err) {
logr.Error(err, "Unable to fetch JoiningNode")
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var node corev1.Node
if err := r.Get(ctx, types.NamespacedName{Name: joiningNode.Spec.Name}, &node); err != nil {
logr.Error(err, "unable to fetch Node")
logr.Info("unable to fetch Node", "err", err)
return err
}
@@ -73,23 +80,35 @@ func (r *JoiningNodesReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.Update(ctx, &node)
})
if err != nil {
logr.Error(err, "unable to update Node")
return ctrl.Result{}, client.IgnoreNotFound(err)
// check if the deadline has been reached
// requeue if not
if joiningNode.Spec.Deadline == nil || r.Now().Before(joiningNode.Spec.Deadline.Time) {
var requeueAfter time.Duration
if joiningNode.Spec.Deadline == nil {
requeueAfter = defaultCheckInterval
} else {
requeueAfter = joiningNode.Spec.Deadline.Time.Sub(r.Now())
}
return ctrl.Result{
RequeueAfter: requeueAfter,
}, nil
}
}
// if the joining node is too old or the annotation succeeded, delete it.
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := r.Delete(ctx, &joiningNode); err != nil {
logr.Error(err, "unable to delete JoiningNode")
return err
if err := r.Get(ctx, req.NamespacedName, &joiningNode); err != nil {
return client.IgnoreNotFound(err)
}
return nil
return client.IgnoreNotFound(r.Delete(ctx, &joiningNode))
})
if err != nil {
logr.Error(err, "unable to delete JoiningNode")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
return ctrl.Result{}, err
}
// SetupWithManager sets up the controller with the Manager.
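
When the node cannot be annotated yet, the updated Reconcile either requeues (no deadline set, or deadline still in the future) or falls through to deleting the JoiningNode. Read in isolation, that decision reduces to a small function of the optional deadline and the injected clock; a hedged sketch follows (the function name, return shape, and the 30-second default are assumptions, while the controller's real defaultCheckInterval constant lives elsewhere in this file):

```go
package sketch

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/clock"
)

// requeueAfter mirrors the deadline check added to Reconcile above.
// Hypothetical helper: name, signature, and the 30s default are assumptions.
func requeueAfter(deadline *metav1.Time, c clock.Clock) (wait time.Duration, expired bool) {
	// Assumed value; the controller defines its own defaultCheckInterval.
	const defaultCheckInterval = 30 * time.Second
	if deadline == nil {
		// No deadline set yet: check again after the default interval.
		return defaultCheckInterval, false
	}
	if c.Now().Before(deadline.Time) {
		// Deadline not reached: requeue exactly when it expires.
		return deadline.Time.Sub(c.Now()), false
	}
	// Deadline passed: the controller deletes the JoiningNode instead of requeueing.
	return 0, true
}
```

Injecting clock.Clock (RealClock in production, a fake clock in the test suite further down) is what makes the expiry path deterministically testable.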

View file

@@ -25,8 +25,10 @@ var _ = Describe("JoiningNode controller", func() {
const (
nodeName1 = "node-name-1"
nodeName2 = "node-name-2"
nodeName3 = "node-name-3"
componentsHash1 = "test-hash-1"
componentsHash2 = "test-hash-2"
componentsHash3 = "test-hash-3"
timeout = time.Second * 20
duration = time.Second * 2
@@ -135,6 +137,40 @@ var _ = Describe("JoiningNode controller", func() {
return createdNode.Annotations[NodeKubernetesComponentsHashAnnotationKey]
}, timeout, interval).Should(Equal(componentsHash2))
By("deleting the joining node resource")
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{Name: joiningNode.Name}, createdJoiningNode)
}, timeout, interval).ShouldNot(Succeed())
})
It("Should clean up the joining node resource after the deadline is reached", func() {
ctx := context.Background()
By("creating a joining node resource")
joiningNode := &updatev1alpha1.JoiningNode{
TypeMeta: metav1.TypeMeta{
APIVersion: "update.edgeless.systems/v1alpha1",
Kind: "JoiningNode",
},
ObjectMeta: metav1.ObjectMeta{
Name: nodeName3,
},
Spec: updatev1alpha1.JoiningNodeSpec{
Name: nodeName3,
ComponentsHash: componentsHash3,
// create without deadline first
},
}
Expect(k8sClient.Create(ctx, joiningNode)).Should(Succeed())
createdJoiningNode := &updatev1alpha1.JoiningNode{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{Name: joiningNode.Name}, createdJoiningNode)
}, timeout, interval).Should(Succeed())
Expect(createdJoiningNode.Spec.Name).Should(Equal(nodeName3))
Expect(createdJoiningNode.Spec.ComponentsHash).Should(Equal(componentsHash3))
By("setting the deadline to the past")
createdJoiningNode.Spec.Deadline = &metav1.Time{Time: fakes.clock.Now().Add(-time.Second)}
Expect(k8sClient.Update(ctx, createdJoiningNode)).Should(Succeed())
By("deleting the joining node resource")
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{Name: joiningNode.Name}, createdJoiningNode)

View file

@@ -96,6 +96,7 @@ var _ = BeforeSuite(func() {
err = (&JoiningNodesReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Clock: fakes.clock,
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
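
The fakes.clock referenced here and in the controller test is not part of this excerpt. A plausible definition, assuming k8s.io/utils/clock/testing (the struct and variable names are guesses; only the Clock injection above comes from the diff):

```go
package controllers

import (
	"time"

	testclock "k8s.io/utils/clock/testing"
)

// fakeBundle is a hypothetical stand-in for whatever the suite's `fakes`
// value really is; a FakeClock satisfies the reconciler's clock.Clock field.
type fakeBundle struct {
	clock *testclock.FakeClock
}

var fakes = fakeBundle{
	// A fake clock lets the deadline test set a deadline in the (fake) past
	// without sleeping in the test.
	clock: testclock.NewFakeClock(time.Now()),
}
```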

View file

@@ -75,6 +75,7 @@ func main() {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// Create CSP client
var cspClient cspAPI
var clientErr error
csp := strings.ToLower(os.Getenv(constellationCSP))
@@ -98,8 +99,7 @@ func main() {
os.Exit(1)
}
default:
setupLog.Info("Unknown CSP", "csp", csp)
os.Exit(1)
setupLog.Info("CSP does not support upgrades", "csp", csp)
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
@@ -126,44 +126,49 @@ func main() {
os.Exit(1)
}
defer etcdClient.Close()
imageInfo := deploy.NewImageInfo()
if err := deploy.InitialResources(context.Background(), k8sClient, imageInfo, cspClient, os.Getenv(constellationUID)); err != nil {
setupLog.Error(err, "Unable to deploy initial resources")
os.Exit(1)
// Create Controllers
if csp == "azure" || csp == "gcp" {
imageInfo := deploy.NewImageInfo()
if err := deploy.InitialResources(context.Background(), k8sClient, imageInfo, cspClient, os.Getenv(constellationUID)); err != nil {
setupLog.Error(err, "Unable to deploy initial resources")
os.Exit(1)
}
if err = controllers.NewNodeImageReconciler(
cspClient, etcdClient, mgr.GetClient(), mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "NodeImage")
os.Exit(1)
}
if err = (&controllers.AutoscalingStrategyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "AutoscalingStrategy")
os.Exit(1)
}
if err = controllers.NewScalingGroupReconciler(
cspClient, mgr.GetClient(), mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "ScalingGroup")
os.Exit(1)
}
if err = controllers.NewPendingNodeReconciler(
cspClient, mgr.GetClient(), mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "PendingNode")
os.Exit(1)
}
}
if err = controllers.NewNodeImageReconciler(
cspClient, etcdClient, mgr.GetClient(), mgr.GetScheme(),
if err = controllers.NewJoiningNodesReconciler(
mgr.GetClient(),
mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "NodeImage")
os.Exit(1)
}
if err = (&controllers.AutoscalingStrategyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "AutoscalingStrategy")
os.Exit(1)
}
if err = (&controllers.JoiningNodesReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "JoiningNode")
os.Exit(1)
}
if err = controllers.NewScalingGroupReconciler(
cspClient, mgr.GetClient(), mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "ScalingGroup")
os.Exit(1)
}
if err = controllers.NewPendingNodeReconciler(
cspClient, mgr.GetClient(), mgr.GetScheme(),
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "PendingNode")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
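
Because this last hunk interleaves removed and added lines, here is a condensed, hypothetical sketch of the resulting wiring: initial resource deployment and the CSP-backed controllers are now gated on csp being "azure" or "gcp" (the unknown-CSP case no longer exits), while the new JoiningNodes reconciler is registered for every CSP. The helper name is invented and the controllers import path is inferred from the api path shown earlier; the constructor calls come from the diff.

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/edgelesssys/constellation/operators/constellation-node-operator/v2/controllers"
)

// registerControllers is a hypothetical consolidation of the registration
// code in main(); CSP-specific setup is summarized in a comment.
func registerControllers(mgr ctrl.Manager, csp string) error {
	if csp == "azure" || csp == "gcp" {
		// deploy.InitialResources plus the NodeImage, AutoscalingStrategy,
		// ScalingGroup, and PendingNode controllers are set up here, exactly
		// as in the hunk above (they need the CSP client).
	}
	// The JoiningNodes reconciler only talks to the Kubernetes API
	// (annotating nodes, cleaning up expired JoiningNode objects), so it is
	// registered unconditionally.
	return controllers.NewJoiningNodesReconciler(
		mgr.GetClient(),
		mgr.GetScheme(),
	).SetupWithManager(mgr)
}
```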