operators: infrastructure autodiscovery (#1958)
* helm: configure GCP cloud controller manager to search in all zones of a region
See also: d716fdd452/providers/gce/gce.go (L376-L380)
* operators: add nodeGroupName to ScalingGroup CRD
NodeGroupName is the human-friendly name of the node group that will be exposed to customers via the Constellation config in the future.
* operators: support a simple executor / scheduler to reconcile on non-k8s resources (a usage sketch follows this list)
* operators: add new return type for ListScalingGroups to support arbitrary node groups
* operators: ListScalingGroups should return additionally created node groups on AWS
* operators: ListScalingGroups should return additionally created node groups on Azure
* operators: ListScalingGroups should return additionally created node groups on GCP
* operators: ListScalingGroups should return additionally created node groups on unsupported CSPs
* operators: implement external scaling group reconciler
This controller scans the cloud provider infrastructure and changes k8s resources accordingly.
It creates ScaleSet resources when new node groups are created and deletes them if the node groups are removed.
* operators: no longer create scale sets when the operator starts
In the future, scale sets are created dynamically.
* operators: watch for node join/leave events using a controller
* operators: deploy new controllers
* docs: update auto scaling documentation with support for node groups
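For orientation, here is a rough usage sketch of that executor. The controller type, its behavior, and the polling interval are hypothetical; `executor.New`, `executor.Config`, `Start`, `Trigger`, and `Result` come from the `internal/executor` package added in this commit (importable only from inside the operator module).

```go
package main

import (
    "context"
    "time"

    "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor"
)

// infraReconciler is a hypothetical controller that reconciles on cloud
// infrastructure (node groups) instead of on Kubernetes resources.
type infraReconciler struct{}

// Reconcile would scan the CSP for node groups and create or delete the
// matching ScalingGroup resources; here it only requests the default requeue.
func (r *infraReconciler) Reconcile(_ context.Context) (executor.Result, error) {
    return executor.Result{}, nil
}

func main() {
    ctx := context.Background()
    exec := executor.New(&infraReconciler{}, executor.Config{
        PollingFrequency: 5 * time.Minute, // illustrative; the package default is 15 minutes
    })
    stopAndWait := exec.Start(ctx) // reconciliation loop runs in background goroutines
    defer stopAndWait()

    exec.Trigger() // force an immediate run, e.g. when a node join event is observed
}
```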
parent 9de8660bd7
commit 4283601433
@@ -5,5 +5,5 @@ metadata:
   name: gceconf
   namespace: {{ .Release.Namespace }}
 data:
-  gce.conf: "[global]\nproject-id = {{.Values.GCP.projectID }}\nuse-metadata-server = true\nnode-tags = constellation-{{ .Values.GCP.uid }}\n"
+  gce.conf: "[global]\nproject-id = {{.Values.GCP.projectID }}\nuse-metadata-server = true\nnode-tags = constellation-{{ .Values.GCP.uid }}\nregional = true\n"
 {{- end -}}
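For reference, the unescaped gce.conf that this template renders (using the values from the unit-test fixture later in this diff) now reads:

```ini
[global]
project-id = 42424242424242
use-metadata-server = true
node-tags = constellation-242424242424
regional = true
```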
@@ -54,6 +54,10 @@ spec:
                 (used by cluster-autoscaler).
               format: int32
               type: integer
+            nodeGroupName:
+              description: NodeGroupName is the human-friendly name of the node group
+                as defined in the Constellation configuration.
+              type: string
             nodeImage:
               description: NodeImage is the name of the NodeImage resource.
               type: string
@@ -4,4 +4,4 @@ metadata:
   name: gceconf
   namespace: testNamespace
 data:
-  gce.conf: "[global]\nproject-id = 42424242424242\nuse-metadata-server = true\nnode-tags = constellation-242424242424\n"
+  gce.conf: "[global]\nproject-id = 42424242424242\nuse-metadata-server = true\nnode-tags = constellation-242424242424\nregional = true\n"
@@ -10,15 +10,18 @@ Constellation comes with autoscaling disabled by default. To enable autoscaling,
 worker nodes:

 ```bash
-worker_group=$(kubectl get scalinggroups -o json | jq -r '.items[].metadata.name | select(contains("worker"))')
-echo "The name of your worker scaling group is '$worker_group'"
+kubectl get scalinggroups -o json | yq '.items | .[] | select(.spec.role == "Worker") | [{"name": .metadata.name, "nodeGroupName": .spec.nodeGroupName}]'
 ```

-Then, patch the `autoscaling` field of the scaling group resource to `true`:
+This will output a list of scaling groups with the corresponding cloud provider name (`name`) and the cloud-provider-agnostic name of the node group (`nodeGroupName`).
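The exact output depends on your cluster; for a single default worker group it might look roughly like this (the `name` value is hypothetical, `worker_default` is the fallback node group name introduced in this commit):

```
- name: constellation-scale-set-workers-5f4d8b9c
  nodeGroupName: worker_default
```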
+Then, patch the `autoscaling` field of the scaling group resource with the desired `name` to `true`:

 ```bash
+# Replace <name> with the name of the scaling group you want to enable autoscaling for
+worker_group=<name>
 kubectl patch scalinggroups $worker_group --patch '{"spec":{"autoscaling": true}}' --type='merge'
-kubectl get scalinggroup $worker_group -o jsonpath='{.spec}' | jq
+kubectl get scalinggroup $worker_group -o jsonpath='{.spec}' | yq -P
 ```

 The cluster autoscaler now automatically provisions additional worker nodes so that all pods have a place to run.
@@ -27,7 +30,7 @@ You can configure the minimum and maximum number of worker nodes in the scaling

 ```bash
 kubectl patch scalinggroups $worker_group --patch '{"spec":{"max": 5}}' --type='merge'
-kubectl get scalinggroup $worker_group -o jsonpath='{.spec}' | jq
+kubectl get scalinggroup $worker_group -o jsonpath='{.spec}' | yq -P
 ```

 The cluster autoscaler will now never provision more than 5 worker nodes.
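A minimal sketch building on the command above: `min` is also part of the ScalingGroup spec shown earlier, so both bounds can be set in a single patch (the values are illustrative).

```bash
# Illustrative: bound the worker group between 2 and 5 nodes in one merge patch
kubectl patch scalinggroups $worker_group --patch '{"spec":{"min": 2, "max": 5}}' --type='merge'
```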
@@ -13,13 +13,16 @@ go_library(
         "//3rdparty/node-maintenance-operator/api/v1beta1",
         "//operators/constellation-node-operator/api/v1alpha1",
         "//operators/constellation-node-operator/controllers",
+        "//operators/constellation-node-operator/internal/cloud/api",
         "//operators/constellation-node-operator/internal/cloud/aws/client",
         "//operators/constellation-node-operator/internal/cloud/azure/client",
         "//operators/constellation-node-operator/internal/cloud/fake/client",
         "//operators/constellation-node-operator/internal/cloud/gcp/client",
         "//operators/constellation-node-operator/internal/deploy",
         "//operators/constellation-node-operator/internal/etcd",
+        "//operators/constellation-node-operator/internal/executor",
         "//operators/constellation-node-operator/internal/upgrade",
+        "//operators/constellation-node-operator/sgreconciler",
         "@io_k8s_apimachinery//pkg/runtime",
         "@io_k8s_apimachinery//pkg/util/runtime",
         "@io_k8s_client_go//discovery",
@@ -7,6 +7,8 @@ SPDX-License-Identifier: AGPL-3.0-only
package v1alpha1

import (
    "strings"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -14,6 +16,8 @@ const (
    // ConditionOutdated is used to signal outdated scaling groups.
    ConditionOutdated = "Outdated"

    // UnknownRole is used to signal unknown scaling group roles.
    UnknownRole NodeRole = ""
    // WorkerRole is used to signal worker scaling groups.
    WorkerRole NodeRole = "Worker"
    // ControlPlaneRole is used to signal control plane scaling groups.
@@ -28,6 +32,8 @@ type ScalingGroupSpec struct {
    GroupID string `json:"groupId,omitempty"`
    // AutoscalerGroupName is the name that is expected by the autoscaler.
    AutoscalerGroupName string `json:"autoscalerGroupName,omitempty"`
    // NodeGroupName is the human-friendly name of the node group as defined in the Constellation configuration.
    NodeGroupName string `json:"nodeGroupName,omitempty"`
    // Autoscaling specifies whether the scaling group should automatically scale using the cluster-autoscaler.
    Autoscaling bool `json:"autoscaling,omitempty"`
    // Min is the minimum number of nodes in the scaling group (used by cluster-autoscaler).
@@ -42,6 +48,18 @@ type ScalingGroupSpec struct {
// +kubebuilder:validation:Enum=Worker;ControlPlane
type NodeRole string

// NodeRoleFromString returns the NodeRole for the given string.
func NodeRoleFromString(s string) NodeRole {
    switch strings.ToLower(s) {
    case "controlplane", "control-plane":
        return ControlPlaneRole
    case "worker":
        return WorkerRole
    default:
        return UnknownRole
    }
}

// ScalingGroupStatus defines the observed state of ScalingGroup.
type ScalingGroupStatus struct {
    // ImageReference is the image currently used for newly created nodes in this scaling group.
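A quick, self-contained illustration of the new helper; the unknown tag value is made up.

```go
package main

import (
    "fmt"

    updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
)

func main() {
    // Tag/label values as written by the CSP clients in this commit.
    fmt.Println(updatev1alpha1.NodeRoleFromString("control-plane")) // ControlPlane
    fmt.Println(updatev1alpha1.NodeRoleFromString("worker"))        // Worker
    // Anything else maps to the empty UnknownRole.
    fmt.Println(updatev1alpha1.NodeRoleFromString("gpu-group") == updatev1alpha1.UnknownRole) // true
}
```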
@@ -56,6 +56,10 @@ spec:
                 (used by cluster-autoscaler).
               format: int32
               type: integer
+            nodeGroupName:
+              description: NodeGroupName is the human-friendly name of the node group
+                as defined in the Constellation configuration.
+              type: string
             nodeImage:
               description: NodeVersion is the name of the NodeVersion resource.
               type: string
@@ -27,6 +27,7 @@ require (
    go.etcd.io/etcd/api/v3 v3.5.7
    go.etcd.io/etcd/client/pkg/v3 v3.5.7
    go.etcd.io/etcd/client/v3 v3.5.7
    go.uber.org/goleak v1.2.1
    golang.org/x/mod v0.10.0
    google.golang.org/api v0.122.0
    google.golang.org/protobuf v1.30.0
@@ -342,6 +342,7 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
    name = "api",
    srcs = ["scalinggroup.go"],
    importpath = "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api",
    visibility = ["//operators/constellation-node-operator:__subpackages__"],
    deps = ["//operators/constellation-node-operator/api/v1alpha1"],
)
@@ -0,0 +1,24 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package api

import updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"

// ScalingGroup is a cloud provider scaling group.
type ScalingGroup struct {
    // Name is the CSP-specific name of the scaling group.
    Name string
    // NodeGroupName is the human-friendly name of the node group
    // as defined in the Constellation configuration.
    NodeGroupName string
    // GroupID is the CSP-specific, canonical identifier of a scaling group.
    GroupID string
    // AutoscalingGroupName is the name that is expected by the autoscaler.
    AutoscalingGroupName string
    // Role is the role of the nodes in the scaling group.
    Role updatev1alpha1.NodeRole
}
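Each CSP client in this diff repeats the same legacy fallback when the `constellation-node-group` tag or label is missing. Factored out, that logic would look roughly like this; the helper name is hypothetical, the default names are taken from the diff.

```go
package client

import updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"

// legacyNodeGroupName returns the default node group name used for clusters
// that were created before the constellation-node-group tag/label existed.
func legacyNodeGroupName(role updatev1alpha1.NodeRole) string {
    switch role {
    case updatev1alpha1.ControlPlaneRole:
        return "control_plane_default"
    case updatev1alpha1.WorkerRole:
        return "worker_default"
    default:
        return ""
    }
}
```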
@@ -15,6 +15,7 @@ go_library(
    visibility = ["//operators/constellation-node-operator:__subpackages__"],
    deps = [
        "//operators/constellation-node-operator/api/v1alpha1",
+        "//operators/constellation-node-operator/internal/cloud/api",
        "@com_github_aws_aws_sdk_go_v2_config//:config",
        "@com_github_aws_aws_sdk_go_v2_feature_ec2_imds//:imds",
        "@com_github_aws_aws_sdk_go_v2_service_autoscaling//:autoscaling",
@@ -36,6 +37,7 @@ go_test(
    embed = [":client"],
    deps = [
        "//operators/constellation-node-operator/api/v1alpha1",
+        "//operators/constellation-node-operator/internal/cloud/api",
        "@com_github_aws_aws_sdk_go_v2_service_autoscaling//:autoscaling",
        "@com_github_aws_aws_sdk_go_v2_service_autoscaling//types",
        "@com_github_aws_aws_sdk_go_v2_service_ec2//:ec2",
@ -15,6 +15,8 @@ import (
|
||||
scalingtypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
||||
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
)
|
||||
|
||||
// GetScalingGroupImage returns the image URI of the scaling group.
|
||||
@ -140,7 +142,8 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
|
||||
}
|
||||
|
||||
// ListScalingGroups retrieves a list of scaling groups for the cluster.
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
|
||||
results := []cspapi.ScalingGroup{}
|
||||
output, err := c.scalingClient.DescribeAutoScalingGroups(
|
||||
ctx,
|
||||
&autoscaling.DescribeAutoScalingGroupsInput{
|
||||
@ -153,22 +156,62 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to describe scaling groups: %w", err)
|
||||
return nil, fmt.Errorf("failed to describe scaling groups: %w", err)
|
||||
}
|
||||
|
||||
for _, group := range output.AutoScalingGroups {
|
||||
if group.Tags == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var role updatev1alpha1.NodeRole
|
||||
var nodeGroupName string
|
||||
for _, tag := range group.Tags {
|
||||
if *tag.Key == "constellation-role" {
|
||||
if *tag.Value == "control-plane" {
|
||||
controlPlaneGroupIDs = append(controlPlaneGroupIDs, *group.AutoScalingGroupName)
|
||||
} else if *tag.Value == "worker" {
|
||||
workerGroupIDs = append(workerGroupIDs, *group.AutoScalingGroupName)
|
||||
}
|
||||
if tag.Key == nil || tag.Value == nil {
|
||||
continue
|
||||
}
|
||||
key := *tag.Key
|
||||
switch key {
|
||||
case "constellation-role":
|
||||
role = updatev1alpha1.NodeRoleFromString(*tag.Value)
|
||||
case "constellation-node-group":
|
||||
nodeGroupName = *tag.Value
|
||||
}
|
||||
}
|
||||
|
||||
// fallback for legacy clusters
|
||||
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
|
||||
if nodeGroupName == "" {
|
||||
switch role {
|
||||
case updatev1alpha1.ControlPlaneRole:
|
||||
nodeGroupName = "control_plane_default"
|
||||
case updatev1alpha1.WorkerRole:
|
||||
nodeGroupName = "worker_default"
|
||||
}
|
||||
}
|
||||
|
||||
name, err := c.GetScalingGroupName(*group.AutoScalingGroupName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting scaling group name: %w", err)
|
||||
}
|
||||
|
||||
nodeGroupName, err = c.GetScalingGroupName(nodeGroupName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting node group name: %w", err)
|
||||
}
|
||||
|
||||
autoscalerGroupName, err := c.GetAutoscalingGroupName(*group.AutoScalingGroupName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting autoscaler group name: %w", err)
|
||||
}
|
||||
|
||||
results = append(results, cspapi.ScalingGroup{
|
||||
Name: name,
|
||||
NodeGroupName: nodeGroupName,
|
||||
GroupID: *group.AutoScalingGroupName,
|
||||
AutoscalingGroupName: autoscalerGroupName,
|
||||
Role: role,
|
||||
})
|
||||
}
|
||||
return controlPlaneGroupIDs, workerGroupIDs, nil
|
||||
return results, nil
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
scalingtypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
||||
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -229,8 +230,7 @@ func TestListScalingGroups(t *testing.T) {
|
||||
providerID string
|
||||
describeAutoScalingGroupsOut []*autoscaling.DescribeAutoScalingGroupsOutput
|
||||
describeAutoScalingGroupsErr []error
|
||||
wantControlPlaneGroupIDs []string
|
||||
wantWorkerGroupIDs []string
|
||||
wantGroups []cspapi.ScalingGroup
|
||||
wantErr bool
|
||||
}{
|
||||
"listing scaling groups work": {
|
||||
@ -263,6 +263,10 @@ func TestListScalingGroups(t *testing.T) {
|
||||
Key: toPtr("constellation-role"),
|
||||
Value: toPtr("worker"),
|
||||
},
|
||||
{
|
||||
Key: toPtr("constellation-node-group"),
|
||||
Value: toPtr("foo-group"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -272,8 +276,29 @@ func TestListScalingGroups(t *testing.T) {
|
||||
},
|
||||
},
|
||||
describeAutoScalingGroupsErr: []error{nil},
|
||||
wantControlPlaneGroupIDs: []string{"control-plane-asg"},
|
||||
wantWorkerGroupIDs: []string{"worker-asg", "worker-asg-2"},
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "control-plane-asg",
|
||||
NodeGroupName: "control_plane_default",
|
||||
GroupID: "control-plane-asg",
|
||||
AutoscalingGroupName: "control-plane-asg",
|
||||
Role: "ControlPlane",
|
||||
},
|
||||
{
|
||||
Name: "worker-asg",
|
||||
NodeGroupName: "worker_default",
|
||||
GroupID: "worker-asg",
|
||||
AutoscalingGroupName: "worker-asg",
|
||||
Role: "Worker",
|
||||
},
|
||||
{
|
||||
Name: "worker-asg-2",
|
||||
NodeGroupName: "foo-group",
|
||||
GroupID: "worker-asg-2",
|
||||
AutoscalingGroupName: "worker-asg-2",
|
||||
Role: "Worker",
|
||||
},
|
||||
},
|
||||
},
|
||||
"fails when describing scaling groups fails": {
|
||||
providerID: "aws:///us-east-2a/i-00000000000000000",
|
||||
@ -293,14 +318,13 @@ func TestListScalingGroups(t *testing.T) {
|
||||
describeAutoScalingGroupsErr: tc.describeAutoScalingGroupsErr,
|
||||
},
|
||||
}
|
||||
controlPlaneGroupIDs, workerGroupIDs, err := client.ListScalingGroups(context.Background(), tc.providerID)
|
||||
gotGroups, err := client.ListScalingGroups(context.Background(), tc.providerID)
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
assert.Equal(tc.wantControlPlaneGroupIDs, controlPlaneGroupIDs)
|
||||
assert.Equal(tc.wantWorkerGroupIDs, workerGroupIDs)
|
||||
assert.Equal(tc.wantGroups, gotGroups)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ go_library(
|
||||
visibility = ["//operators/constellation-node-operator:__subpackages__"],
|
||||
deps = [
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"//operators/constellation-node-operator/internal/poller",
|
||||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
|
||||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//runtime",
|
||||
@ -44,6 +45,7 @@ go_test(
|
||||
embed = [":client"],
|
||||
deps = [
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"//operators/constellation-node-operator/internal/poller",
|
||||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
|
||||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//runtime",
|
||||
|
@ -13,6 +13,8 @@ import (
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4"
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
)
|
||||
|
||||
// GetScalingGroupImage returns the image URI of the scaling group.
|
||||
@ -79,34 +81,63 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
|
||||
}
|
||||
|
||||
// ListScalingGroups retrieves a list of scaling groups for the cluster.
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
|
||||
results := []cspapi.ScalingGroup{}
|
||||
pager := c.scaleSetsAPI.NewListPager(c.config.ResourceGroup, nil)
|
||||
|
||||
for pager.More() {
|
||||
page, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("paging scale sets: %w", err)
|
||||
return nil, fmt.Errorf("paging scale sets: %w", err)
|
||||
}
|
||||
for _, scaleSet := range page.Value {
|
||||
if scaleSet == nil || scaleSet.ID == nil {
|
||||
continue
|
||||
}
|
||||
if scaleSet.Tags == nil || scaleSet.Tags["constellation-uid"] == nil || *scaleSet.Tags["constellation-uid"] != uid {
|
||||
if scaleSet.Tags == nil ||
|
||||
scaleSet.Tags["constellation-uid"] == nil ||
|
||||
*scaleSet.Tags["constellation-uid"] != uid ||
|
||||
scaleSet.Tags["constellation-role"] == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
role := updatev1alpha1.NodeRoleFromString(*scaleSet.Tags["constellation-role"])
|
||||
|
||||
name, err := c.GetScalingGroupName(*scaleSet.ID)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("getting scaling group name: %w", err)
|
||||
return nil, fmt.Errorf("getting scaling group name: %w", err)
|
||||
}
|
||||
switch *scaleSet.Tags["constellation-role"] {
|
||||
case "control-plane", "controlplane":
|
||||
controlPlaneGroupIDs = append(controlPlaneGroupIDs, *scaleSet.ID)
|
||||
case "worker":
|
||||
workerGroupIDs = append(workerGroupIDs, *scaleSet.ID)
|
||||
|
||||
var nodeGroupName string
|
||||
if scaleSet.Tags["constellation-node-group"] != nil {
|
||||
nodeGroupName = *scaleSet.Tags["constellation-node-group"]
|
||||
}
|
||||
// fallback for legacy clusters
|
||||
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
|
||||
if nodeGroupName == "" {
|
||||
switch role {
|
||||
case updatev1alpha1.ControlPlaneRole:
|
||||
nodeGroupName = "control_plane_default"
|
||||
case updatev1alpha1.WorkerRole:
|
||||
nodeGroupName = "worker_default"
|
||||
}
|
||||
}
|
||||
|
||||
autoscalerGroupName, err := c.GetAutoscalingGroupName(*scaleSet.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting autoscaling group name: %w", err)
|
||||
}
|
||||
|
||||
results = append(results, cspapi.ScalingGroup{
|
||||
Name: name,
|
||||
NodeGroupName: nodeGroupName,
|
||||
GroupID: *scaleSet.ID,
|
||||
AutoscalingGroupName: autoscalerGroupName,
|
||||
Role: role,
|
||||
})
|
||||
}
|
||||
}
|
||||
return controlPlaneGroupIDs, workerGroupIDs, nil
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func imageReferenceFromImage(img string) *armcompute.ImageReference {
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -184,41 +185,67 @@ func TestGetScalingGroupName(t *testing.T) {
|
||||
|
||||
func TestListScalingGroups(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
scaleSet armcompute.VirtualMachineScaleSet
|
||||
fetchPageErr error
|
||||
wantControlPlanes []string
|
||||
wantWorkers []string
|
||||
wantErr bool
|
||||
scaleSet armcompute.VirtualMachineScaleSet
|
||||
fetchPageErr error
|
||||
wantGroups []cspapi.ScalingGroup
|
||||
wantErr bool
|
||||
}{
|
||||
"listing control-plane works": {
|
||||
scaleSet: armcompute.VirtualMachineScaleSet{
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"),
|
||||
Name: to.Ptr("constellation-scale-set-control-planes-uid"),
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"),
|
||||
Tags: map[string]*string{
|
||||
"constellation-uid": to.Ptr("uid"),
|
||||
"constellation-role": to.Ptr("control-plane"),
|
||||
},
|
||||
},
|
||||
wantControlPlanes: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid"},
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "constellation-scale-set-control-planes-uid",
|
||||
NodeGroupName: "control_plane_default",
|
||||
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-control-planes-uid",
|
||||
AutoscalingGroupName: "constellation-scale-set-control-planes-uid",
|
||||
Role: "ControlPlane",
|
||||
},
|
||||
},
|
||||
},
|
||||
"listing worker works": {
|
||||
scaleSet: armcompute.VirtualMachineScaleSet{
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"),
|
||||
Name: to.Ptr("constellation-scale-set-workers-uid"),
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"),
|
||||
Tags: map[string]*string{
|
||||
"constellation-uid": to.Ptr("uid"),
|
||||
"constellation-role": to.Ptr("worker"),
|
||||
},
|
||||
},
|
||||
wantWorkers: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid"},
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "constellation-scale-set-workers-uid",
|
||||
NodeGroupName: "worker_default",
|
||||
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/constellation-scale-set-workers-uid",
|
||||
AutoscalingGroupName: "constellation-scale-set-workers-uid",
|
||||
Role: "Worker",
|
||||
},
|
||||
},
|
||||
},
|
||||
"listing is not dependent on resource name": {
|
||||
scaleSet: armcompute.VirtualMachineScaleSet{
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"),
|
||||
Name: to.Ptr("foo-bar"),
|
||||
ID: to.Ptr("/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"),
|
||||
Tags: map[string]*string{
|
||||
"constellation-uid": to.Ptr("uid"),
|
||||
"constellation-role": to.Ptr("control-plane"),
|
||||
},
|
||||
},
|
||||
wantControlPlanes: []string{"/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set"},
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "some-scale-set",
|
||||
NodeGroupName: "control_plane_default",
|
||||
GroupID: "/subscriptions/subscription-id/resourceGroups/resource-group/providers/Microsoft.Compute/virtualMachineScaleSets/some-scale-set",
|
||||
AutoscalingGroupName: "some-scale-set",
|
||||
Role: "ControlPlane",
|
||||
},
|
||||
},
|
||||
},
|
||||
"listing other works": {
|
||||
scaleSet: armcompute.VirtualMachineScaleSet{
|
||||
@ -245,14 +272,13 @@ func TestListScalingGroups(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
gotControlPlanes, gotWorkers, err := client.ListScalingGroups(context.Background(), "uid")
|
||||
gotGroups, err := client.ListScalingGroups(context.Background(), "uid")
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
assert.ElementsMatch(tc.wantControlPlanes, gotControlPlanes)
|
||||
assert.ElementsMatch(tc.wantWorkers, gotWorkers)
|
||||
assert.ElementsMatch(tc.wantGroups, gotGroups)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ go_library(
|
||||
visibility = ["//operators/constellation-node-operator:__subpackages__"],
|
||||
deps = [
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"//operators/constellation-node-operator/internal/constants",
|
||||
],
|
||||
)
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
|
||||
)
|
||||
|
||||
@ -82,8 +83,23 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
|
||||
}
|
||||
|
||||
// ListScalingGroups retrieves a list of scaling groups for the cluster.
|
||||
func (c *Client) ListScalingGroups(_ context.Context, _ string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
|
||||
return []string{controlPlanesID}, []string{workersID}, nil
|
||||
func (c *Client) ListScalingGroups(_ context.Context, _ string) ([]cspapi.ScalingGroup, error) {
|
||||
return []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: controlPlanesID,
|
||||
NodeGroupName: controlPlanesID,
|
||||
GroupID: controlPlanesID,
|
||||
AutoscalingGroupName: controlPlanesID,
|
||||
Role: updatev1alpha1.ControlPlaneRole,
|
||||
},
|
||||
{
|
||||
Name: workersID,
|
||||
NodeGroupName: workersID,
|
||||
GroupID: workersID,
|
||||
AutoscalingGroupName: workersID,
|
||||
Role: updatev1alpha1.WorkerRole,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AutoscalingCloudProvider returns the cloud-provider name as used by k8s cluster-autoscaler.
|
||||
|
@ -23,6 +23,7 @@ go_library(
|
||||
visibility = ["//operators/constellation-node-operator:__subpackages__"],
|
||||
deps = [
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"@com_github_googleapis_gax_go_v2//:gax-go",
|
||||
"@com_github_spf13_afero//:afero",
|
||||
"@com_google_cloud_go_compute//apiv1",
|
||||
@ -51,6 +52,7 @@ go_test(
|
||||
embed = [":client"],
|
||||
deps = [
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"@com_github_googleapis_gax_go_v2//:gax-go",
|
||||
"@com_github_spf13_afero//:afero",
|
||||
"@com_github_stretchr_testify//assert",
|
||||
|
@ -13,6 +13,8 @@ import (
|
||||
"strings"
|
||||
|
||||
"cloud.google.com/go/compute/apiv1/computepb"
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"google.golang.org/api/iterator"
|
||||
)
|
||||
|
||||
@ -106,7 +108,8 @@ func (c *Client) GetAutoscalingGroupName(scalingGroupID string) (string, error)
|
||||
}
|
||||
|
||||
// ListScalingGroups retrieves a list of scaling groups for the cluster.
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
|
||||
func (c *Client) ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error) {
|
||||
results := []cspapi.ScalingGroup{}
|
||||
iter := c.instanceGroupManagersAPI.AggregatedList(ctx, &computepb.AggregatedListInstanceGroupManagersRequest{
|
||||
Project: c.projectID,
|
||||
})
|
||||
@ -115,7 +118,7 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("listing instance group managers: %w", err)
|
||||
return nil, fmt.Errorf("listing instance group managers: %w", err)
|
||||
}
|
||||
if instanceGroupManagerScopedListPair.Value == nil {
|
||||
continue
|
||||
@ -134,7 +137,7 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
|
||||
InstanceTemplate: templateURI[len(templateURI)-1],
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("getting instance template: %w", err)
|
||||
return nil, fmt.Errorf("getting instance template: %w", err)
|
||||
}
|
||||
if template.Properties == nil || template.Properties.Labels == nil {
|
||||
continue
|
||||
@ -145,18 +148,43 @@ func (c *Client) ListScalingGroups(ctx context.Context, uid string) (controlPlan
|
||||
|
||||
groupID, err := c.canonicalInstanceGroupID(ctx, *grpManager.SelfLink)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("normalizing instance group ID: %w", err)
|
||||
return nil, fmt.Errorf("normalizing instance group ID: %w", err)
|
||||
}
|
||||
|
||||
switch strings.ToLower(template.Properties.Labels["constellation-role"]) {
|
||||
case "control-plane", "controlplane":
|
||||
controlPlaneGroupIDs = append(controlPlaneGroupIDs, groupID)
|
||||
case "worker":
|
||||
workerGroupIDs = append(workerGroupIDs, groupID)
|
||||
role := updatev1alpha1.NodeRoleFromString(template.Properties.Labels["constellation-role"])
|
||||
|
||||
name, err := c.GetScalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting scaling group name: %w", err)
|
||||
}
|
||||
|
||||
nodeGroupName := template.Properties.Labels["constellation-node-group"]
|
||||
// fallback for legacy clusters
|
||||
// TODO(malt3): remove this fallback once we can assume all clusters have the correct labels
|
||||
if nodeGroupName == "" {
|
||||
switch role {
|
||||
case updatev1alpha1.ControlPlaneRole:
|
||||
nodeGroupName = "control_plane_default"
|
||||
case updatev1alpha1.WorkerRole:
|
||||
nodeGroupName = "worker_default"
|
||||
}
|
||||
}
|
||||
|
||||
autoscalerGroupName, err := c.GetAutoscalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting autoscaling group name: %w", err)
|
||||
}
|
||||
|
||||
results = append(results, cspapi.ScalingGroup{
|
||||
Name: name,
|
||||
NodeGroupName: nodeGroupName,
|
||||
GroupID: groupID,
|
||||
AutoscalingGroupName: autoscalerGroupName,
|
||||
Role: role,
|
||||
})
|
||||
}
|
||||
}
|
||||
return controlPlaneGroupIDs, workerGroupIDs, nil
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (c *Client) getScalingGroupTemplate(ctx context.Context, scalingGroupID string) (*computepb.InstanceTemplate, error) {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"cloud.google.com/go/compute/apiv1/computepb"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -330,8 +331,7 @@ func TestListScalingGroups(t *testing.T) {
|
||||
templateLabels map[string]string
|
||||
listInstanceGroupManagersErr error
|
||||
templateGetErr error
|
||||
wantControlPlanes []string
|
||||
wantWorkers []string
|
||||
wantGroups []cspapi.ScalingGroup
|
||||
wantErr bool
|
||||
}{
|
||||
"list instance group managers fails": {
|
||||
@ -353,8 +353,14 @@ func TestListScalingGroups(t *testing.T) {
|
||||
"constellation-uid": "uid",
|
||||
"constellation-role": "control-plane",
|
||||
},
|
||||
wantControlPlanes: []string{
|
||||
"projects/project/zones/zone/instanceGroupManagers/test-control-plane-uid",
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "test-control-plane-uid",
|
||||
NodeGroupName: "control_plane_default",
|
||||
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-control-plane-uid",
|
||||
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-control-plane-uid",
|
||||
Role: "ControlPlane",
|
||||
},
|
||||
},
|
||||
},
|
||||
"list instance group managers for worker": {
|
||||
@ -365,8 +371,33 @@ func TestListScalingGroups(t *testing.T) {
|
||||
"constellation-uid": "uid",
|
||||
"constellation-role": "worker",
|
||||
},
|
||||
wantWorkers: []string{
|
||||
"projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "test-worker-uid",
|
||||
NodeGroupName: "worker_default",
|
||||
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
|
||||
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-worker-uid",
|
||||
Role: "Worker",
|
||||
},
|
||||
},
|
||||
},
|
||||
"list instance group managers with custom group name": {
|
||||
name: proto.String("test-worker-uid"),
|
||||
groupID: proto.String("projects/project/zones/zone/instanceGroupManagers/test-worker-uid"),
|
||||
templateRef: proto.String("projects/project/global/instanceTemplates/test-control-plane-uid"),
|
||||
templateLabels: map[string]string{
|
||||
"constellation-uid": "uid",
|
||||
"constellation-role": "worker",
|
||||
"constellation-node-group": "custom-group-name",
|
||||
},
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "test-worker-uid",
|
||||
NodeGroupName: "custom-group-name",
|
||||
GroupID: "projects/project/zones/zone/instanceGroupManagers/test-worker-uid",
|
||||
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/test-worker-uid",
|
||||
Role: "Worker",
|
||||
},
|
||||
},
|
||||
},
|
||||
"listing instance group managers is not dependant on resource name": {
|
||||
@ -377,8 +408,14 @@ func TestListScalingGroups(t *testing.T) {
|
||||
"constellation-uid": "uid",
|
||||
"constellation-role": "control-plane",
|
||||
},
|
||||
wantControlPlanes: []string{
|
||||
"projects/project/zones/zone/instanceGroupManagers/some-instance-group-manager",
|
||||
wantGroups: []cspapi.ScalingGroup{
|
||||
{
|
||||
Name: "some-instance-group-manager",
|
||||
NodeGroupName: "control_plane_default",
|
||||
GroupID: "projects/project/zones/zone/instanceGroupManagers/some-instance-group-manager",
|
||||
AutoscalingGroupName: "https://www.googleapis.com/compute/v1/projects/project/zones/zone/instanceGroups/some-instance-group-manager",
|
||||
Role: "ControlPlane",
|
||||
},
|
||||
},
|
||||
},
|
||||
"unrelated instance group manager": {
|
||||
@ -415,14 +452,13 @@ func TestListScalingGroups(t *testing.T) {
|
||||
getErr: tc.templateGetErr,
|
||||
},
|
||||
}
|
||||
gotControlPlanes, gotWorkers, err := client.ListScalingGroups(context.Background(), "uid")
|
||||
gotGroups, err := client.ListScalingGroups(context.Background(), "uid")
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
assert.ElementsMatch(tc.wantControlPlanes, gotControlPlanes)
|
||||
assert.ElementsMatch(tc.wantWorkers, gotWorkers)
|
||||
assert.ElementsMatch(tc.wantGroups, gotGroups)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ go_library(
|
||||
deps = [
|
||||
"//internal/constants",
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"//operators/constellation-node-operator/internal/constants",
|
||||
"@com_github_spf13_afero//:afero",
|
||||
"@io_k8s_api//core/v1:core",
|
||||
@ -33,6 +34,7 @@ go_test(
|
||||
deps = [
|
||||
"//internal/constants",
|
||||
"//operators/constellation-node-operator/api/v1alpha1",
|
||||
"//operators/constellation-node-operator/internal/cloud/api",
|
||||
"//operators/constellation-node-operator/internal/constants",
|
||||
"@com_github_spf13_afero//:afero",
|
||||
"@com_github_stretchr_testify//assert",
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@ -33,21 +34,18 @@ func InitialResources(ctx context.Context, k8sClient client.Client, imageInfo im
|
||||
}
|
||||
logr.Info("cleaned up placeholders")
|
||||
|
||||
controlPlaneGroupIDs, workerGroupIDs, err := scalingGroupGetter.ListScalingGroups(ctx, uid)
|
||||
scalingGroups, err := scalingGroupGetter.ListScalingGroups(ctx, uid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing scaling groups: %w", err)
|
||||
}
|
||||
if len(controlPlaneGroupIDs) == 0 {
|
||||
return errors.New("determining initial node image: no control plane scaling group found")
|
||||
}
|
||||
if len(workerGroupIDs) == 0 {
|
||||
return errors.New("determining initial node image: no worker scaling group found")
|
||||
if len(scalingGroups) == 0 {
|
||||
return errors.New("determining initial node image: no scaling group found")
|
||||
}
|
||||
|
||||
if err := createAutoscalingStrategy(ctx, k8sClient, scalingGroupGetter.AutoscalingCloudProvider()); err != nil {
|
||||
return fmt.Errorf("creating initial autoscaling strategy: %w", err)
|
||||
}
|
||||
imageReference, err := scalingGroupGetter.GetScalingGroupImage(ctx, controlPlaneGroupIDs[0])
|
||||
imageReference, err := scalingGroupGetter.GetScalingGroupImage(ctx, scalingGroups[0].GroupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("determining initial node image: %w", err)
|
||||
}
|
||||
@ -62,34 +60,6 @@ func InitialResources(ctx context.Context, k8sClient client.Client, imageInfo im
|
||||
if err := createNodeVersion(ctx, k8sClient, imageReference, imageVersion); err != nil {
|
||||
return fmt.Errorf("creating initial node version %q: %w", imageReference, err)
|
||||
}
|
||||
for _, groupID := range controlPlaneGroupIDs {
|
||||
groupName, err := scalingGroupGetter.GetScalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("determining scaling group name of %q: %w", groupID, err)
|
||||
}
|
||||
autoscalingGroupName, err := scalingGroupGetter.GetAutoscalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("determining autoscaling group name of %q: %w", groupID, err)
|
||||
}
|
||||
newScalingGroupConfig := newScalingGroupConfig{k8sClient, groupID, groupName, autoscalingGroupName, updatev1alpha1.ControlPlaneRole}
|
||||
if err := createScalingGroup(ctx, newScalingGroupConfig); err != nil {
|
||||
return fmt.Errorf("creating initial control plane scaling group: %w", err)
|
||||
}
|
||||
}
|
||||
for _, groupID := range workerGroupIDs {
|
||||
groupName, err := scalingGroupGetter.GetScalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("determining scaling group name of %q: %w", groupID, err)
|
||||
}
|
||||
autoscalingGroupName, err := scalingGroupGetter.GetAutoscalingGroupName(groupID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("determining autoscaling group name of %q: %w", groupID, err)
|
||||
}
|
||||
newScalingGroupConfig := newScalingGroupConfig{k8sClient, groupID, groupName, autoscalingGroupName, updatev1alpha1.WorkerRole}
|
||||
if err := createScalingGroup(ctx, newScalingGroupConfig); err != nil {
|
||||
return fmt.Errorf("creating initial worker scaling group: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -252,28 +222,6 @@ func findLatestK8sComponentsConfigMap(ctx context.Context, k8sClient client.Clie
|
||||
return latestConfigMap, nil
|
||||
}
|
||||
|
||||
// createScalingGroup creates an initial scaling group resource if it does not exist yet.
|
||||
func createScalingGroup(ctx context.Context, config newScalingGroupConfig) error {
|
||||
err := config.k8sClient.Create(ctx, &updatev1alpha1.ScalingGroup{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: strings.ToLower(config.groupName),
|
||||
},
|
||||
Spec: updatev1alpha1.ScalingGroupSpec{
|
||||
NodeVersion: mainconstants.NodeVersionResourceName,
|
||||
GroupID: config.groupID,
|
||||
AutoscalerGroupName: config.autoscalingGroupName,
|
||||
Min: 1,
|
||||
Max: 10,
|
||||
Role: config.role,
|
||||
},
|
||||
})
|
||||
if k8sErrors.IsAlreadyExists(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type imageInfoGetter interface {
|
||||
ImageVersion() (string, error)
|
||||
}
|
||||
@ -286,15 +234,7 @@ type scalingGroupGetter interface {
|
||||
// GetAutoscalingGroupName retrieves the name of a scaling group as needed by the cluster-autoscaler.
|
||||
GetAutoscalingGroupName(scalingGroupID string) (string, error)
|
||||
// ListScalingGroups retrieves a list of scaling groups for the cluster.
|
||||
ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error)
|
||||
ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error)
|
||||
// AutoscalingCloudProvider returns the cloud-provider name as used by k8s cluster-autoscaler.
|
||||
AutoscalingCloudProvider() string
|
||||
}
|
||||
|
||||
type newScalingGroupConfig struct {
|
||||
k8sClient client.Writer
|
||||
groupID string
|
||||
groupName string
|
||||
autoscalingGroupName string
|
||||
role updatev1alpha1.NodeRole
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
|
||||
updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
|
||||
cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
|
||||
"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -41,18 +42,10 @@ func TestInitialResources(t *testing.T) {
|
||||
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
|
||||
{groupID: "worker", image: "image-1", name: "worker"},
|
||||
},
|
||||
wantResources: 4,
|
||||
wantResources: 2,
|
||||
},
|
||||
"missing control planes": {
|
||||
items: []scalingGroupStoreItem{
|
||||
{groupID: "worker", image: "image-1", name: "worker"},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
"missing workers": {
|
||||
items: []scalingGroupStoreItem{
|
||||
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
|
||||
},
|
||||
"missing groups": {
|
||||
items: []scalingGroupStoreItem{},
|
||||
wantErr: true,
|
||||
},
|
||||
"listing groups fails": {
|
||||
@ -75,14 +68,6 @@ func TestInitialResources(t *testing.T) {
|
||||
imageErr: errors.New("getting image failed"),
|
||||
wantErr: true,
|
||||
},
|
||||
"getting name fails": {
|
||||
items: []scalingGroupStoreItem{
|
||||
{groupID: "control-plane", image: "image-1", name: "control-plane", isControlPlane: true},
|
||||
{groupID: "worker", image: "image-1", name: "worker"},
|
||||
},
|
||||
nameErr: errors.New("getting name failed"),
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
@ -273,70 +258,6 @@ func TestCreateNodeVersion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateScalingGroup(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
createErr error
|
||||
wantScalingGroup *updatev1alpha1.ScalingGroup
|
||||
wantErr bool
|
||||
}{
|
||||
"create works": {
|
||||
wantScalingGroup: &updatev1alpha1.ScalingGroup{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "group-name",
|
||||
},
|
||||
Spec: updatev1alpha1.ScalingGroupSpec{
|
||||
NodeVersion: mainconstants.NodeVersionResourceName,
|
||||
GroupID: "group-id",
|
||||
AutoscalerGroupName: "group-Name",
|
||||
Min: 1,
|
||||
Max: 10,
|
||||
Role: updatev1alpha1.WorkerRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
"create fails": {
|
||||
createErr: errors.New("create failed"),
|
||||
wantErr: true,
|
||||
},
|
||||
"image exists": {
|
||||
createErr: k8sErrors.NewAlreadyExists(schema.GroupResource{}, constants.AutoscalingStrategyResourceName),
|
||||
wantScalingGroup: &updatev1alpha1.ScalingGroup{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "group-name",
|
||||
},
|
||||
Spec: updatev1alpha1.ScalingGroupSpec{
|
||||
NodeVersion: mainconstants.NodeVersionResourceName,
|
||||
GroupID: "group-id",
|
||||
AutoscalerGroupName: "group-Name",
|
||||
Min: 1,
|
||||
Max: 10,
|
||||
Role: updatev1alpha1.WorkerRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
k8sClient := &fakeK8sClient{createErr: tc.createErr}
|
||||
newScalingGroupConfig := newScalingGroupConfig{k8sClient, "group-id", "group-Name", "group-Name", updatev1alpha1.WorkerRole}
|
||||
err := createScalingGroup(context.Background(), newScalingGroupConfig)
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
assert.Len(k8sClient.createdObjects, 1)
|
||||
assert.Equal(tc.wantScalingGroup, k8sClient.createdObjects[0])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeK8sClient struct {
|
||||
createdObjects []client.Object
|
||||
createErr error
|
||||
@ -437,15 +358,24 @@ func (g *stubScalingGroupGetter) GetAutoscalingGroupName(scalingGroupID string)
|
||||
return g.store[scalingGroupID].name, g.nameErr
|
||||
}
|
||||
|
||||
func (g *stubScalingGroupGetter) ListScalingGroups(_ context.Context, _ string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error) {
|
||||
func (g *stubScalingGroupGetter) ListScalingGroups(_ context.Context, _ string) ([]cspapi.ScalingGroup, error) {
|
||||
var scalingGroups []cspapi.ScalingGroup
|
||||
|
||||
for _, item := range g.store {
|
||||
if item.isControlPlane {
|
||||
controlPlaneGroupIDs = append(controlPlaneGroupIDs, item.groupID)
|
||||
} else {
|
||||
workerGroupIDs = append(workerGroupIDs, item.groupID)
|
||||
}
|
||||
scalingGroups = append(scalingGroups, cspapi.ScalingGroup{
|
||||
Name: item.name,
|
||||
NodeGroupName: item.nodeGroupName,
|
||||
GroupID: item.groupID,
|
||||
AutoscalingGroupName: item.autoscalingGroupName,
|
||||
Role: func() updatev1alpha1.NodeRole {
|
||||
if item.isControlPlane {
|
||||
return updatev1alpha1.ControlPlaneRole
|
||||
}
|
||||
return updatev1alpha1.WorkerRole
|
||||
}(),
|
||||
})
|
||||
}
|
||||
return controlPlaneGroupIDs, workerGroupIDs, g.listErr
|
||||
return scalingGroups, g.listErr
|
||||
}
|
||||
|
||||
func (g *stubScalingGroupGetter) AutoscalingCloudProvider() string {
|
||||
@ -453,8 +383,10 @@ func (g *stubScalingGroupGetter) AutoscalingCloudProvider() string {
|
||||
}
|
||||
|
||||
type scalingGroupStoreItem struct {
|
||||
groupID string
|
||||
name string
|
||||
image string
|
||||
isControlPlane bool
|
||||
name string
|
||||
groupID string
|
||||
autoscalingGroupName string
|
||||
nodeGroupName string
|
||||
image string
|
||||
isControlPlane bool
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
load("//bazel/go:go_test.bzl", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "executor",
|
||||
srcs = ["executor.go"],
|
||||
importpath = "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor",
|
||||
visibility = ["//operators/constellation-node-operator:__subpackages__"],
|
||||
deps = [
|
||||
"@io_k8s_client_go//util/workqueue",
|
||||
"@io_k8s_sigs_controller_runtime//pkg/log",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "executor_test",
|
||||
srcs = ["executor_test.go"],
|
||||
embed = [":executor"],
|
||||
deps = [
|
||||
"@com_github_stretchr_testify//assert",
|
||||
"@org_uber_go_goleak//:goleak",
|
||||
],
|
||||
)
|
@ -0,0 +1,218 @@
|
||||
/*
|
||||
Copyright (c) Edgeless Systems GmbH
|
||||
|
||||
SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
// Package executor contains a task executor / scheduler for the constellation node operator.
|
||||
// It is used to execute tasks (outside of the k8s-specific operator controllers) at regular intervals and
// based on external triggers.
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultPollingFrequency = 15 * time.Minute
|
||||
// rateLimiterItem is the key used to rate limit the reconciliation loop.
|
||||
// since we don't have a reconcile request, we use a constant key.
|
||||
rateLimiterItem = "reconcile"
|
||||
)
|
||||
|
||||
// Controller is a type with a reconcile method.
|
||||
// It is modeled after the controller-runtime reconcile method,
|
||||
// but reconciles on external resources instead of k8s resources.
|
||||
type Controller interface {
|
||||
Reconcile(ctx context.Context) (Result, error)
|
||||
}
|
||||
|
||||
// Executor is a task executor / scheduler.
|
||||
// It will call the reconcile method of the given controller with a regular interval
|
||||
// or when triggered externally.
|
||||
type Executor struct {
|
||||
running atomic.Bool
|
||||
|
||||
// controller is the controller to be reconciled.
|
||||
controller Controller
|
||||
|
||||
// pollingFrequency is the default frequency with which the controller is reconciled
|
||||
// if no external trigger is received and no requeue is requested by the controller.
|
||||
pollingFrequency time.Duration
|
||||
// rateLimiter is used to rate limit the reconciliation loop.
|
||||
rateLimiter RateLimiter
|
||||
// externalTrigger is used to trigger a reconciliation immediately from the outside.
|
||||
// multiple triggers in a short time will be coalesced into one externalTrigger.
|
||||
externalTrigger chan struct{}
|
||||
// stop is used to stop the reconciliation loop.
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// New creates a new Executor.
|
||||
func New(controller Controller, cfg Config) *Executor {
|
||||
cfg.applyDefaults()
|
||||
return &Executor{
|
||||
controller: controller,
|
||||
pollingFrequency: cfg.PollingFrequency,
|
||||
rateLimiter: cfg.RateLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
// StopWaitFn is a function that can be called to stop the executor and wait for it to stop.
|
||||
type StopWaitFn func()
|
||||
|
||||
// Start starts the executor in a separate go routine.
|
||||
// Call Stop to stop the executor.
|
||||
func (e *Executor) Start(ctx context.Context) StopWaitFn {
|
||||
wg := &sync.WaitGroup{}
|
||||
logr := log.FromContext(ctx)
|
||||
stopWait := func() {
|
||||
defer wg.Wait()
|
||||
e.Stop()
|
||||
}
|
||||
|
||||
// this will return early if the executor is already running
|
||||
// if the executor is not running, set the running flag to true
|
||||
// and continue
|
||||
if !e.running.CompareAndSwap(false, true) {
|
||||
return stopWait
|
||||
}
|
||||
|
||||
e.externalTrigger = make(chan struct{}, 1)
|
||||
e.stop = make(chan struct{}, 1)
|
||||
// execute is used by the go routines below to communicate
|
||||
// that a reconciliation should happen
|
||||
execute := make(chan struct{}, 1)
|
||||
// nextScheduledReconcile is used to communicate the next scheduled reconciliation time
|
||||
nextScheduledReconcile := make(chan time.Duration, 1)
|
||||
// trigger a reconciliation on startup
|
||||
nextScheduledReconcile <- 0
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
// timer routine is responsible for triggering the reconciliation after the timer expires
|
||||
// or when triggered externally
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(execute)
|
||||
defer logr.Info("Timer stopped")
|
||||
for {
|
||||
nextScheduledReconcileAfter := <-nextScheduledReconcile
|
||||
timer := *time.NewTimer(nextScheduledReconcileAfter)
|
||||
select {
|
||||
case <-e.stop:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-e.externalTrigger:
|
||||
case <-timer.C:
|
||||
}
|
||||
execute <- struct{}{}
|
||||
}
|
||||
}()
|
||||
|
||||
// executor routine is responsible for executing the reconciliation
|
||||
go func() {
|
||||
defer func() {
|
||||
e.running.Store(false)
|
||||
}()
|
||||
defer wg.Done()
|
||||
defer close(nextScheduledReconcile)
|
||||
defer logr.Info("Executor stopped")
|
||||
|
||||
for {
|
||||
_, ok := <-execute
|
||||
// execute channel closed. executor should stop
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
res, err := e.controller.Reconcile(ctx)
|
||||
var requeueAfter time.Duration
|
||||
switch {
|
||||
case err != nil:
|
||||
logr.Error(err, "reconciliation failed")
|
||||
requeueAfter = e.rateLimiter.When(rateLimiterItem) // requeue with rate limiter
|
||||
case res.Requeue && res.RequeueAfter != 0:
|
||||
e.rateLimiter.Forget(rateLimiterItem) // reset the rate limiter
|
||||
requeueAfter = res.RequeueAfter // requeue after the given duration
|
||||
case res.Requeue:
|
||||
requeueAfter = e.rateLimiter.When(rateLimiterItem) // requeue with rate limiter
|
||||
default:
|
||||
e.rateLimiter.Forget(rateLimiterItem) // reset the rate limiter
|
||||
requeueAfter = e.pollingFrequency // default polling frequency
|
||||
}
|
||||
|
||||
nextScheduledReconcile <- requeueAfter
|
||||
}
|
||||
}()
|
||||
|
||||
return stopWait
|
||||
}
|
||||
|
||||
// Stop stops the executor.
|
||||
// It does not block until the executor is stopped.
|
||||
func (e *Executor) Stop() {
|
||||
select {
|
||||
case e.stop <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
close(e.stop)
|
||||
}
|
||||
|
||||
// Running returns true if the executor is running.
|
||||
// When the executor is stopped, it is not running anymore.
|
||||
func (e *Executor) Running() bool {
|
||||
return e.running.Load()
|
||||
}
|
||||
|
||||
// Trigger triggers a reconciliation.
|
||||
// If a reconciliation is already pending, this call is a no-op.
|
||||
func (e *Executor) Trigger() {
|
||||
select {
|
||||
case e.externalTrigger <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Config is the configuration for the executor.
|
||||
type Config struct {
|
||||
PollingFrequency time.Duration
|
||||
RateLimiter RateLimiter
|
||||
}
|
||||
|
||||
// NewDefaultConfig creates a new default configuration.
|
||||
func NewDefaultConfig() Config {
|
||||
cfg := Config{}
|
||||
cfg.applyDefaults()
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (c *Config) applyDefaults() {
|
||||
if c.PollingFrequency == 0 {
|
||||
c.PollingFrequency = defaultPollingFrequency
|
||||
}
|
||||
if c.RateLimiter == nil {
|
||||
c.RateLimiter = workqueue.DefaultControllerRateLimiter()
|
||||
}
|
||||
}
|
||||
|
||||
// Result is the result of a reconciliation.
|
||||
type Result struct {
|
||||
Requeue bool
|
||||
RequeueAfter time.Duration
|
||||
}
|
||||
|
||||
// RateLimiter is a stripped down version of the controller-runtime ratelimiter.RateLimiter interface.
|
||||
type RateLimiter interface {
|
||||
// When gets an item and gets to decide how long that item should wait
|
||||
When(item any) time.Duration
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
|
||||
// or for success, we'll stop tracking it
|
||||
Forget(item any)
|
||||
}
|
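For orientation, a minimal sketch of driving a controller with this executor. It assumes the `Controller` interface is the single-method `Reconcile(ctx context.Context) (Result, error)` contract used above; the `noopController` type and the import path are illustrative, not part of this commit.

```go
package main

import (
	"context"
	"fmt"
	"time"

	// Assumed import path; adjust to the actual module layout.
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor"
)

// noopController is a hypothetical controller that just logs each reconciliation.
type noopController struct{}

func (noopController) Reconcile(_ context.Context) (executor.Result, error) {
	fmt.Println("reconciling")
	// A zero Result means the executor waits for the polling frequency
	// (or an external trigger) before reconciling again.
	return executor.Result{}, nil
}

func main() {
	exec := executor.New(noopController{}, executor.NewDefaultConfig())
	stopAndWait := exec.Start(context.Background())
	defer stopAndWait() // stops the executor and waits for its goroutines to exit

	exec.Trigger() // force an immediate reconciliation from the outside
	time.Sleep(time.Second)
}
```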
@ -0,0 +1,306 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package executor

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

func TestNew(t *testing.T) {
	testCases := map[string]struct {
		config               Config
		wantPollingFrequency time.Duration
	}{
		"applies default polling frequency": {
			config:               Config{},
			wantPollingFrequency: defaultPollingFrequency,
		},
		"custom polling frequency": {
			config: Config{
				PollingFrequency: time.Hour,
			},
			wantPollingFrequency: time.Hour,
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			assert := assert.New(t)
			exec := New(nil, tc.config)
			assert.Equal(tc.wantPollingFrequency, exec.pollingFrequency)
		})
	}
}

func TestStartTriggersImmediateReconciliation(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	// on start, the executor should trigger a reconciliation
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	ctrl.stop <- struct{}{}

	stopAndWait()

	// initial trigger
	assert.Equal(1, ctrl.reconciliationCounter)
}

func TestStartMultipleTimesIsCoalesced(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	// start once
	stopAndWait := exec.Start(context.Background())
	// start again multiple times
	for i := 0; i < 10; i++ {
		_ = exec.Start(context.Background())
	}

	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	ctrl.stop <- struct{}{}

	stopAndWait()

	// initial trigger. extra start calls should be coalesced
	assert.Equal(1, ctrl.reconciliationCounter)
}

func TestErrorTriggersImmediateReconciliation(t *testing.T) {
	assert := assert.New(t)
	// returning an error should trigger a reconciliation immediately
	ctrl := newStubController(Result{}, errors.New("reconciler error"))
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}

	stopAndWait()

	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}

func TestErrorTriggersRateLimiting(t *testing.T) {
	assert := assert.New(t)
	// returning an error should trigger a reconciliation immediately
	ctrl := newStubController(Result{}, errors.New("reconciler error"))
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting
	ctrl.stop <- struct{}{}

	stopAndWait()

	// initial trigger. error triggers are rate limited to 1 per year
	assert.Equal(1, ctrl.reconciliationCounter)
}

func TestRequeueAfterResultRequeueInterval(t *testing.T) {
	assert := assert.New(t)
	// setting a requeue result should trigger a reconciliation after the specified delay
	ctrl := newStubController(Result{
		Requeue:      true,
		RequeueAfter: time.Microsecond,
	}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}

	stopAndWait()

	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}

func TestExternalTrigger(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // initial trigger
	for i := 0; i < 10; i++ {
		exec.Trigger()
		<-ctrl.waitUntilReconciled // external trigger
	}
	ctrl.stop <- struct{}{}

	stopAndWait()

	// initial trigger + 10 external triggers
	assert.Equal(11, ctrl.reconciliationCounter)
}

func TestSimultaneousExternalTriggers(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	<-ctrl.waitUntilReconciled // initial trigger
	for i := 0; i < 100; i++ {
		exec.Trigger() // extra trigger calls are coalesced
	}
	<-ctrl.waitUntilReconciled // external trigger
	ctrl.stop <- struct{}{}

	stopAndWait()

	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the next manual trigger case first.
	assertBetween(assert, 2, 3, ctrl.reconciliationCounter)
}

func TestContextCancel(t *testing.T) {
	assert := assert.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		rateLimiter:      &stubRateLimiter{},   // no rate limiting
	}
	_ = exec.Start(ctx)         // no need to explicitly stop the executor, it will stop when the context is canceled
	<-ctrl.waitUntilReconciled // initial trigger

	// canceling the context should stop the executor without blocking
	cancel()
	ctrl.stop <- struct{}{}

	// poll for the executor stop running
	// this is necessary since the executor doesn't expose
	// a pure wait method
	assert.Eventually(func() bool {
		return !exec.Running()
	}, 5*time.Second, 10*time.Millisecond)

	// initial trigger
	assert.Equal(1, ctrl.reconciliationCounter)
}

func TestRequeueAfterPollingFrequency(t *testing.T) {
	assert := assert.New(t)
	ctrl := newStubController(Result{}, nil)
	exec := Executor{
		controller:       ctrl,
		pollingFrequency: time.Microsecond, // basically no delay
		rateLimiter: &stubRateLimiter{
			whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
		},
	}
	stopAndWait := exec.Start(context.Background())
	for i := 0; i < 10; i++ {
		<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
	}
	ctrl.stop <- struct{}{}

	stopAndWait()

	// we cannot assert the exact number of reconciliations here, because the executor might
	// select the stop case or the timer case first.
	assertBetween(assert, 10, 11, ctrl.reconciliationCounter)
}

type stubController struct {
	stopped               bool
	stop                  chan struct{}
	waitUntilReconciled   chan struct{}
	reconciliationCounter int
	result                Result
	err                   error
}

func newStubController(result Result, err error) *stubController {
	return &stubController{
		waitUntilReconciled: make(chan struct{}),
		stop:                make(chan struct{}, 1),
		result:              result,
		err:                 err,
	}
}

func (s *stubController) Reconcile(_ context.Context) (Result, error) {
	if s.stopped {
		return Result{}, errors.New("controller stopped")
	}
	s.reconciliationCounter++
	select {
	case <-s.stop:
		s.stopped = true
	case s.waitUntilReconciled <- struct{}{}:
	}

	return s.result, s.err
}

type stubRateLimiter struct {
	whenRes time.Duration
}

func (s *stubRateLimiter) When(_ any) time.Duration {
	return s.whenRes
}

func (s *stubRateLimiter) Forget(_ any) {}

func assertBetween(assert *assert.Assertions, min, max, actual int) {
	assert.GreaterOrEqual(actual, min)
	assert.LessOrEqual(actual, max)
}
@ -25,12 +25,15 @@ import (
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
	awsclient "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/aws/client"
	azureclient "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/azure/client"
	cloudfake "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/fake/client"
	gcpclient "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/gcp/client"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/deploy"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/upgrade"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/sgreconciler"

	nodemaintenancev1beta1 "github.com/edgelesssys/constellation/v2/3rdparty/node-maintenance-operator/api/v1beta1"
	updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
@ -143,8 +146,21 @@ func main() {
	}
	defer etcdClient.Close()

	uid := os.Getenv(constellationUID)

	extScalingGroupReconciler := sgreconciler.NewExternalScalingGroupReconciler(
		uid,
		cspClient,
		k8sClient,
	)

	exec := executor.New(extScalingGroupReconciler, executor.NewDefaultConfig())

	stopAndWaitForExecutor := exec.Start(context.Background())
	defer stopAndWaitForExecutor()

	imageInfo := deploy.NewImageInfo()
	if err := deploy.InitialResources(context.Background(), k8sClient, imageInfo, cspClient, os.Getenv(constellationUID)); err != nil {
	if err := deploy.InitialResources(context.Background(), k8sClient, imageInfo, cspClient, uid); err != nil {
		setupLog.Error(err, "Unable to deploy initial resources")
		os.Exit(1)
	}
@ -187,6 +203,15 @@ func main() {

	//+kubebuilder:scaffold:builder

	if err = sgreconciler.NewNodeJoinWatcher(
		exec.Trigger,
		mgr.GetClient(),
		mgr.GetScheme(),
	).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "Unable to create controller", "controller", "NodeJoinWatcher")
		os.Exit(1)
	}

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "Unable to set up health check")
		os.Exit(1)
@ -223,7 +248,7 @@ type cspAPI interface {
	// GetAutoscalingGroupName retrieves the name of a scaling group as needed by the cluster-autoscaler.
	GetAutoscalingGroupName(scalingGroupID string) (string, error)
	// ListScalingGroups retrieves a list of scaling groups for the cluster.
	ListScalingGroups(ctx context.Context, uid string) (controlPlaneGroupIDs []string, workerGroupIDs []string, err error)
	ListScalingGroups(ctx context.Context, uid string) ([]cspapi.ScalingGroup, error)
	// AutoscalingCloudProvider returns the cloud-provider name as used by k8s cluster-autoscaler.
	AutoscalingCloudProvider() string
}
@ -0,0 +1,51 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//bazel/go:go_test.bzl", "go_test")

go_library(
    name = "sgreconciler",
    srcs = [
        "nodejoin_watcher.go",
        "scalinggroup_controller.go",
        "sgreconciler.go",
    ],
    importpath = "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/sgreconciler",
    visibility = ["//visibility:public"],
    deps = [
        "//internal/constants",
        "//operators/constellation-node-operator/api/v1alpha1",
        "//operators/constellation-node-operator/internal/cloud/api",
        "//operators/constellation-node-operator/internal/executor",
        "@io_k8s_api//core/v1:core",
        "@io_k8s_apimachinery//pkg/api/errors",
        "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
        "@io_k8s_apimachinery//pkg/runtime",
        "@io_k8s_apimachinery//pkg/types",
        "@io_k8s_client_go//util/retry",
        "@io_k8s_sigs_controller_runtime//:controller-runtime",
        "@io_k8s_sigs_controller_runtime//pkg/builder",
        "@io_k8s_sigs_controller_runtime//pkg/client",
        "@io_k8s_sigs_controller_runtime//pkg/event",
        "@io_k8s_sigs_controller_runtime//pkg/handler",
        "@io_k8s_sigs_controller_runtime//pkg/log",
        "@io_k8s_sigs_controller_runtime//pkg/predicate",
    ],
)

go_test(
    name = "sgreconciler_test",
    srcs = ["scalinggroup_controller_test.go"],
    embed = [":sgreconciler"],
    deps = [
        "//internal/constants",
        "//operators/constellation-node-operator/api/v1alpha1",
        "//operators/constellation-node-operator/internal/cloud/api",
        "//operators/constellation-node-operator/internal/constants",
        "@com_github_stretchr_testify//assert",
        "@com_github_stretchr_testify//require",
        "@io_k8s_apimachinery//pkg/api/errors",
        "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
        "@io_k8s_apimachinery//pkg/runtime/schema",
        "@io_k8s_apimachinery//pkg/types",
        "@io_k8s_sigs_controller_runtime//pkg/client",
    ],
)
@ -0,0 +1,76 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package sgreconciler

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// NodeJoinWatcher watches node join / leave events.
type NodeJoinWatcher struct {
	trigger func()
	client.Client
	Scheme *runtime.Scheme
}

// NewNodeJoinWatcher creates a new NodeJoinWatcher.
func NewNodeJoinWatcher(trigger func(), client client.Client, scheme *runtime.Scheme) *NodeJoinWatcher {
	return &NodeJoinWatcher{
		trigger: trigger,
		Client:  client,
		Scheme:  scheme,
	}
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=nodes/status,verbs=get

// Reconcile reconciles node join / leave events.
func (w *NodeJoinWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logr := log.FromContext(ctx)
	logr.Info("node has joined or left the cluster", "node", req.Name)
	w.trigger()

	return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (w *NodeJoinWatcher) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("node-join-watcher").
		Watches(
			client.Object(&corev1.Node{}),
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{Name: obj.GetName()},
				}}
			}),
			builder.WithPredicates(nodeJoinLeavePredicate()),
		).
		Complete(w)
}

// nodeJoinLeavePredicate returns a predicate that returns true if a node joins or leaves the cluster.
func nodeJoinLeavePredicate() predicate.Predicate {
	return predicate.Funcs{
		// CreateFunc is not specified => never filter out create events
		// DeleteFunc is not specified => never filter out delete events
		UpdateFunc:  func(e event.UpdateEvent) bool { return false },
		GenericFunc: func(e event.GenericEvent) bool { return false },
	}
}
@ -0,0 +1,185 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package sgreconciler

import (
	"context"
	"strings"

	mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
	updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
	cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
	// defaultScalingGroupMin is the default minimum number of nodes in a scaling group.
	// This value is used if the scaling group is created by the operator.
	// If a user modifies the scaling group, the operator will not overwrite the user's configuration.
	defaultScalingGroupMin = 1
	// defaultScalingGroupMax is the default maximum number of nodes in a scaling group.
	// This value is used if the scaling group is created by the operator.
	// If a user modifies the scaling group, the operator will not overwrite the user's configuration.
	defaultScalingGroupMax = 10
)

// ExternalScalingGroupReconciler reconciles on scaling groups in CSP infrastructure.
// It does NOT reconcile on k8s resources.
// Instead, it scans the cloud provider infrastructure and changes k8s resources accordingly.
type ExternalScalingGroupReconciler struct {
	// uid is the unique identifier of the Constellation cluster.
	uid                    string
	scalingGroupDiscoverer scalingGroupDiscoverer
	k8sClient              k8sReadWriter
}

// NewExternalScalingGroupReconciler creates a new ExternalScalingGroupReconciler.
func NewExternalScalingGroupReconciler(uid string, discoverer scalingGroupDiscoverer, k8sClient k8sReadWriter) *ExternalScalingGroupReconciler {
	return &ExternalScalingGroupReconciler{
		uid:                    uid,
		scalingGroupDiscoverer: discoverer,
		k8sClient:              k8sClient,
	}
}

// Reconcile reconciles on scaling groups in CSP infrastructure.
func (r *ExternalScalingGroupReconciler) Reconcile(ctx context.Context) (executor.Result, error) {
	logr := log.FromContext(ctx)
	logr.Info("reconciling external scaling groups")

	nodeGroups, err := r.scalingGroupDiscoverer.ListScalingGroups(ctx, r.uid)
	if err != nil {
		return executor.Result{}, err
	}

	existingNodeGroups := map[string]struct{}{}

	// create all scaling groups that are newly discovered
	for _, group := range nodeGroups {
		exists, err := patchNodeGroupName(ctx, r.k8sClient, group.Name, group.NodeGroupName)
		if err != nil {
			return executor.Result{}, err
		}
		if exists {
			// scaling group already exists
			existingNodeGroups[group.Name] = struct{}{}
			continue
		}
		err = createScalingGroupIfNotExists(ctx, newScalingGroupConfig{
			k8sClient:            r.k8sClient,
			resourceName:         group.Name,
			groupID:              group.GroupID,
			nodeGroupName:        group.NodeGroupName,
			autoscalingGroupName: group.AutoscalingGroupName,
			role:                 group.Role,
		})
		if err != nil {
			return executor.Result{}, err
		}
		existingNodeGroups[group.Name] = struct{}{}
	}

	logr.Info("ensured scaling groups are created", "count", len(nodeGroups))

	// delete all scaling groups that no longer exist
	var scalingGroups updatev1alpha1.ScalingGroupList
	if err := r.k8sClient.List(ctx, &scalingGroups); err != nil {
		return executor.Result{}, err
	}
	for _, group := range scalingGroups.Items {
		if _, ok := existingNodeGroups[group.Name]; !ok {
			logr.Info("deleting scaling group", "name", group.Name)
			err := r.k8sClient.Delete(ctx, &group)
			if err != nil {
				return executor.Result{}, err
			}
		}
	}

	logr.Info("external scaling groups reconciled")

	return executor.Result{}, nil
}

// patchNodeGroupName patches the node group name of a scaling group resource (if necessary and it exists).
func patchNodeGroupName(ctx context.Context, k8sClient k8sReadWriter, resourceName, nodeGroupName string) (exists bool, err error) {
	logr := log.FromContext(ctx)
	var scalingGroup updatev1alpha1.ScalingGroup
	err = k8sClient.Get(ctx, client.ObjectKey{Name: resourceName}, &scalingGroup)
	if k8sErrors.IsNotFound(err) {
		// scaling group does not exist
		// no need to patch
		return false /* doesn't exist */, nil
	}
	if err != nil {
		return false, err
	}
	if scalingGroup.Spec.NodeGroupName == nodeGroupName {
		// scaling group already has the correct node group name
		return true /* exists */, nil
	}
	logr.Info("patching node group name", "resourceName", resourceName, "nodeGroupName", nodeGroupName)
	return true, retry.RetryOnConflict(retry.DefaultRetry, func() error {
		if err := k8sClient.Get(ctx, client.ObjectKey{Name: resourceName}, &scalingGroup); err != nil {
			return err
		}
		scalingGroup.Spec.NodeGroupName = nodeGroupName
		return k8sClient.Update(ctx, &scalingGroup)
	})
}

func createScalingGroupIfNotExists(ctx context.Context, config newScalingGroupConfig) error {
	logr := log.FromContext(ctx)
	err := config.k8sClient.Create(ctx, &updatev1alpha1.ScalingGroup{
		TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
		ObjectMeta: metav1.ObjectMeta{
			Name: strings.ToLower(config.resourceName),
		},
		Spec: updatev1alpha1.ScalingGroupSpec{
			NodeVersion:         mainconstants.NodeVersionResourceName,
			GroupID:             config.groupID,
			AutoscalerGroupName: config.autoscalingGroupName,
			NodeGroupName:       config.nodeGroupName,
			Min:                 defaultScalingGroupMin,
			Max:                 defaultScalingGroupMax,
			Role:                config.role,
		},
	})
	if k8sErrors.IsAlreadyExists(err) {
		return nil
	} else if err == nil {
		logr.Info("created scaling group", "name", config.resourceName, "nodeGroupName", config.nodeGroupName)
	} else {
		logr.Error(err, "failed to create scaling group", "name", config.resourceName, "nodeGroupName", config.nodeGroupName)
	}
	return err
}

// scalingGroupDiscoverer is used to discover scaling groups in the cloud provider infrastructure.
type scalingGroupDiscoverer interface {
	ListScalingGroups(ctx context.Context, uid string,
	) ([]cspapi.ScalingGroup, error)
}

type k8sReadWriter interface {
	client.Reader
	client.Writer
}

type newScalingGroupConfig struct {
	k8sClient            client.Writer
	resourceName         string
	groupID              string
	nodeGroupName        string
	autoscalingGroupName string
	role                 updatev1alpha1.NodeRole
}
@ -0,0 +1,98 @@
//go:build integration

/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package sgreconciler

import (
	"context"
	"time"

	cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"k8s.io/apimachinery/pkg/types"

	updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
)

var _ = Describe("ExternalScalingGroup controller", func() {
	AfterEach(func() {
		Eventually(func() error {
			return resetEnv()
		}, 30*time.Second, 1*time.Second).Should(Succeed())
	})

	// Define utility constants for object names and testing timeouts/durations and intervals.
	const (
		nodeVersionName      = "node-version"
		scalingGroupName     = "test-group"
		nodeGroupName        = "test-node-group"
		groupID              = "test-group-id"
		autoscalingGroupName = "test-autoscaling-group-name"

		timeout  = time.Second * 20
		interval = time.Millisecond * 250
	)

	Context("When creating a scaling group externally", func() {
		It("Should create a corresponding scaling group resource", func() {
			By("creating an external scaling group")
			ctx := context.Background()
			fakes.scalingGroupDiscoverer.set([]cspapi.ScalingGroup{
				{
					Name:                 scalingGroupName,
					NodeGroupName:        nodeGroupName,
					GroupID:              groupID,
					AutoscalingGroupName: autoscalingGroupName,
					Role:                 updatev1alpha1.ControlPlaneRole,
				},
			})

			By("checking that the scaling group resource was created")
			triggerReconcile()
			createdScalingGroup := &updatev1alpha1.ScalingGroup{}
			Eventually(func() error {
				return k8sClient.Get(ctx, types.NamespacedName{Name: scalingGroupName}, createdScalingGroup)
			}, timeout, interval).Should(Succeed())
			Expect(createdScalingGroup.Spec.GroupID).Should(Equal(groupID))
			Expect(createdScalingGroup.Spec.AutoscalerGroupName).Should(Equal(autoscalingGroupName))
			Expect(createdScalingGroup.Spec.NodeGroupName).Should(Equal(nodeGroupName))
			Expect(createdScalingGroup.Spec.Role).Should(Equal(updatev1alpha1.ControlPlaneRole))
		})

		It("Should update if external scaling groups are added/removed", func() {
			By("changing the external scaling groups")
			fakes.scalingGroupDiscoverer.set([]cspapi.ScalingGroup{
				{
					Name:                 "other-scaling-group",
					NodeGroupName:        "other-node-group",
					GroupID:              "other-group-id",
					AutoscalingGroupName: "other-autoscaling-group-name",
					Role:                 updatev1alpha1.WorkerRole,
				},
			})

			By("checking that the scaling group resource was created")
			triggerReconcile()
			createdScalingGroup := &updatev1alpha1.ScalingGroup{}
			Eventually(func() error {
				return k8sClient.Get(ctx, types.NamespacedName{Name: "other-scaling-group"}, createdScalingGroup)
			}, timeout, interval).Should(Succeed())
			Expect(createdScalingGroup.Spec.GroupID).Should(Equal("other-group-id"))
			Expect(createdScalingGroup.Spec.AutoscalerGroupName).Should(Equal("other-autoscaling-group-name"))
			Expect(createdScalingGroup.Spec.NodeGroupName).Should(Equal("other-node-group"))
			Expect(createdScalingGroup.Spec.Role).Should(Equal(updatev1alpha1.WorkerRole))

			By("checking that the old scaling group resource was deleted")
			deletedScalingGroup := &updatev1alpha1.ScalingGroup{}
			Eventually(func() error {
				return k8sClient.Get(ctx, types.NamespacedName{Name: scalingGroupName}, deletedScalingGroup)
			}, timeout, interval).ShouldNot(Succeed())
		})
	})
})
@ -0,0 +1,265 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package sgreconciler

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"testing"

	mainconstants "github.com/edgelesssys/constellation/v2/internal/constants"
	updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
	cspapi "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/cloud/api"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/constants"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestCreateScalingGroupIfNotExists(t *testing.T) {
	testCases := map[string]struct {
		createErr        error
		wantScalingGroup *updatev1alpha1.ScalingGroup
		wantErr          bool
	}{
		"create works": {
			wantScalingGroup: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "resource-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "autoscaling-group-name",
					NodeGroupName:       "node-group-name",
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.WorkerRole,
				},
			},
		},
		"create fails": {
			createErr: errors.New("create failed"),
			wantErr:   true,
		},
		"scaling group exists": {
			createErr: k8sErrors.NewAlreadyExists(schema.GroupResource{}, constants.AutoscalingStrategyResourceName),
			wantScalingGroup: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "resource-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "autoscaling-group-name",
					NodeGroupName:       "node-group-name",
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.WorkerRole,
				},
			},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			assert := assert.New(t)
			require := require.New(t)

			k8sClient := &fakeK8sClient{createErr: tc.createErr}
			newScalingGroupConfig := newScalingGroupConfig{
				k8sClient:            k8sClient,
				resourceName:         "resource-name",
				groupID:              "group-id",
				nodeGroupName:        "node-group-name",
				autoscalingGroupName: "autoscaling-group-name",
				role:                 updatev1alpha1.WorkerRole,
			}
			err := createScalingGroupIfNotExists(context.Background(), newScalingGroupConfig)
			if tc.wantErr {
				assert.Error(err)
				return
			}
			require.NoError(err)
			assert.Len(k8sClient.createdObjects, 1)
			assert.Equal(tc.wantScalingGroup, k8sClient.createdObjects[0])
		})
	}
}

func TestPatchNodeGroupName(t *testing.T) {
	testCases := map[string]struct {
		getRes     client.Object
		getErr     error
		updateErr  error
		wantExists bool
		wantErr    bool
	}{
		"patching works": {
			getRes: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "resource-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "autoscaling-group-name",
					Autoscaling:         true,
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.ControlPlaneRole,
				},
			},
			wantExists: true,
		},
		"name already set": {
			getRes: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "resource-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					NodeGroupName:       "node-group-name",
					AutoscalerGroupName: "autoscaling-group-name",
					Autoscaling:         true,
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.ControlPlaneRole,
				},
			},
			wantExists: true,
		},
		"does not exist": {
			getErr:     k8sErrors.NewNotFound(schema.GroupResource{}, "resource-name"),
			wantExists: false,
		},
		"getting fails": {
			getErr:  errors.New("get failed"),
			wantErr: true,
		},
		"patching fails": {
			getRes: &updatev1alpha1.ScalingGroup{
				TypeMeta: metav1.TypeMeta{APIVersion: "update.edgeless.systems/v1alpha1", Kind: "ScalingGroup"},
				ObjectMeta: metav1.ObjectMeta{
					Name: "resource-name",
				},
				Spec: updatev1alpha1.ScalingGroupSpec{
					NodeVersion:         mainconstants.NodeVersionResourceName,
					GroupID:             "group-id",
					AutoscalerGroupName: "autoscaling-group-name",
					Autoscaling:         true,
					Min:                 1,
					Max:                 10,
					Role:                updatev1alpha1.ControlPlaneRole,
				},
			},
			updateErr: errors.New("patch failed"),
			wantErr:   true,
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			assert := assert.New(t)
			require := require.New(t)

			k8sClient := &fakeK8sClient{
				getRes:    tc.getRes,
				getErr:    tc.getErr,
				updateErr: tc.updateErr,
			}
			gotExists, gotErr := patchNodeGroupName(context.Background(), k8sClient, "resource-name", "node-group-name")
			if tc.wantErr {
				assert.Error(gotErr)
				return
			}
			require.NoError(gotErr)
			assert.Equal(tc.wantExists, gotExists)
		})
	}
}

type fakeK8sClient struct {
	getRes         client.Object
	createdObjects []client.Object
	createErr      error
	listErr        error
	getErr         error
	updateErr      error
	client.Client
}

func (s *fakeK8sClient) Create(_ context.Context, obj client.Object, _ ...client.CreateOption) error {
	for _, o := range s.createdObjects {
		if obj.GetName() == o.GetName() {
			return k8sErrors.NewAlreadyExists(schema.GroupResource{}, obj.GetName())
		}
	}

	s.createdObjects = append(s.createdObjects, obj.DeepCopyObject().(client.Object))
	return s.createErr
}

func (s *fakeK8sClient) Get(_ context.Context, _ types.NamespacedName, out client.Object, _ ...client.GetOption) error {
	if s.getErr != nil {
		return s.getErr
	}
	obj := s.getRes.DeepCopyObject()
	outVal := reflect.ValueOf(out)
	objVal := reflect.ValueOf(obj)
	if !objVal.Type().AssignableTo(outVal.Type()) {
		return fmt.Errorf("fake had type %s, but %s was asked for", objVal.Type(), outVal.Type())
	}
	reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
	return nil
}

func (s *fakeK8sClient) Update(_ context.Context, _ client.Object, _ ...client.UpdateOption) error {
	return s.updateErr
}

func (s *fakeK8sClient) List(_ context.Context, _ client.ObjectList, _ ...client.ListOption) error {
	return s.listErr
}

type stubScalingGroupDiscoverer struct {
	sync.RWMutex
	groups []cspapi.ScalingGroup
}

func (d *stubScalingGroupDiscoverer) ListScalingGroups(_ context.Context, _ string,
) ([]cspapi.ScalingGroup, error) {
	d.RLock()
	defer d.RUnlock()
	ret := make([]cspapi.ScalingGroup, len(d.groups))
	copy(ret, d.groups)
	return ret, nil
}

func (d *stubScalingGroupDiscoverer) set(groups []cspapi.ScalingGroup) {
	d.Lock()
	defer d.Unlock()
	d.groups = groups
}

func (d *stubScalingGroupDiscoverer) reset() {
	d.Lock()
	defer d.Unlock()
	d.groups = nil
}
@ -0,0 +1,11 @@
/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

// Package sgreconciler contains a reconciler that reconciles on cloud provider infrastructure.
// It is used to create, delete and update the spec of infrastructure-related k8s resources based on the
// actual state of the infrastructure.
// It uses polling (with additional triggers) to check the state of the infrastructure.
package sgreconciler
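For readers skimming the diff, a hedged sketch of how this package is wired together, following the same pattern as the main.go hunk above. The `uid`, `cspClient`, `k8sClient`, and `mgr` values are assumed to come from the operator's existing setup code.

```go
// Assumed to run inside the operator's main(), after cspClient, k8sClient and
// the controller-runtime manager mgr have been initialized.
reconciler := sgreconciler.NewExternalScalingGroupReconciler(uid, cspClient, k8sClient)

// The executor polls the reconciler at its default frequency.
exec := executor.New(reconciler, executor.NewDefaultConfig())
stopAndWait := exec.Start(context.Background())
defer stopAndWait()

// Node join/leave events short-circuit the polling interval via the executor's
// external trigger.
if err := sgreconciler.NewNodeJoinWatcher(exec.Trigger, mgr.GetClient(), mgr.GetScheme()).SetupWithManager(mgr); err != nil {
	// handle setup error
}
```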
operators/constellation-node-operator/sgreconciler/suite_test.go (new file)
@ -0,0 +1,194 @@
//go:build integration

/*
Copyright (c) Edgeless Systems GmbH

SPDX-License-Identifier: AGPL-3.0-only
*/

package sgreconciler

import (
	"context"
	"path/filepath"
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/envtest"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	updatev1alpha1 "github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/api/v1alpha1"
	"github.com/edgelesssys/constellation/v2/operators/constellation-node-operator/v2/internal/executor"
	//+kubebuilder:scaffold:imports
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var (
	cfg                    *rest.Config
	k8sClient              client.Client
	testEnv                *envtest.Environment
	ctx                    context.Context
	cancel                 context.CancelFunc
	fakes                  = newFakes()
	triggerReconcile       func()
	stopAndWaitForExecutor func()
)

func TestAPIs(t *testing.T) {
	RegisterFailHandler(Fail)
	suiteConfig, reporterConfig := GinkgoConfiguration()
	// If you want to debug a specific seed, set it here.
	// suiteConfig.RandomSeed = 1679587116
	reporterConfig.VeryVerbose = true
	RunSpecs(t, "Controller Suite", suiteConfig, reporterConfig)
}

var _ = BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
	ctx, cancel = context.WithCancel(context.Background())

	By("bootstrapping test environment")
	testEnv = &envtest.Environment{
		CRDDirectoryPaths: []string{
			filepath.Join("..", "config", "crd", "bases"),
		},
		ErrorIfCRDPathMissing: true,
	}

	var err error
	// cfg is defined in this file globally.
	cfg, err = testEnv.Start()
	Expect(err).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	err = updatev1alpha1.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	//+kubebuilder:scaffold:scheme

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())

	extScalingGroupReconciler := NewExternalScalingGroupReconciler(
		"uid",
		fakes.scalingGroupDiscoverer,
		k8sClient,
	)

	exec := executor.New(extScalingGroupReconciler, executor.Config{})
	triggerReconcile = exec.Trigger

	Expect(err).ToNot(HaveOccurred())

	go func() {
		defer GinkgoRecover()
		stopAndWaitForExecutor = exec.Start(ctx)
	}()
})

var _ = AfterSuite(func() {
	By("tearing down the test environment")
	cancel()
	defer stopAndWaitForExecutor()
	err := testEnv.Stop()
	Expect(err).NotTo(HaveOccurred())
})

type fakeCollection struct {
	scalingGroupDiscoverer *stubScalingGroupDiscoverer
}

func (c *fakeCollection) reset() {
	c.scalingGroupDiscoverer.reset()
}

func newFakes() *fakeCollection {
	return &fakeCollection{
		scalingGroupDiscoverer: &stubScalingGroupDiscoverer{},
	}
}

func resetEnv() error {
	// cleanup all nodes
	nodeList := &corev1.NodeList{}
	if err := k8sClient.List(context.Background(), nodeList); err != nil {
		return err
	}
	for _, node := range nodeList.Items {
		if err := k8sClient.Delete(context.Background(), &node); err != nil {
			return err
		}
	}
	// cleanup all node versions
	nodeVersionList := &updatev1alpha1.NodeVersionList{}
	if err := k8sClient.List(context.Background(), nodeVersionList); err != nil {
		return err
	}
	for _, nodeVersion := range nodeVersionList.Items {
		if err := k8sClient.Delete(context.Background(), &nodeVersion); err != nil {
			return err
		}
	}
	// cleanup all scaling groups
	scalingGroupList := &updatev1alpha1.ScalingGroupList{}
	if err := k8sClient.List(context.Background(), scalingGroupList); err != nil {
		return err
	}
	for _, scalingGroup := range scalingGroupList.Items {
		if err := k8sClient.Delete(context.Background(), &scalingGroup); err != nil {
			return err
		}
	}
	// cleanup all pending nodes
	pendingNodeList := &updatev1alpha1.PendingNodeList{}
	if err := k8sClient.List(context.Background(), pendingNodeList); err != nil {
		return err
	}
	for _, pendingNode := range pendingNodeList.Items {
		if err := k8sClient.Delete(context.Background(), &pendingNode); err != nil {
			return err
		}
	}
	// cleanup all joining nodes
	joiningNodeList := &updatev1alpha1.JoiningNodeList{}
	if err := k8sClient.List(context.Background(), joiningNodeList); err != nil {
		return err
	}
	for _, joiningNode := range joiningNodeList.Items {
		if err := k8sClient.Delete(context.Background(), &joiningNode); err != nil {
			return err
		}
	}
	// cleanup all autoscaling strategies
	autoscalingStrategyList := &updatev1alpha1.AutoscalingStrategyList{}
	if err := k8sClient.List(context.Background(), autoscalingStrategyList); err != nil {
		return err
	}
	for _, autoscalingStrategy := range autoscalingStrategyList.Items {
		if err := k8sClient.Delete(context.Background(), &autoscalingStrategy); err != nil {
			return err
		}
	}
	// cleanup all deployments
	deploymentList := &appsv1.DeploymentList{}
	if err := k8sClient.List(context.Background(), deploymentList); err != nil {
		return err
	}
	for _, deployment := range deploymentList.Items {
		if err := k8sClient.Delete(context.Background(), &deployment); err != nil {
			return err
		}
	}
	fakes.reset()
	return nil
}