diff --git a/coordinator/cmd/coordinator/main.go b/coordinator/cmd/coordinator/main.go
index fb781a564..1d7f5ceef 100644
--- a/coordinator/cmd/coordinator/main.go
+++ b/coordinator/cmd/coordinator/main.go
@@ -39,6 +39,7 @@ func main() {
 	var kube core.Cluster
 	var metadata core.ProviderMetadata
 	var cloudControllerManager core.CloudControllerManager
+	var cloudNodeManager core.CloudNodeManager
 	var autoscaler core.ClusterAutoscaler
 
 	cfg := zap.NewDevelopmentConfig()
@@ -148,5 +149,5 @@ func main() {
 	dialer := &net.Dialer{}
 
 	run(validator, issuer, wg, openTPM, util.GetIPAddr, dialer, kube,
-		metadata, cloudControllerManager, autoscaler, etcdEndpoint, enforceEtcdTls, bindIP, bindPort, zapLoggerCore)
+		metadata, cloudControllerManager, cloudNodeManager, autoscaler, etcdEndpoint, enforceEtcdTls, bindIP, bindPort, zapLoggerCore)
 }
diff --git a/coordinator/cmd/coordinator/run.go b/coordinator/cmd/coordinator/run.go
index 8c160d85e..d99b92b23 100644
--- a/coordinator/cmd/coordinator/run.go
+++ b/coordinator/cmd/coordinator/run.go
@@ -26,7 +26,7 @@ import (
 var version = "0.0.0"
 
 func run(validator core.QuoteValidator, issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPublicIPAddr func() (string, error), dialer pubapi.Dialer,
-	kube core.Cluster, metadata core.ProviderMetadata, cloudControllerManager core.CloudControllerManager, clusterAutoscaler core.ClusterAutoscaler, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, zapLoggerCore *zap.Logger,
+	kube core.Cluster, metadata core.ProviderMetadata, cloudControllerManager core.CloudControllerManager, cloudNodeManager core.CloudNodeManager, clusterAutoscaler core.ClusterAutoscaler, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, zapLoggerCore *zap.Logger,
 ) {
 	defer zapLoggerCore.Sync()
 	zapLoggerCore.Info("starting coordinator", zap.String("version", version))
@@ -41,7 +41,7 @@ func run(validator core.QuoteValidator, issuer core.QuoteIssuer, vpn core.VPN, o
 		ForceTLS: etcdTLS,
 		Logger:   zapLoggerCore.WithOptions(zap.IncreaseLevel(zap.WarnLevel)).Named("etcd"),
 	}
-	core, err := core.NewCore(vpn, kube, metadata, cloudControllerManager, clusterAutoscaler, zapLoggerCore, openTPM, etcdStoreFactory)
+	core, err := core.NewCore(vpn, kube, metadata, cloudControllerManager, cloudNodeManager, clusterAutoscaler, zapLoggerCore, openTPM, etcdStoreFactory)
 	if err != nil {
 		zapLoggerCore.Fatal("failed to create core", zap.Error(err))
 	}
diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go
index 48a4176a1..c2b9d953c 100644
--- a/coordinator/coordinator_test.go
+++ b/coordinator/coordinator_test.go
@@ -189,7 +189,7 @@ func TestConcurrent(t *testing.T) {
 
 func spawnPeer(require *require.Assertions, logger *zap.Logger, dialer *testdialer.BufconnDialer, netw *network, endpoint string) (*grpc.Server, *pubapi.API, *fakeVPN) {
 	vpn := newVPN(netw, endpoint)
-	cor, err := core.NewCore(vpn, &core.ClusterFake{}, &core.ProviderMetadataFake{}, &core.CloudControllerManagerFake{}, &core.ClusterAutoscalerFake{}, logger, vtpm.OpenSimulatedTPM, fakeStoreFactory{})
+	cor, err := core.NewCore(vpn, &core.ClusterFake{}, &core.ProviderMetadataFake{}, &core.CloudControllerManagerFake{}, &core.CloudNodeManagerFake{}, &core.ClusterAutoscalerFake{}, logger, vtpm.OpenSimulatedTPM, fakeStoreFactory{})
 	require.NoError(err)
 
 	getPublicAddr := func() (string, error) {
diff --git a/coordinator/core/cloud_provider.go b/coordinator/core/cloud_provider.go
index dc085d86d..c7396d56b 100644
--- a/coordinator/core/cloud_provider.go
+++ b/coordinator/core/cloud_provider.go
@@ -76,6 +76,18 @@ type CloudControllerManager interface {
 	Supported() bool
 }
 
+// CloudNodeManager implementers provide configuration for the k8s cloud-node-manager.
+type CloudNodeManager interface {
+	// Image returns the container image used to provide cloud-node-manager for the cloud-provider.
+	Image() string
+	// Path returns the path used by cloud-node-manager executable within the container image.
+	Path() string
+	// ExtraArgs returns a list of arguments to append to the cloud-node-manager command.
+	ExtraArgs() []string
+	// Supported is used to determine if cloud node manager is implemented for this cloud provider.
+	Supported() bool
+}
+
 // ClusterAutoscaler implementers provide configuration for the k8s cluster-autoscaler.
 type ClusterAutoscaler interface {
 	// Name returns the cloud-provider name as used by k8s cluster-autoscaler.
@@ -202,6 +214,24 @@ func (f *CloudControllerManagerFake) Supported() bool {
 	return false
 }
 
+type CloudNodeManagerFake struct{}
+
+func (f *CloudNodeManagerFake) Image() string {
+	return "fake-image:latest"
+}
+
+func (f *CloudNodeManagerFake) Path() string {
+	return "/fake-cloud-node-manager"
+}
+
+func (f *CloudNodeManagerFake) ExtraArgs() []string {
+	return []string{}
+}
+
+func (f *CloudNodeManagerFake) Supported() bool {
+	return false
+}
+
 type ClusterAutoscalerFake struct{}
 
 func (f *ClusterAutoscalerFake) Name() string {
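The interface above mirrors the existing CloudControllerManager pattern: a CSP only answers four questions (container image, executable path, extra flags, supported yes/no) and the coordinator handles the deployment. As a reference point, a provider-specific implementation might look like the following sketch. It is not part of this diff; the package name, image reference, and extra flag are illustrative placeholders.

```go
package cloudnodemanager // hypothetical package, analogous to the per-CSP implementations

// CloudNodeManager supplies the coordinator with everything it needs to deploy
// the cloud-node-manager for one (illustrative) cloud provider.
type CloudNodeManager struct{}

// Image returns the container image to deploy; the reference is a placeholder.
func (m *CloudNodeManager) Image() string {
	return "registry.example.com/cloud-node-manager:v1.0.0"
}

// Path returns the path of the cloud-node-manager executable inside the image.
func (m *CloudNodeManager) Path() string {
	return "cloud-node-manager"
}

// ExtraArgs returns provider-specific flags; they are appended after
// "--node-name=$(NODE_NAME)" by NewDefaultCloudNodeManagerDeployment.
func (m *CloudNodeManager) ExtraArgs() []string {
	return []string{"--v=2"} // illustrative verbosity flag only
}

// Supported reports that this provider wants the cloud-node-manager deployed.
func (m *CloudNodeManager) Supported() bool {
	return true
}
```

A provider that handles node initialization inside its cloud-controller-manager can simply return false from Supported(), just like CloudNodeManagerFake does, and InitCluster skips the deployment.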
diff --git a/coordinator/core/cluster.go b/coordinator/core/cluster.go
index 4192aa53e..47c55e558 100644
--- a/coordinator/core/cluster.go
+++ b/coordinator/core/cluster.go
@@ -25,7 +25,6 @@ func (c *Core) InitCluster(autoscalingNodeGroups []string, cloudServiceAccountUR
 	var ccmSecrets resources.Secrets
 	var err error
 	nodeIP := coordinatorVPNIP.String()
-	var err error
 	if c.metadata.Supported() {
 		instance, err = c.metadata.Self(context.TODO())
 		if err != nil {
@@ -74,6 +73,10 @@ func (c *Core) InitCluster(autoscalingNodeGroups []string, cloudServiceAccountUR
 		CloudControllerManagerVolumes:      c.cloudControllerManager.Volumes(),
 		CloudControllerManagerVolumeMounts: c.cloudControllerManager.VolumeMounts(),
 		CloudControllerManagerEnv:          c.cloudControllerManager.Env(),
+		SupportsCloudNodeManager:           c.cloudNodeManager.Supported(),
+		CloudNodeManagerImage:              c.cloudNodeManager.Image(),
+		CloudNodeManagerPath:               c.cloudNodeManager.Path(),
+		CloudNodeManagerExtraArgs:          c.cloudNodeManager.ExtraArgs(),
 	})
 	if err != nil {
 		c.zaplogger.Error("Initializing cluster failed", zap.Error(err))
diff --git a/coordinator/core/cluster_test.go b/coordinator/core/cluster_test.go
index 006889e64..e28bf4e07 100644
--- a/coordinator/core/cluster_test.go
+++ b/coordinator/core/cluster_test.go
@@ -22,6 +22,7 @@ func TestInitCluster(t *testing.T) {
 		cluster                clusterStub
 		metadata               stubMetadata
 		cloudControllerManager stubCloudControllerManager
+		cloudNodeManager       stubCloudNodeManager
 		clusterAutoscaler      stubClusterAutoscaler
 		autoscalingNodeGroups  []string
 		expectErr              bool
@@ -166,7 +167,7 @@ func TestInitCluster(t *testing.T) {
 			zapLogger, err := zap.NewDevelopment()
 			require.NoError(err)
 
-			core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
+			core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
 			require.NoError(err)
 
 			kubeconfig, err := core.InitCluster(tc.autoscalingNodeGroups, "cloud-service-account-uri")
@@ -190,6 +191,7 @@ func TestJoinCluster(t *testing.T) {
 		cluster                clusterStub
 		metadata               stubMetadata
 		cloudControllerManager stubCloudControllerManager
+		cloudNodeManager       stubCloudNodeManager
 		clusterAutoscaler      stubClusterAutoscaler
 		vpn                    stubVPN
 		expectErr              bool
@@ -280,7 +282,7 @@ func TestJoinCluster(t *testing.T) {
 			zapLogger, err := zap.NewDevelopment()
 			require.NoError(err)
 
-			core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
+			core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
 			require.NoError(err)
 
 			joinReq := kubeadm.BootstrapTokenDiscovery{
@@ -435,6 +437,39 @@ func (s *stubCloudControllerManager) Supported() bool {
 	return s.supportedRes
 }
 
+type stubCloudNodeManager struct {
+	imageRes           string
+	pathRes            string
+	nameRes            string
+	prepareInstanceRes error
+	extraArgsRes       []string
+	configMapsRes      resources.ConfigMaps
+	configMapsErr      error
+	secretsRes         resources.Secrets
+	secretsErr         error
+	volumesRes         []k8s.Volume
+	volumeMountRes     []k8s.VolumeMount
+	supportedRes       bool
+
+	prepareInstanceRequests []prepareInstanceRequest
+}
+
+func (s *stubCloudNodeManager) Image() string {
+	return s.imageRes
+}
+
+func (s *stubCloudNodeManager) Path() string {
+	return s.pathRes
+}
+
+func (s *stubCloudNodeManager) ExtraArgs() []string {
+	return s.extraArgsRes
+}
+
+func (s *stubCloudNodeManager) Supported() bool {
+	return s.supportedRes
+}
+
 type stubClusterAutoscaler struct {
 	nameRes      string
 	supportedRes bool
diff --git a/coordinator/core/core.go b/coordinator/core/core.go
index 3f3c47cd3..ab373d12d 100644
--- a/coordinator/core/core.go
+++ b/coordinator/core/core.go
@@ -29,6 +29,7 @@ type Core struct {
 	kube                   Cluster
 	metadata               ProviderMetadata
 	cloudControllerManager CloudControllerManager
+	cloudNodeManager       CloudNodeManager
 	clusterAutoscaler      ClusterAutoscaler
 	kms                    kms.CloudKMS
 	zaplogger              *zap.Logger
@@ -38,7 +39,7 @@
 
 // NewCore creates and initializes a new Core object.
 func NewCore(vpn VPN, kube Cluster,
-	metadata ProviderMetadata, cloudControllerManager CloudControllerManager, clusterAutoscaler ClusterAutoscaler,
+	metadata ProviderMetadata, cloudControllerManager CloudControllerManager, cloudNodeManager CloudNodeManager, clusterAutoscaler ClusterAutoscaler,
 	zapLogger *zap.Logger, openTPM vtpm.TPMOpenFunc, persistentStoreFactory PersistentStoreFactory,
 ) (*Core, error) {
 	stor := store.NewStdStore()
@@ -48,6 +49,7 @@ func NewCore(vpn VPN, kube Cluster,
 		vpn:                    vpn,
 		kube:                   kube,
 		metadata:               metadata,
+		cloudNodeManager:       cloudNodeManager,
 		cloudControllerManager: cloudControllerManager,
 		clusterAutoscaler:      clusterAutoscaler,
 		zaplogger:              zapLogger,
diff --git a/coordinator/core/core_test.go b/coordinator/core/core_test.go
index 58b53ef8d..ed845c20c 100644
--- a/coordinator/core/core_test.go
+++ b/coordinator/core/core_test.go
@@ -28,7 +28,7 @@ func TestAddAdmin(t *testing.T) {
 	require := require.New(t)
 
 	vpn := &stubVPN{}
-	core, err := NewCore(vpn, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	pubKey := []byte{2, 3, 4}
@@ -43,7 +43,7 @@ func TestGenerateNextIP(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	ip, err := core.GenerateNextIP()
@@ -81,7 +81,7 @@ func TestSwitchToPersistentStore(t *testing.T) {
 	require := require.New(t)
 
 	storeFactory := &fakeStoreFactory{}
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory)
 	require.NoError(err)
 
 	require.NoError(core.SwitchToPersistentStore())
@@ -95,7 +95,7 @@ func TestGetIDs(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	_, _, err = core.GetIDs(nil)
@@ -119,7 +119,7 @@ func TestNotifyNodeHeartbeat(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	const ip = "192.0.2.1"
@@ -132,7 +132,7 @@ func TestDeriveKey(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	// error when no kms is set up
diff --git a/coordinator/core/legacy_test.go b/coordinator/core/legacy_test.go
index a524c3f1a..ffd92424e 100644
--- a/coordinator/core/legacy_test.go
+++ b/coordinator/core/legacy_test.go
@@ -140,12 +140,13 @@ func newMockCoreWithDialer(dialer *bufconnDialer) (*Core, *pubapi.API, error) {
 	kubeFake := &ClusterFake{}
 	metadataFake := &ProviderMetadataFake{}
 	ccmFake := &CloudControllerManagerFake{}
+	cnmFake := &CloudNodeManagerFake{}
 	autoscalerFake := &ClusterAutoscalerFake{}
 
 	getPublicAddr := func() (string, error) {
 		return "192.0.2.1", nil
 	}
 
-	core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{})
+	core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, cnmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{})
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/coordinator/core/peer_test.go b/coordinator/core/peer_test.go
index a836f71c6..ef903af71 100644
--- a/coordinator/core/peer_test.go
+++ b/coordinator/core/peer_test.go
@@ -51,7 +51,7 @@ func TestGetPeers(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	// prepare store
@@ -119,7 +119,7 @@ func TestAddPeer(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
 
-	core, err := NewCore(&tc.vpn, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
+	core, err := NewCore(&tc.vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
 	require.NoError(err)
 
 	err = core.AddPeer(tc.peer)
diff --git a/coordinator/core/state_test.go b/coordinator/core/state_test.go
index 223548ec0..26c827b52 100644
--- a/coordinator/core/state_test.go
+++ b/coordinator/core/state_test.go
@@ -63,7 +63,7 @@ func TestAdvanceState(t *testing.T) {
 				return vtpm.OpenSimulatedTPM()
 			}
 
-			core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil)
+			core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil)
 			require.NoError(err)
 			assert.Equal(state.AcceptingInit, core.GetState())
 			core.state = tc.initialState
diff --git a/coordinator/kubernetes/cluster_input.go b/coordinator/kubernetes/cluster_input.go
index 40163d92f..335438d6a 100644
--- a/coordinator/kubernetes/cluster_input.go
+++ b/coordinator/kubernetes/cluster_input.go
@@ -24,4 +24,8 @@ type InitClusterInput struct {
 	CloudControllerManagerVolumes      []k8s.Volume
 	CloudControllerManagerVolumeMounts []k8s.VolumeMount
 	CloudControllerManagerEnv          []k8s.EnvVar
+	SupportsCloudNodeManager           bool
+	CloudNodeManagerImage              string
+	CloudNodeManagerPath               string
+	CloudNodeManagerExtraArgs          []string
 }
diff --git a/coordinator/kubernetes/k8sapi/resources/cloudnodemanager.go b/coordinator/kubernetes/k8sapi/resources/cloudnodemanager.go
new file mode 100644
index 000000000..e47f5a49a
--- /dev/null
+++ b/coordinator/kubernetes/k8sapi/resources/cloudnodemanager.go
@@ -0,0 +1,175 @@
+package resources
+
+import (
+	apps "k8s.io/api/apps/v1"
+	k8s "k8s.io/api/core/v1"
+	rbac "k8s.io/api/rbac/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+type cloudNodeManagerDeployment struct {
+	ServiceAccount     k8s.ServiceAccount
+	ClusterRole        rbac.ClusterRole
+	ClusterRoleBinding rbac.ClusterRoleBinding
+	DaemonSet          apps.DaemonSet
+}
+
+// NewDefaultCloudNodeManagerDeployment creates a new *cloudNodeManagerDeployment, customized for the CSP.
+func NewDefaultCloudNodeManagerDeployment(image, path string, extraArgs []string) *cloudNodeManagerDeployment {
+	command := []string{
+		path,
+		"--node-name=$(NODE_NAME)",
+	}
+	command = append(command, extraArgs...)
+	return &cloudNodeManagerDeployment{
+		ServiceAccount: k8s.ServiceAccount{
+			TypeMeta: meta.TypeMeta{
+				APIVersion: "v1",
+				Kind:       "ServiceAccount",
+			},
+			ObjectMeta: meta.ObjectMeta{
+				Name:      "cloud-node-manager",
+				Namespace: "kube-system",
+				Labels: map[string]string{
+					"k8s-app":                         "cloud-node-manager",
+					"kubernetes.io/cluster-service":   "true",
+					"addonmanager.kubernetes.io/mode": "Reconcile",
+				},
+			},
+		},
+		ClusterRole: rbac.ClusterRole{
+			TypeMeta: meta.TypeMeta{
+				APIVersion: "rbac.authorization.k8s.io/v1",
+				Kind:       "ClusterRole",
+			},
+			ObjectMeta: meta.ObjectMeta{
+				Name: "cloud-node-manager",
+				Labels: map[string]string{
+					"k8s-app":                         "cloud-node-manager",
+					"kubernetes.io/cluster-service":   "true",
+					"addonmanager.kubernetes.io/mode": "Reconcile",
+				},
+			},
+			Rules: []rbac.PolicyRule{
+				{
+					APIGroups: []string{""},
+					Resources: []string{"nodes"},
+					Verbs:     []string{"watch", "list", "get", "update", "patch"},
+				},
+				{
+					APIGroups: []string{""},
+					Resources: []string{"nodes/status"},
+					Verbs:     []string{"patch"},
+				},
+			},
+		},
+		ClusterRoleBinding: rbac.ClusterRoleBinding{
+			TypeMeta: meta.TypeMeta{
+				APIVersion: "rbac.authorization.k8s.io/v1",
+				Kind:       "ClusterRoleBinding",
+			},
+			ObjectMeta: meta.ObjectMeta{
+				Name: "cloud-node-manager",
+				Labels: map[string]string{
+					"k8s-app":                         "cloud-node-manager",
+					"kubernetes.io/cluster-service":   "true",
+					"addonmanager.kubernetes.io/mode": "Reconcile",
+				},
+			},
+			RoleRef: rbac.RoleRef{
+				APIGroup: "rbac.authorization.k8s.io",
+				Kind:     "ClusterRole",
+				Name:     "cloud-node-manager",
+			},
+			Subjects: []rbac.Subject{
+				{
+					Kind:      "ServiceAccount",
+					Name:      "cloud-node-manager",
+					Namespace: "kube-system",
+				},
+			},
+		},
+		DaemonSet: apps.DaemonSet{
+			TypeMeta: meta.TypeMeta{
+				APIVersion: "apps/v1",
+				Kind:       "DaemonSet",
+			},
+			ObjectMeta: meta.ObjectMeta{
+				Name:      "cloud-node-manager",
+				Namespace: "kube-system",
+				Labels: map[string]string{
+					"component":                       "cloud-node-manager",
+					"kubernetes.io/cluster-service":   "true",
+					"addonmanager.kubernetes.io/mode": "Reconcile",
+				},
+			},
+			Spec: apps.DaemonSetSpec{
+				Selector: &meta.LabelSelector{
+					MatchLabels: map[string]string{"k8s-app": "cloud-node-manager"},
+				},
+				Template: k8s.PodTemplateSpec{
+					ObjectMeta: meta.ObjectMeta{
+						Labels:      map[string]string{"k8s-app": "cloud-node-manager"},
+						Annotations: map[string]string{"cluster-autoscaler.kubernetes.io/daemonset-pod": "true"},
+					},
+					Spec: k8s.PodSpec{
+						PriorityClassName:  "system-node-critical",
+						ServiceAccountName: "cloud-node-manager",
+						HostNetwork:        true,
+						NodeSelector:       map[string]string{"kubernetes.io/os": "linux"},
+						Tolerations: []k8s.Toleration{
+							{
+								Key:      "CriticalAddonsOnly",
+								Operator: k8s.TolerationOpExists,
+							},
+							{
+								Key:      "node-role.kubernetes.io/master",
+								Operator: k8s.TolerationOpEqual,
+								Value:    "true",
+								Effect:   k8s.TaintEffectNoSchedule,
+							},
+							{
+								Operator: k8s.TolerationOpExists,
+								Effect:   k8s.TaintEffectNoExecute,
+							},
+							{
+								Operator: k8s.TolerationOpExists,
+								Effect:   k8s.TaintEffectNoSchedule,
+							},
+						},
+						Containers: []k8s.Container{
+							{
+								Name:            "cloud-node-manager",
+								Image:           image,
+								ImagePullPolicy: k8s.PullIfNotPresent,
+								Command:         command,
+								Env: []k8s.EnvVar{
+									{
+										Name: "NODE_NAME",
+										ValueFrom: &k8s.EnvVarSource{
+											FieldRef: &k8s.ObjectFieldSelector{
+												FieldPath: "spec.nodeName",
+											},
+										},
+									},
+								},
+								Resources: k8s.ResourceRequirements{
+									Requests: k8s.ResourceList{
+										k8s.ResourceCPU:    resource.MustParse("50m"),
+										k8s.ResourceMemory: resource.MustParse("50Mi"),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+// Marshal marshals the cloud-node-manager deployment as YAML documents.
+func (c *cloudNodeManagerDeployment) Marshal() ([]byte, error) {
+	return MarshalK8SResources(c)
+}
diff --git a/coordinator/kubernetes/k8sapi/resources/cloudnodemanager_test.go b/coordinator/kubernetes/k8sapi/resources/cloudnodemanager_test.go
new file mode 100644
index 000000000..4b7f7fd46
--- /dev/null
+++ b/coordinator/kubernetes/k8sapi/resources/cloudnodemanager_test.go
@@ -0,0 +1,21 @@
+package resources
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCloudNodeManagerMarshalUnmarshal(t *testing.T) {
+	require := require.New(t)
+	assert := assert.New(t)
+
+	cloudNodeManagerDepl := NewDefaultCloudNodeManagerDeployment("image", "path", []string{})
+	data, err := cloudNodeManagerDepl.Marshal()
+	require.NoError(err)
+
+	var recreated cloudNodeManagerDeployment
+	require.NoError(UnmarshalK8SResources(data, &recreated))
+	assert.Equal(cloudNodeManagerDepl, &recreated)
+}
diff --git a/coordinator/kubernetes/k8sapi/util.go b/coordinator/kubernetes/k8sapi/util.go
index 1f63c8348..dbba86800 100644
--- a/coordinator/kubernetes/k8sapi/util.go
+++ b/coordinator/kubernetes/k8sapi/util.go
@@ -27,6 +27,7 @@ type ClusterUtil interface {
 	SetupPodNetwork(kubectl Client, podNetworkConfiguration resources.Marshaler) error
 	SetupAutoscaling(kubectl Client, clusterAutoscalerConfiguration resources.Marshaler) error
 	SetupCloudControllerManager(kubectl Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error
+	SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error
 	RestartKubelet() error
 }
 
@@ -114,6 +115,10 @@ func (k *KubernetesUtil) SetupCloudControllerManager(kubectl Client, cloudContro
 	}
 	return nil
 }
+
+// SetupCloudNodeManager deploys the k8s cloud-node-manager.
+func (k *KubernetesUtil) SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error {
+	return kubectl.Apply(cloudNodeManagerConfiguration, true)
 }
 
 // JoinCluster joins existing kubernetes cluster using kubeadm join.
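To make the composition above concrete: the builder always places the CSP-supplied path first, then --node-name=$(NODE_NAME), then the extra args, and Marshal renders the service account, RBAC objects, and DaemonSet as the multi-document YAML that SetupCloudNodeManager applies. The following example-style sketch illustrates this; it is not part of the change set, and the image reference and extra flag are placeholders.

```go
package resources

import "fmt"

// ExampleNewDefaultCloudNodeManagerDeployment demonstrates how the CSP-supplied path
// and extraArgs end up in the DaemonSet's container command (values are placeholders).
func ExampleNewDefaultCloudNodeManagerDeployment() {
	depl := NewDefaultCloudNodeManagerDeployment(
		"registry.example.com/cloud-node-manager:v1.0.0", // image
		"cloud-node-manager",                             // executable path inside the image
		[]string{"--v=2"},                                 // extra args
	)
	fmt.Println(depl.DaemonSet.Spec.Template.Spec.Containers[0].Command)
	// Output: [cloud-node-manager --node-name=$(NODE_NAME) --v=2]
}
```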
diff --git a/coordinator/kubernetes/kubernetes.go b/coordinator/kubernetes/kubernetes.go
index 37b8a9441..5f03f4b9e 100644
--- a/coordinator/kubernetes/kubernetes.go
+++ b/coordinator/kubernetes/kubernetes.go
@@ -81,6 +81,16 @@ func (k *KubeWrapper) InitCluster(in InitClusterInput) (*kubeadm.BootstrapTokenD
 			return nil, fmt.Errorf("failed to setup cloud-controller-manager: %w", err)
 		}
 	}
+
+	if in.SupportsCloudNodeManager {
+		cloudNodeManagerConfiguration := resources.NewDefaultCloudNodeManagerDeployment(
+			in.CloudNodeManagerImage, in.CloudNodeManagerPath, in.CloudNodeManagerExtraArgs,
+		)
+		if err := k.clusterUtil.SetupCloudNodeManager(k.client, cloudNodeManagerConfiguration); err != nil {
+			return nil, fmt.Errorf("failed to setup cloud-node-manager: %w", err)
+		}
+	}
+
 	if in.SupportClusterAutoscaler {
 		clusterAutoscalerConfiguration := resources.NewDefaultAutoscalerDeployment()
 		clusterAutoscalerConfiguration.SetAutoscalerCommand(in.AutoscalingCloudprovider, in.AutoscalingNodeGroups)
diff --git a/coordinator/kubernetes/kubernetes_test.go b/coordinator/kubernetes/kubernetes_test.go
index 651a883a1..4311b301e 100644
--- a/coordinator/kubernetes/kubernetes_test.go
+++ b/coordinator/kubernetes/kubernetes_test.go
@@ -26,6 +26,7 @@ type stubClusterUtil struct {
 	setupPodNetworkErr                error
 	setupAutoscalingError             error
 	setupCloudControllerManagerError  error
+	setupCloudNodeManagerError        error
 	joinClusterErr                    error
 	restartKubeletErr                 error
 
@@ -49,6 +50,9 @@ func (s *stubClusterUtil) SetupAutoscaling(kubectl k8sapi.Client, clusterAutosca
 func (s *stubClusterUtil) SetupCloudControllerManager(kubectl k8sapi.Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error {
 	return s.setupCloudControllerManagerError
 }
+
+func (s *stubClusterUtil) SetupCloudNodeManager(kubectl k8sapi.Client, cloudNodeManagerConfiguration resources.Marshaler) error {
+	return s.setupCloudNodeManagerError
 }
 
 func (s *stubClusterUtil) JoinCluster(joinConfig []byte) error {
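With setupCloudNodeManagerError on the stub, the new InitCluster branch in kubernetes.go can be covered the same way the other setup steps are. The sketch below is hypothetical and not part of this diff; it leaves the remaining KubeWrapper collaborators at their zero values and assumes the stubbed ClusterUtil is the only dependency exercised before the failure.

```go
package kubernetes

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestInitClusterCloudNodeManagerError sketches the failure path: when the cloud
// provider supports the cloud-node-manager and SetupCloudNodeManager fails,
// InitCluster must return the wrapped error.
func TestInitClusterCloudNodeManagerError(t *testing.T) {
	clusterUtil := stubClusterUtil{setupCloudNodeManagerError: errors.New("apply failed")}
	kube := KubeWrapper{clusterUtil: &clusterUtil}

	_, err := kube.InitCluster(InitClusterInput{
		SupportsCloudNodeManager: true,
		CloudNodeManagerImage:    "image",
		CloudNodeManagerPath:     "path",
	})
	assert.Error(t, err)
}
```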