operators: infrastructure autodiscovery (#1958)

* helm: configure GCP cloud controller manager to search in all zones of a region

See also: d716fdd452/providers/gce/gce.go (L376-L380)

* operators: add nodeGroupName to ScalingGroup CRD

NodeGroupName is the human-friendly name of the node group that will be exposed to customers via the Constellation config in the future.

* operators: support simple executor / scheduler to reconcile on non-k8s resources

* operators: add new return type for ListScalingGroups to support arbitrary node groups

* operators: ListScalingGroups should return additionally created node groups on AWS

* operators: ListScalingGroups should return additionally created node groups on Azure

* operators: ListScalingGroups should return additionally created node groups on GCP

* operators: ListScalingGroups should return additionally created node groups on unsupported CSPs

* operators: implement external scaling group reconciler

This controller scans the cloud provider infrastructure and changes k8s resources accordingly.
It creates ScaleSet resources when new node groups are created and deletes them if the node groups are removed.

* operators: no longer create scale sets when the operator starts

In the future, scale sets are created dynamically.

* operators: watch for node join/leave events using a controller

* operators: deploy new controllers

* docs: update auto scaling documentation with support for node groups
This commit is contained in:
Malte Poll 2023-07-05 07:27:34 +02:00 committed by Adrian Stobbe
parent 10a540c290
commit 388ff011a3
36 changed files with 1836 additions and 232 deletions

View file

@ -0,0 +1,9 @@
# Go library for the cloud provider scaling group API (scalinggroup.go).
# Visible only to the node operator's own subpackages; depends on the
# operator's v1alpha1 API types for the NodeRole enum.
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
    name = "api",
    srcs = ["scalinggroup.go"],
    importpath = "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api",
    visibility = ["//operators/constellation-node-operator:__subpackages__"],
    deps = ["//operators/constellation-node-operator/api/v1alpha1"],
)

View file

@ -0,0 +1,24 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package api
import updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
// ScalingGroup is a cloud provider scaling group.
// It is the CSP-agnostic representation returned by ListScalingGroups
// implementations for each cloud provider.
type ScalingGroup struct {
// Name is the CSP-specific name of the scaling group.
Name string
// NodeGroupName is the human-friendly name of the node group
// as defined in the Constellation configuration.
NodeGroupName string
// GroupID is the CSP-specific, canonical identifier of a scaling group.
GroupID string
// AutoscalingGroupName is the name that is expected by the autoscaler.
AutoscalingGroupName string
// Role is the role of the nodes in the scaling group.
Role updatev1alpha1.NodeRole
}

View file

@ -15,6 +15,7 @@ go_library(
visibility = ["//operators/constellation-node-operator:__subpackages__"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"@com_github_aws_aws_sdk_go_v2_config//:config",
"@com_github_aws_aws_sdk_go_v2_feature_ec2_imds//:imds",
"@com_github_aws_aws_sdk_go_v2_service_autoscaling//:autoscaling",
@ -36,6 +37,7 @@ go_test(
embed = [":client"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"@com_github_aws_aws_sdk_go_v2_service_autoscaling//:autoscaling",
"@com_github_aws_aws_sdk_go_v2_service_autoscaling//types",
"@com_github_aws_aws_sdk_go_v2_service_ec2//:ec2",

View file

@ -15,6 +15,8 @@ import (
scalingtypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
)
// GetScalingGroupImage returns the image URI of the scaling group.
@ -140,7 +142,8 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
}
// ListScalingGroups retrieves a list of scaling groups for the cluster.
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
results := []cspapi.ScalingGroup{}
output, err := c.scalingClient.DescribeAutoScalingGroups(
ctx,
&autoscaling.DescribeAutoScalingGroupsInput{
@ -153,22 +156,62 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
},
)
if err != nil {
return nil, nil, fmt.Errorf("failed to describe scaling groups: %w", err)
return nil, fmt.Errorf("failed to describe scaling groups: %w", err)
}
for _, group := range output.AutoScalingGroups {
if group.Tags == nil {
continue
}
var role updatev1alpha1.NodeRole
var nodeGroupName string
for _, tag := range group.Tags {
if *tag.Key == "constellation-role" {
if *tag.Value == "control-plane" {
controlPlaneGroupIDs = append(controlPlaneGroupIDs, *group.AutoScalingGroupName)
} else if *tag.Value == "worker" {
workerGroupIDs = append(workerGroupIDs, *group.AutoScalingGroupName)
}
if tag.Key == nil || tag.Value == nil {
continue
}
key := *tag.Key
switch key {
case "constellation-role":
role = updatev1alpha1.NodeRoleFromString(*tag.Value)
case "constellation-node-group":
nodeGroupName = *tag.Value
}
}
// fallback for legacy clusters
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
if nodeGroupName == "" {
switch role {
case updatev1alpha1.ControlPlaneRole:
nodeGroupName = "control_plane_default"
case updatev1alpha1.WorkerRole:
nodeGroupName = "worker_default"
}
}
name, err := c.GetScalingGroupName(*group.AutoScalingGroupName)
if err != nil {
return nil, fmt.Errorf("getting scaling group name: %w", err)
}
nodeGroupName, err = c.GetScalingGroupName(nodeGroupName)
if err != nil {
return nil, fmt.Errorf("getting node group name: %w", err)
}
autoscalerGroupName, err := c.GetAutoscalingGroupName(*group.AutoScalingGroupName)
if err != nil {
return nil, fmt.Errorf("getting autoscaler group name: %w", err)
}
results = append(results, cspapi.ScalingGroup{
Name: name,
NodeGroupName: nodeGroupName,
GroupID: *group.AutoScalingGroupName,
AutoscalingGroupName: autoscalerGroupName,
Role: role,
})
}
return controlPlaneGroupIDs, workerGroupIDs, nil
return results, nil
}

View file

@ -14,6 +14,7 @@ import (
scalingtypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -229,8 +230,7 @@ func TestListScalingGroups(t *testing.T) {
providerID string
describeAutoScalingGroupsOut []*autoscaling.DescribeAutoScalingGroupsOutput
describeAutoScalingGroupsErr []error
wantControlPlaneGroupIDs []string
wantWorkerGroupIDs []string
wantGroups []cspapi.ScalingGroup
wantErr bool
}{
"listing scaling groups work": {
@ -263,6 +263,10 @@ func TestListScalingGroups(t *testing.T) {
Key: toPtr("constellation-role"),
Value: toPtr("worker"),
},
{
Key: toPtr("constellation-node-group"),
Value: toPtr("foo-group"),
},
},
},
{
@ -272,8 +276,29 @@ func TestListScalingGroups(t *testing.T) {
},
},
describeAutoScalingGroupsErr: []error{nil},
wantControlPlaneGroupIDs: []string{"control-plane-asg"},
wantWorkerGroupIDs: []string{"worker-asg", "worker-asg-2"},
wantGroups: []cspapi.ScalingGroup{
{
Name: "control-plane-asg",
NodeGroupName: "control_plane_default",
GroupID: "control-plane-asg",
AutoscalingGroupName: "control-plane-asg",
Role: "ControlPlane",
},
{
Name: "worker-asg",
NodeGroupName: "worker_default",
GroupID: "worker-asg",
AutoscalingGroupName: "worker-asg",
Role: "Worker",
},
{
Name: "worker-asg-2",
NodeGroupName: "foo-group",
GroupID: "worker-asg-2",
AutoscalingGroupName: "worker-asg-2",
Role: "Worker",
},
},
},
"fails when describing scaling groups fails": {
providerID: "aws:///us-east-2a/i-00000000000000000",
@ -293,14 +318,13 @@ func TestListScalingGroups(t *testing.T) {
describeAutoScalingGroupsErr: tc.describeAutoScalingGroupsErr,
},
}
controlPlaneGroupIDs, workerGroupIDs, err := client.ListScalingGroups(context.Background(), tc.providerID)
gotGroups, err := client.ListScalingGroups(context.Background(), tc.providerID)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(tc.wantControlPlaneGroupIDs, controlPlaneGroupIDs)
assert.Equal(tc.wantWorkerGroupIDs, workerGroupIDs)
assert.Equal(tc.wantGroups, gotGroups)
})
}
}

View file

@ -19,6 +19,7 @@ go_library(
visibility = ["//operators/constellation-node-operator:__subpackages__"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"//operators/constellation-node-operator/internal/poller",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//runtime",
@ -44,6 +45,7 @@ go_test(
embed = [":client"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"//operators/constellation-node-operator/internal/poller",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//runtime",

View file

@ -13,6 +13,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
)
// GetScalingGroupImage returns the image URI of the scaling group.
@ -79,34 +81,63 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
}
// ListScalingGroups retrieves a list of scaling groups for the cluster.
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
results := []cspapi.ScalingGroup{}
pager := c.scaleSetsAPI.NewListPager(c.config.ResourceGroup, nil)
for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return nil, nil, fmt.Errorf("paging scale sets: %w", err)
return nil, fmt.Errorf("paging scale sets: %w", err)
}
for _, scaleSet := range page.Value {
if scaleSet == nil || scaleSet.ID == nil {
continue
}
if scaleSet.Tags == nil || scaleSet.Tags["constellation-uid"] == nil || *scaleSet.Tags["constellation-uid"] != uid {
if scaleSet.Tags == nil ||
scaleSet.Tags["constellation-uid"] == nil ||
*scaleSet.Tags["constellation-uid"] != uid ||
scaleSet.Tags["constellation-role"] == nil {
continue
}
role := updatev1alpha1.NodeRoleFromString(*scaleSet.Tags["constellation-role"])
name, err := c.GetScalingGroupName(*scaleSet.ID)
if err != nil {
return nil, nil, fmt.Errorf("getting scaling group name: %w", err)
return nil, fmt.Errorf("getting scaling group name: %w", err)
}
switch *scaleSet.Tags["constellation-role"] {
case "control-plane", "controlplane":
controlPlaneGroupIDs = append(controlPlaneGroupIDs, *scaleSet.ID)
case "worker":
workerGroupIDs = append(workerGroupIDs, *scaleSet.ID)
var nodeGroupName string
if scaleSet.Tags["constellation-node-group"] != nil {
nodeGroupName = *scaleSet.Tags["constellation-node-group"]
}
// fallback for legacy clusters
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
if nodeGroupName == "" {
switch role {
case updatev1alpha1.ControlPlaneRole:
nodeGroupName = "control_plane_default"
case updatev1alpha1.WorkerRole:
nodeGroupName = "worker_default"
}
}
autoscalerGroupName, err := c.GetAutoscalingGroupName(*scaleSet.ID)
if err != nil {
return nil, fmt.Errorf("getting autoscaling group name: %w", err)
}
results = append(results, cspapi.ScalingGroup{
Name: name,
NodeGroupName: nodeGroupName,
GroupID: *scaleSet.ID,
AutoscalingGroupName: autoscalerGroupName,
Role: role,
})
}
}
return controlPlaneGroupIDs, workerGroupIDs, nil
return results, nil
}
func imageReferenceFromImage(img string) *armcompute.ImageReference {

View file

@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -184,41 +185,67 @@ func TestGetScalingGroupName(t *testing.T) {
func TestListScalingGroups(t *testing.T) {
testCases := map[string]struct {
scaleSet armcompute.VirtualMachineScaleSet
fetchPageErr error
wantControlPlanes []string
wantWorkers []string
wantErr bool
scaleSet armcompute.VirtualMachineScaleSet
fetchPageErr error
wantGroups []cspapi.ScalingGroup
wantErr bool
}{
"listing control-plane works": {
scaleSet: armcompute.VirtualMachineScaleSet{
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"),
Name: to.Ptr("constellation-scale-set-control-planes-uid"),
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"),
Tags: map[string]*string{
"constellation-uid": to.Ptr("uid"),
"constellation-role": to.Ptr("control-plane"),
},
},
wantControlPlanes: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"},
wantGroups: []cspapi.ScalingGroup{
{
Name: "constellation-scale-set-control-planes-uid",
NodeGroupName: "control_plane_default",
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid",
AutoscalingGroupName: "constellation-scale-set-control-planes-uid",
Role: "ControlPlane",
},
},
},
"listing worker works": {
scaleSet: armcompute.VirtualMachineScaleSet{
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"),
Name: to.Ptr("constellation-scale-set-workers-uid"),
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"),
Tags: map[string]*string{
"constellation-uid": to.Ptr("uid"),
"constellation-role": to.Ptr("worker"),
},
},
wantWorkers: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"},
wantGroups: []cspapi.ScalingGroup{
{
Name: "constellation-scale-set-workers-uid",
NodeGroupName: "worker_default",
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid",
AutoscalingGroupName: "constellation-scale-set-workers-uid",
Role: "Worker",
},
},
},
"listing is not dependent on resource name": {
scaleSet: armcompute.VirtualMachineScaleSet{
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"),
Name: to.Ptr("foo-bar"),
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"),
Tags: map[string]*string{
"constellation-uid": to.Ptr("uid"),
"constellation-role": to.Ptr("control-plane"),
},
},
wantControlPlanes: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"},
wantGroups: []cspapi.ScalingGroup{
{
Name: "some-scale-set",
NodeGroupName: "control_plane_default",
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set",
AutoscalingGroupName: "some-scale-set",
Role: "ControlPlane",
},
},
},
"listing other works": {
scaleSet: armcompute.VirtualMachineScaleSet{
@ -245,14 +272,13 @@ func TestListScalingGroups(t *testing.T) {
},
},
}
gotControlPlanes, gotWorkers, err := client.ListScalingGroups(context.Background(), "uid")
gotGroups, err := client.ListScalingGroups(context.Background(), "uid")
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.ElementsMatch(tc.wantControlPlanes, gotControlPlanes)
assert.ElementsMatch(tc.wantWorkers, gotWorkers)
assert.ElementsMatch(tc.wantGroups, gotGroups)
})
}
}

View file

@ -7,6 +7,7 @@ go_library(
visibility = ["//operators/constellation-node-operator:__subpackages__"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"//operators/constellation-node-operator/internal/constants",
],
)

View file

@ -11,6 +11,7 @@ import (
"fmt"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
)
@ -82,8 +83,23 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
}
// ListScalingGroups retrieves a list of scaling groups for the cluster.
func (c *Client) ListScalingGroups(_ context.Context, _ string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
return []string{controlPlanesID}, []string{workersID}, nil
func (c *Client) ListScalingGroups(_ context.Context, _ string) ([]cspapi.ScalingGroup, error) {
return []cspapi.ScalingGroup{
{
Name: controlPlanesID,
NodeGroupName: controlPlanesID,
GroupID: controlPlanesID,
AutoscalingGroupName: controlPlanesID,
Role: updatev1alpha1.ControlPlaneRole,
},
{
Name: workersID,
NodeGroupName: workersID,
GroupID: workersID,
AutoscalingGroupName: workersID,
Role: updatev1alpha1.WorkerRole,
},
}, nil
}
// AutoscalingCloudProvider returns the cloud-provider name as used by k8s cluster-autoscaler.

View file

@ -23,6 +23,7 @@ go_library(
visibility = ["//operators/constellation-node-operator:__subpackages__"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"@com_github_googleapis_gax_go_v2//:gax-go",
"@com_github_spf13_afero//:afero",
"@com_google_cloud_go_compute//apiv1",
@ -51,6 +52,7 @@ go_test(
embed = [":client"],
deps = [
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"@com_github_googleapis_gax_go_v2//:gax-go",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//assert",

View file

@ -13,6 +13,8 @@ import (
"strings"
"cloud.google.com/go/compute/apiv1/computepb"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"google.golang.org/api/iterator"
)
@ -106,7 +108,8 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
}
// ListScalingGroups retrieves a list of scaling groups for the cluster.
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
results := []cspapi.ScalingGroup{}
iter := c.instanceGroupManagersAPI.AggregatedList(ctx, &computepb.AggregatedListInstanceGroupManagersRequest{
Project: c.projectID,
})
@ -115,7 +118,7 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
break
}
if err != nil {
return nil, nil, fmt.Errorf("listing instance group managers: %w", err)
return nil, fmt.Errorf("listing instance group managers: %w", err)
}
if instanceGroupManagerScopedListPair.Value == nil {
continue
@ -134,7 +137,7 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
InstanceTemplate: templateURI[len(templateURI)-1],
})
if err != nil {
return nil, nil, fmt.Errorf("getting instance template: %w", err)
return nil, fmt.Errorf("getting instance template: %w", err)
}
if template.Properties == nil || template.Properties.Labels == nil {
continue
@ -145,18 +148,43 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
groupID, err := c.canonicalInstanceGroupID(ctx, *grpManager.SelfLink)
if err != nil {
return nil, nil, fmt.Errorf("normalizing instance group ID: %w", err)
return nil, fmt.Errorf("normalizing instance group ID: %w", err)
}
switch strings.ToLower(template.Properties.Labels["constellation-role"]) {
case "control-plane", "controlplane":
controlPlaneGroupIDs = append(controlPlaneGroupIDs, groupID)
case "worker":
workerGroupIDs = append(workerGroupIDs, groupID)
role := updatev1alpha1.NodeRoleFromString(template.Properties.Labels["constellation-role"])
name, err := c.GetScalingGroupName(groupID)
if err != nil {
return nil, fmt.Errorf("getting scaling group name: %w", err)
}
nodeGroupName := template.Properties.Labels["constellation-node-group"]
// fallback for legacy clusters
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
if nodeGroupName == "" {
switch role {
case updatev1alpha1.ControlPlaneRole:
nodeGroupName = "control_plane_default"
case updatev1alpha1.WorkerRole:
nodeGroupName = "worker_default"
}
}
autoscalerGroupName, err := c.GetAutoscalingGroupName(groupID)
if err != nil {
return nil, fmt.Errorf("getting autoscaling group name: %w", err)
}
results = append(results, cspapi.ScalingGroup{
Name: name,
NodeGroupName: nodeGroupName,
GroupID: groupID,
AutoscalingGroupName: autoscalerGroupName,
Role: role,
})
}
}
return controlPlaneGroupIDs, workerGroupIDs, nil
return results, nil
}
func (c *Client) getScalingGroupTemplate(ctx context.Context, scalingGroupID string) (*computepb.InstanceTemplate, error) {

View file

@ -12,6 +12,7 @@ import (
"testing"
"cloud.google.com/go/compute/apiv1/computepb"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
@ -330,8 +331,7 @@ func TestListScalingGroups(t *testing.T) {
templateLabels map[string]string
listInstanceGroupManagersErr error
templateGetErr error
wantControlPlanes []string
wantWorkers []string
wantGroups []cspapi.ScalingGroup
wantErr bool
}{
"list instance group managers fails": {
@ -353,8 +353,14 @@ func TestListScalingGroups(t *testing.T) {
"constellation-uid": "uid",
"constellation-role": "control-plane",
},
wantControlPlanes: []string{
"projects/project/zones/zone/instanceGroupManagers/test-control-plane-uid",
wantGroups: []cspapi.ScalingGroup{
{
Name: "test-control-plane-uid",
NodeGroupName: "control_plane_default",
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-control-plane-uid",
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-control-plane-uid",
Role: "ControlPlane",
},
},
},
"list instance group managers for worker": {
@ -365,8 +371,33 @@ func TestListScalingGroups(t *testing.T) {
"constellation-uid": "uid",
"constellation-role": "worker",
},
wantWorkers: []string{
"projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
wantGroups: []cspapi.ScalingGroup{
{
Name: "test-worker-uid",
NodeGroupName: "worker_default",
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-worker-uid",
Role: "Worker",
},
},
},
"list instance group managers with custom group name": {
name: proto.String("test-worker-uid"),
groupID: proto.String("projects/project/zones/zone/instanceGroupManagers/test-worker-uid"),
templateRef: proto.String("projects/project/global/instanceTemplates/test-control-plane-uid"),
templateLabels: map[string]string{
"constellation-uid": "uid",
"constellation-role": "worker",
"constellation-node-group": "custom-group-name",
},
wantGroups: []cspapi.ScalingGroup{
{
Name: "test-worker-uid",
NodeGroupName: "custom-group-name",
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-worker-uid",
Role: "Worker",
},
},
},
"listing instance group managers is not dependant on resource name": {
@ -377,8 +408,14 @@ func TestListScalingGroups(t *testing.T) {
"constellation-uid": "uid",
"constellation-role": "control-plane",
},
wantControlPlanes: []string{
"projects/project/zones/zone/instanceGroupManagers/some-instance-group-manager",
wantGroups: []cspapi.ScalingGroup{
{
Name: "some-instance-group-manager",
NodeGroupName: "control_plane_default",
GroupID: "projects/project/zones/zone/instanceGroupManagers/some-instance-group-manager",
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/some-instance-group-manager",
Role: "ControlPlane",
},
},
},
"unrelated instance group manager": {
@ -415,14 +452,13 @@ func TestListScalingGroups(t *testing.T) {
getErr: tc.templateGetErr,
},
}
gotControlPlanes, gotWorkers, err := client.ListScalingGroups(context.Background(), "uid")
gotGroups, err := client.ListScalingGroups(context.Background(), "uid")
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.ElementsMatch(tc.wantControlPlanes, gotControlPlanes)
assert.ElementsMatch(tc.wantWorkers, gotWorkers)
assert.ElementsMatch(tc.wantGroups, gotGroups)
})
}
}

View file

@ -12,6 +12,7 @@ go_library(
deps = [
"//internal/constants",
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"//operators/constellation-node-operator/internal/constants",
"@com_github_spf13_afero//:afero",
"@io_k8s_api//core/v1:core",
@ -33,6 +34,7 @@ go_test(
deps = [
"//internal/constants",
"//operators/constellation-node-operator/api/v1alpha1",
"//operators/constellation-node-operator/internal/cloud/api",
"//operators/constellation-node-operator/internal/constants",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//assert",

View file

@ -15,6 +15,7 @@ import (
mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
@ -33,21 +34,18 @@ func InitialResources(ctx context.Context, k8sClient client.Client, imageInfo im
}
logr.Info("cleaned up placeholders")
controlPlaneGroupIDs, workerGroupIDs, err := scalingGroupGetter.ListScalingGroups(ctx, uid)
scalingGroups, err := scalingGroupGetter.ListScalingGroups(ctx, uid)
if err != nil {
return fmt.Errorf("listing scaling groups: %w", err)
}
if len(controlPlaneGroupIDs) == 0 {
return errors.New("determining initial node image: no control plane scaling group found")
}
if len(workerGroupIDs) == 0 {
return errors.New("determining initial node image: no worker scaling group found")
if len(scalingGroups) == 0 {
return errors.New("determining initial node image: no scaling group found")
}
if err := createAutoscalingStrategy(ctx, k8sClient, scalingGroupGetter.AutoscalingCloudProvider()); err != nil {
return fmt.Errorf("creating initial autoscaling strategy: %w", err)
}
imageReference, err := scalingGroupGetter.GetScalingGroupImage(ctx, controlPlaneGroupIDs[0])
imageReference, err := scalingGroupGetter.GetScalingGroupImage(ctx, scalingGroups[0].GroupID)
if err != nil {
return fmt.Errorf("determining initial node image: %w", err)
}
@ -62,34 +60,6 @@ func InitialResources(ctx context.Context, k8sClient client.Client, imageInfo im
if err := createNodeVersion(ctx, k8sClient, imageReference, imageVersion); err != nil {
return fmt.Errorf("creating initial node version %q: %w", imageReference, err)
}
for _, groupID := range controlPlaneGroupIDs {
groupName, err := scalingGroupGetter.GetScalingGroupName(groupID)
if err != nil {
return fmt.Errorf("determining scaling group name of %q: %w", groupID, err)
}
autoscalingGroupName, err := scalingGroupGetter.GetAutoscalingGroupName(groupID)
if err != nil {
return fmt.Errorf("determining autoscaling group name of %q: %w", groupID, err)
}
newScalingGroupConfig := newScalingGroupConfig{k8sClient, groupID, groupName, autoscalingGroupName, updatev1alpha1.ControlPlaneRole}
if err := createScalingGroup(ctx, newScalingGroupConfig); err != nil {
return fmt.Errorf("creating initial control plane scaling group: %w", err)
}
}
for _, groupID := range workerGroupIDs {
groupName, err := scalingGroupGetter.GetScalingGroupName(groupID)
if err != nil {
return fmt.Errorf("determining scaling group name of %q: %w", groupID, err)
}
autoscalingGroupName, err := scalingGroupGetter.GetAutoscalingGroupName(groupID)
if err != nil {
return fmt.Errorf("determining autoscaling group name of %q: %w", groupID, err)
}
newScalingGroupConfig := newScalingGroupConfig{k8sClient, groupID, groupName, autoscalingGroupName, updatev1alpha1.WorkerRole}
if err := createScalingGroup(ctx, newScalingGroupConfig); err != nil {
return fmt.Errorf("creating initial worker scaling group: %w", err)
}
}
return nil
}
@ -252,28 +222,6 @@ func findLatestK8sComponentsConfigMap(ctx context.Context, k8sClient client.Clie
return latestConfigMap, nil
}
// createScalingGroup creates an initial scaling group resource if it does not exist yet.
func createScalingGroup(ctx context.Context, config newScalingGroupConfig) error {
err := config.k8sClient.Create(ctx, &updatev1alpha1.ScalingGroup{
TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
ObjectMeta: metav1.ObjectMeta{
Name: strings.ToLower(config.groupName),
},
Spec: updatev1alpha1.ScalingGroupSpec{
NodeVersion: mainconstants.NodeVersionResourceName,
GroupID: config.groupID,
AutoscalerGroupName: config.autoscalingGroupName,
Min: 1,
Max: 10,
Role: config.role,
},
})
if k8sErrors.IsAlreadyExists(err) {
return nil
}
return err
}
type imageInfoGetter interface {
ImageVersion() (string, error)
}
@ -286,15 +234,7 @@ type scalingGroupGetter interface {
// GetScalingGroupName retrieves the name of a scaling group as needed by the cluster-autoscaler.
GetAutoscalingGroupName(scalingGroupID string) (string, error)
// ListScalingGroups retrieves a list of scaling groups for the cluster.
ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error)
ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error)
// AutoscalingCloudProvider returns the cloud-provider name as used by k8s cluster-autoscaler.
AutoscalingCloudProvider() string
}
type newScalingGroupConfig struct {
k8sClient client.Writer
groupID string
groupName string
autoscalingGroupName string
role updatev1alpha1.NodeRole
}

View file

@ -14,6 +14,7 @@ import (
mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -41,18 +42,10 @@ func TestInitialResources(t *testing.T) {
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
{groupID: "worker", image: "image-1", name: "worker"},
},
wantResources: 4,
wantResources: 2,
},
"missing control planes": {
items: []scalingGroupStoreItem{
{groupID: "worker", image: "image-1", name: "worker"},
},
wantErr: true,
},
"missing workers": {
items: []scalingGroupStoreItem{
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
},
"missing groups": {
items: []scalingGroupStoreItem{},
wantErr: true,
},
"listing groups fails": {
@ -75,14 +68,6 @@ func TestInitialResources(t *testing.T) {
imageErr: errors.New("getting image failed"),
wantErr: true,
},
"getting name fails": {
items: []scalingGroupStoreItem{
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
{groupID: "worker", image: "image-1", name: "worker"},
},
nameErr: errors.New("getting name failed"),
wantErr: true,
},
}
for name, tc := range testCases {
@ -273,70 +258,6 @@ func TestCreateNodeVersion(t *testing.T) {
}
}
// TestCreateScalingGroup verifies that createScalingGroup creates the expected
// ScalingGroup resource, propagates creation errors, and treats an already
// existing resource as success.
func TestCreateScalingGroup(t *testing.T) {
	testCases := map[string]struct {
		createErr        error
		wantScalingGroup *updatev1alpha1.ScalingGroup
		wantErr          bool
	}{
		"create works": {
			wantScalingGroup: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "group-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "group-Name",
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.WorkerRole,
				},
			},
		},
		"create fails": {
			createErr: errors.New("create failed"),
			wantErr:   true,
		},
		// renamed from the misleading "image exists": this case covers the
		// ScalingGroup resource already existing, which must not be an error.
		"scaling group already exists": {
			createErr: k8sErrors.NewAlreadyExists(schema.GroupResource{}, constants.AutoscalingStrategyResourceName),
			wantScalingGroup: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "group-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "group-Name",
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.WorkerRole,
				},
			},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			assert := assert.New(t)
			require := require.New(t)
			k8sClient := &fakeK8sClient{createErr: tc.createErr}
			newScalingGroupConfig := newScalingGroupConfig{k8sClient, "group-id", "group-Name", "group-Name", updatev1alpha1.WorkerRole}
			err := createScalingGroup(context.Background(), newScalingGroupConfig)
			if tc.wantErr {
				assert.Error(err)
				return
			}
			require.NoError(err)
			assert.Len(k8sClient.createdObjects, 1)
			assert.Equal(tc.wantScalingGroup, k8sClient.createdObjects[0])
		})
	}
}
type fakeK8sClient struct {
createdObjects []client.Object
createErr error
@ -437,15 +358,24 @@ func (g *stubScalingGroupGetter) GetAutoscalingGroupName(scalingGroupID string)
return g.store[scalingGroupID].name, g.nameErr
}
func (g *stubScalingGroupGetter) ListScalingGroups(_ context.Context, _ string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
func (g *stubScalingGroupGetter) ListScalingGroups(_ context.Context, _ string) ([]cspapi.ScalingGroup, error) {
var scalingGroups []cspapi.ScalingGroup
for _, item := range g.store {
if item.isControlPlane {
controlPlaneGroupIDs = append(controlPlaneGroupIDs, item.groupID)
} else {
workerGroupIDs = append(workerGroupIDs, item.groupID)
}
scalingGroups = append(scalingGroups, cspapi.ScalingGroup{
Name: item.name,
NodeGroupName: item.nodeGroupName,
GroupID: item.groupID,
AutoscalingGroupName: item.autoscalingGroupName,
Role: func() updatev1alpha1.NodeRole {
if item.isControlPlane {
return updatev1alpha1.ControlPlaneRole
}
return updatev1alpha1.WorkerRole
}(),
})
}
return controlPlaneGroupIDs, workerGroupIDs, g.listErr
return scalingGroups, g.listErr
}
func (g *stubScalingGroupGetter) AutoscalingCloudProvider() string {
@ -453,8 +383,10 @@ func (g *stubScalingGroupGetter) AutoscalingCloudProvider() string {
}
type scalingGroupStoreItem struct {
groupID string
name string
image string
isControlPlane bool
name string
groupID string
autoscalingGroupName string
nodeGroupName string
image string
isControlPlane bool
}

View file

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//bazel/go:go_test.bzl", "go_test")
# Library target for the executor package: a task executor / scheduler used by
# the node operator to reconcile non-k8s (cloud provider) resources.
go_library(
    name = "executor",
    srcs = ["executor.go"],
    importpath = "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor",
    visibility = ["//operators/constellation-node-operator:__subpackages__"],
    deps = [
        "@io_k8s_client_go//util/workqueue",
        "@io_k8s_sigs_controller_runtime//pkg/log",
    ],
)
# Unit tests for the executor package; goleak is used in TestMain to detect
# leaked goroutines.
go_test(
    name = "executor_test",
    srcs = ["executor_test.go"],
    embed = [":executor"],
    deps = [
        "@com_github_stretchr_testify//assert",
        "@org_uber_go_goleak//:goleak",
    ],
)

View file

@ -0,0 +1,218 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// Package executor contains a task executor / scheduler for the constellation node operator.
// It is used to execute tasks (outside of the k8s specific operator controllers) with regular intervals and
// based on external triggers.
package executor
import (
"context"
"sync"
"sync/atomic"
"time"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
defaultPollingFrequency = 15 * time.Minute
// rateLimiterItem is the key used to rate limit the reconciliation loop.
// since we don't have a reconcile request, we use a constant key.
rateLimiterItem = "reconcile"
)
// Controller is a type with a reconcile method.
// It is modeled after the controller-runtime reconcile method,
// but reconciles on external resources instead of k8s resources.
type Controller interface {
	// Reconcile performs one reconciliation pass. The returned Result controls
	// when the executor schedules the next pass; a non-nil error leads to a
	// rate limited requeue.
	Reconcile(ctx context.Context) (Result, error)
}
// Executor is a task executor / scheduler.
// It will call the reconcile method of the given controller with a regular interval
// or when triggered externally.
type Executor struct {
	// running tracks whether the reconciliation loop is currently active.
	// It is set by Start and reset when the executor goroutine exits.
	running atomic.Bool
	// controller is the controller to be reconciled.
	controller Controller
	// pollingFrequency is the default frequency with which the controller is reconciled
	// if no external trigger is received and no requeue is requested by the controller.
	pollingFrequency time.Duration
	// rateLimiter is used to rate limit the reconciliation loop.
	rateLimiter RateLimiter
	// externalTrigger is used to trigger a reconciliation immediately from the outside.
	// multiple triggers in a short time will be coalesced into one externalTrigger.
	// It is initialized in Start.
	externalTrigger chan struct{}
	// stop is used to stop the reconciliation loop.
	// It is initialized in Start.
	stop chan struct{}
}
// New creates a new Executor for the given controller.
// Unset fields of cfg are filled with default values before use.
func New(controller Controller, cfg Config) *Executor {
	cfg.applyDefaults()
	exec := Executor{
		controller:       controller,
		pollingFrequency: cfg.PollingFrequency,
		rateLimiter:      cfg.RateLimiter,
	}
	return &exec
}
// StopWaitFn is a function that can be called to stop the executor and wait for it to stop.
// It is returned by Start.
type StopWaitFn func()
// Start starts the executor in a separate go routine.
// Call Stop to stop the executor.
// The returned StopWaitFn stops the executor and blocks until both of its
// goroutines have exited.
func (e *Executor) Start(ctx context.Context) StopWaitFn {
	wg := &sync.WaitGroup{}
	logr := log.FromContext(ctx)
	stopWait := func() {
		defer wg.Wait()
		e.Stop()
	}

	// this will return early if the executor is already running
	// if the executor is not running, set the running flag to true
	// and continue
	if !e.running.CompareAndSwap(false, true) {
		return stopWait
	}
	e.externalTrigger = make(chan struct{}, 1)
	e.stop = make(chan struct{}, 1)
	// execute is used by the go routines below to communicate
	// that a reconciliation should happen
	execute := make(chan struct{}, 1)
	// nextScheduledReconcile is used to communicate the next scheduled reconciliation time
	nextScheduledReconcile := make(chan time.Duration, 1)
	// trigger a reconciliation on startup
	nextScheduledReconcile <- 0

	wg.Add(2)

	// timer routine is responsible for triggering the reconciliation after the timer expires
	// or when triggered externally
	go func() {
		defer wg.Done()
		defer close(execute)
		defer logr.Info("Timer stopped")
		for {
			nextScheduledReconcileAfter := <-nextScheduledReconcile
			// keep the *time.Timer returned by NewTimer: copying the Timer value
			// (the previous `*time.NewTimer(...)`) detaches it from its runtime
			// state, and we need the original handle to be able to stop it below.
			timer := time.NewTimer(nextScheduledReconcileAfter)
			select {
			case <-e.stop:
				timer.Stop()
				return
			case <-ctx.Done():
				timer.Stop()
				return
			case <-e.externalTrigger:
				// the external trigger supersedes the scheduled expiry;
				// stop the pending timer so it does not linger.
				timer.Stop()
			case <-timer.C:
			}
			execute <- struct{}{}
		}
	}()

	// executor routine is responsible for executing the reconciliation
	go func() {
		defer func() {
			e.running.Store(false)
		}()
		defer wg.Done()
		defer close(nextScheduledReconcile)
		defer logr.Info("Executor stopped")
		for {
			_, ok := <-execute
			// execute channel closed. executor should stop
			if !ok {
				return
			}
			res, err := e.controller.Reconcile(ctx)
			var requeueAfter time.Duration
			switch {
			case err != nil:
				logr.Error(err, "reconciliation failed")
				requeueAfter = e.rateLimiter.When(rateLimiterItem) // requeue with rate limiter
			case res.Requeue && res.RequeueAfter != 0:
				e.rateLimiter.Forget(rateLimiterItem) // reset the rate limiter
				requeueAfter = res.RequeueAfter       // requeue after the given duration
			case res.Requeue:
				requeueAfter = e.rateLimiter.When(rateLimiterItem) // requeue with rate limiter
			default:
				e.rateLimiter.Forget(rateLimiterItem) // reset the rate limiter
				requeueAfter = e.pollingFrequency     // default polling frequency
			}
			nextScheduledReconcile <- requeueAfter
		}
	}()

	return stopWait
}
// Stop stops the executor.
// It does not block until the executor is stopped.
// Unlike the previous send-then-close implementation, calling Stop more than
// once is a no-op instead of a panic (send on / close of a closed channel).
// NOTE(review): sequential repeated calls are safe; fully concurrent Stop
// calls could still race on the close — confirm callers stop from one goroutine.
func (e *Executor) Stop() {
	// the executor was never started; there is nothing to stop
	if e.stop == nil {
		return
	}
	select {
	case <-e.stop:
		// channel is already closed by an earlier Stop call; nothing to do
	default:
		close(e.stop)
	}
}
// Running returns true if the executor is running.
// When the executor is stopped, it is not running anymore.
// The flag is set in Start and cleared when the executor goroutine exits.
func (e *Executor) Running() bool {
	return e.running.Load()
}
// Trigger triggers a reconciliation.
// If a reconciliation is already pending, this call is a no-op.
// Trigger never blocks: the non-blocking send is dropped when the one-element
// trigger buffer is already full. It is also safe before Start, since a send
// on the then-nil channel is never ready and the default case is taken.
func (e *Executor) Trigger() {
	select {
	case e.externalTrigger <- struct{}{}:
	default:
	}
}
// Config is the configuration for the executor.
type Config struct {
	// PollingFrequency is the interval between reconciliations when the
	// controller requests no requeue and no external trigger arrives.
	// A zero value selects the package default (see applyDefaults).
	PollingFrequency time.Duration
	// RateLimiter rate limits requeues after errors and plain Requeue results.
	// A nil value selects the default controller rate limiter (see applyDefaults).
	RateLimiter RateLimiter
}
// NewDefaultConfig creates a new default configuration.
// It is equivalent to a zero Config with all defaults applied.
func NewDefaultConfig() Config {
	var cfg Config
	cfg.applyDefaults()
	return cfg
}
// applyDefaults fills unset fields of the config with default values.
func (c *Config) applyDefaults() {
	// fall back to the default controller rate limiter when none was provided
	if c.RateLimiter == nil {
		c.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}
	// a zero polling frequency means "use the package default"
	if c.PollingFrequency == 0 {
		c.PollingFrequency = defaultPollingFrequency
	}
}
// Result is the result of a reconciliation.
// It mirrors the shape of the controller-runtime reconcile result.
type Result struct {
	// Requeue requests another reconciliation; without RequeueAfter it is
	// scheduled via the rate limiter.
	Requeue bool
	// RequeueAfter is the delay before the next reconciliation when Requeue is set.
	RequeueAfter time.Duration
}
// RateLimiter is a stripped down version of the controller-runtime ratelimiter.RateLimiter interface.
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait.
	When(item any) time.Duration
	// Forget indicates that an item is finished being retried. It doesn't matter whether it's for
	// permanently failing or for success; we'll stop tracking it.
	Forget(item any)
}

View file

@ -0,0 +1,306 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package executor
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
// TestMain verifies that no test in this package leaks goroutines.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
// TestNew checks that New applies the default polling frequency when none is
// configured and keeps a custom polling frequency otherwise.
func TestNew(t *testing.T) {
	testCases := map[string]struct {
		config               Config
		wantPollingFrequency time.Duration
	}{
		"applies default polling frequency": {
			config:               Config{},
			wantPollingFrequency: defaultPollingFrequency,
		},
		"custom polling frequency": {
			config: Config{
				PollingFrequency: time.Hour,
			},
			wantPollingFrequency: time.Hour,
		},
	}
	for name, testCase := range testCases {
		t.Run(name, func(t *testing.T) {
			executor := New(nil, testCase.config)
			assert.New(t).Equal(testCase.wantPollingFrequency, executor.pollingFrequency)
		})
	}
}
// TestStartTriggersImmediateReconciliation verifies that Start performs one
// reconciliation right away, before any timer or external trigger fires.
func TestStartTriggersImmediateReconciliation(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	// on start, the executor should trigger a reconciliation
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	ctrl.stop <- struct{}{}
	stopAndWait()
	// initial trigger
	assert.Equal(1, ctrl.reconciliationCounter)
}
// TestStartMultipleTimesIsCoalesced verifies that calling Start on an already
// running executor is a no-op and does not cause extra reconciliations.
func TestStartMultipleTimesIsCoalesced(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	// start once
	stopAndWait := exec.Start(context.Background())
	// start again multiple times
	for i := 0; i < 10; i++ {
		_ = exec.Start(context.Background())
	}
	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	ctrl.stop <- struct{}{}
	stopAndWait()
	// initial trigger. extra start calls should be coalesced
	assert.Equal(1, ctrl.reconciliationCounter)
}
// TestErrorTriggersImmediateReconciliation verifies that a controller error
// causes an immediate requeue (the stub rate limiter imposes no delay here).
func TestErrorTriggersImmediateReconciliation(t *testing.T) {
	assert := assert.New(t)
	// returning an error should trigger a reconciliation immediately
	ctrl := newStubController(Result{}, errors.New("reconciler error"))
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}
	stopAndWait()
	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}
// TestErrorTriggersRateLimiting verifies that error requeues go through the
// rate limiter: with a very long rate limit delay, only the initial
// reconciliation happens.
func TestErrorTriggersRateLimiting(t *testing.T) {
	assert := assert.New(t)
	// returning an error should trigger a reconciliation immediately
	ctrl := newStubController(Result{}, errors.New("reconciler error"))
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting
	ctrl.stop <- struct{}{}
	stopAndWait()
	// initial trigger. error triggers are rate limited to 1 per year
	assert.Equal(1, ctrl.reconciliationCounter)
}
// TestRequeueAfterResultRequeueInterval verifies that a Result with Requeue
// and a non-zero RequeueAfter schedules the next reconciliation after that
// interval, bypassing the (here prohibitively slow) rate limiter.
func TestRequeueAfterResultRequeueInterval(t *testing.T) {
	assert := assert.New(t)
	// setting a requeue result should trigger a reconciliation after the specified delay
	ctrl := newStubController(Result{
		Requeue:      true,
		RequeueAfter: time.Microsecond,
	}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}
	stopAndWait()
	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}
// TestExternalTrigger verifies that each Trigger call causes exactly one
// additional reconciliation when triggers are spaced out by the test.
func TestExternalTrigger(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // initial trigger
	for i := 0; i < 10; i++ {
		exec.Trigger()
		<-ctrl.waitUntilReconciled // external trigger
	}
	ctrl.stop <- struct{}{}
	stopAndWait()
	// initial trigger + 10 external triggers
	assert.Equal(11, ctrl.reconciliationCounter)
}
// TestSimultaneousExternalTriggers verifies that a burst of Trigger calls is
// coalesced into a single pending reconciliation.
func TestSimultaneousExternalTriggers(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // initial trigger
	for i := 0; i < 100; i++ {
		exec.Trigger() // extra trigger calls are coalesced
	}
	<-ctrl.waitUntilReconciled // external trigger
	ctrl.stop <- struct{}{}
	stopAndWait()
	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the next manual trigger case first.
	assertBetween(assert, 2, 3, ctrl.reconciliationCounter)
}
// TestContextCancel verifies that canceling the context passed to Start stops
// the executor without requiring an explicit Stop call.
func TestContextCancel(t *testing.T) {
	assert := assert.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	_ = exec.Start(ctx)        // no need to explicitly stop the executor, it will stop when the context is canceled
	<-ctrl.waitUntilReconciled // initial trigger
	// canceling the context should stop the executor without blocking
	cancel()
	ctrl.stop <- struct{}{}
	// poll for the executor stop running
	// this is necessary since the executor doesn't expose
	// a pure wait method
	assert.Eventually(func() bool {
		return !exec.Running()
	}, 5*time.Second, 10*time.Millisecond)
	// initial trigger
	assert.Equal(1, ctrl.reconciliationCounter)
}
// TestRequeueAfterPollingFrequency verifies that, absent errors and requeue
// requests, reconciliations are driven by the configured polling frequency.
func TestRequeueAfterPollingFrequency(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Microsecond, // basically no delay
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}
	stopAndWait()
	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}
// stubController is a Controller implementation for tests.
// Each Reconcile call blocks until the test either drains waitUntilReconciled
// (stepping the executor forward one reconciliation) or sends on stop.
type stubController struct {
	// stopped is set once a stop signal was received; later Reconcile calls return an error.
	stopped bool
	// stop makes the next Reconcile call mark the stub as stopped.
	stop chan struct{}
	// waitUntilReconciled is unbuffered; the test reads from it to synchronize
	// with a completed Reconcile call.
	waitUntilReconciled chan struct{}
	// reconciliationCounter counts Reconcile invocations.
	reconciliationCounter int
	// result is returned from every Reconcile call.
	result Result
	// err is returned from every Reconcile call.
	err error
}
// newStubController constructs a stubController that returns the given result
// and error from every Reconcile call.
func newStubController(result Result, err error) *stubController {
	ctrl := &stubController{
		stop:                make(chan struct{}, 1),
		waitUntilReconciled: make(chan struct{}),
		result:              result,
		err:                 err,
	}
	return ctrl
}
// Reconcile counts the invocation and then blocks until the test either
// consumes the waitUntilReconciled signal or requests a stop via the stop
// channel; the select order of these two cases is decided by the test.
func (s *stubController) Reconcile(_ context.Context) (Result, error) {
	if s.stopped {
		return Result{}, errors.New("controller stopped")
	}
	s.reconciliationCounter++
	select {
	case <-s.stop:
		s.stopped = true
	case s.waitUntilReconciled <- struct{}{}:
	}
	return s.result, s.err
}
// stubRateLimiter is a RateLimiter that always returns a fixed delay and
// ignores Forget calls. The zero value imposes no delay at all.
type stubRateLimiter struct {
	// whenRes is the fixed delay returned by When.
	whenRes time.Duration
}

// When returns the configured fixed delay, regardless of the item.
func (s *stubRateLimiter) When(_ any) time.Duration {
	return s.whenRes
}

// Forget is a no-op.
func (s *stubRateLimiter) Forget(_ any) {}
// assertBetween asserts that lo <= actual <= hi.
// The bounds parameters are named lo/hi to avoid shadowing the min/max builtins.
func assertBetween(assert *assert.Assertions, lo, hi, actual int) {
	assert.GreaterOrEqual(actual, lo)
	assert.LessOrEqual(actual, hi)
}