Cloud providers: Add CloudNodeManager

Malte Poll 2022-03-25 10:49:18 +01:00 committed by Malte Poll
parent 2158377f9f
commit f5eddf8af0
17 changed files with 309 additions and 18 deletions

View File

@ -39,6 +39,7 @@ func main() {
var kube core.Cluster
var metadata core.ProviderMetadata
var cloudControllerManager core.CloudControllerManager
var cloudNodeManager core.CloudNodeManager
var autoscaler core.ClusterAutoscaler
cfg := zap.NewDevelopmentConfig()
@ -148,5 +149,5 @@ func main() {
dialer := &net.Dialer{}
run(validator, issuer, wg, openTPM, util.GetIPAddr, dialer, kube,
metadata, cloudControllerManager, autoscaler, etcdEndpoint, enforceEtcdTls, bindIP, bindPort, zapLoggerCore)
metadata, cloudControllerManager, cloudNodeManager, autoscaler, etcdEndpoint, enforceEtcdTls, bindIP, bindPort, zapLoggerCore)
}

View File

@ -26,7 +26,7 @@ import (
var version = "0.0.0"
func run(validator core.QuoteValidator, issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPublicIPAddr func() (string, error), dialer pubapi.Dialer,
kube core.Cluster, metadata core.ProviderMetadata, cloudControllerManager core.CloudControllerManager, clusterAutoscaler core.ClusterAutoscaler, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, zapLoggerCore *zap.Logger,
kube core.Cluster, metadata core.ProviderMetadata, cloudControllerManager core.CloudControllerManager, cloudNodeManager core.CloudNodeManager, clusterAutoscaler core.ClusterAutoscaler, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, zapLoggerCore *zap.Logger,
) {
defer zapLoggerCore.Sync()
zapLoggerCore.Info("starting coordinator", zap.String("version", version))
@ -41,7 +41,7 @@ func run(validator core.QuoteValidator, issuer core.QuoteIssuer, vpn core.VPN, o
ForceTLS: etcdTLS,
Logger: zapLoggerCore.WithOptions(zap.IncreaseLevel(zap.WarnLevel)).Named("etcd"),
}
core, err := core.NewCore(vpn, kube, metadata, cloudControllerManager, clusterAutoscaler, zapLoggerCore, openTPM, etcdStoreFactory)
core, err := core.NewCore(vpn, kube, metadata, cloudControllerManager, cloudNodeManager, clusterAutoscaler, zapLoggerCore, openTPM, etcdStoreFactory)
if err != nil {
zapLoggerCore.Fatal("failed to create core", zap.Error(err))
}

View File

@ -189,7 +189,7 @@ func TestConcurrent(t *testing.T) {
func spawnPeer(require *require.Assertions, logger *zap.Logger, dialer *testdialer.BufconnDialer, netw *network, endpoint string) (*grpc.Server, *pubapi.API, *fakeVPN) {
vpn := newVPN(netw, endpoint)
cor, err := core.NewCore(vpn, &core.ClusterFake{}, &core.ProviderMetadataFake{}, &core.CloudControllerManagerFake{}, &core.ClusterAutoscalerFake{}, logger, vtpm.OpenSimulatedTPM, fakeStoreFactory{})
cor, err := core.NewCore(vpn, &core.ClusterFake{}, &core.ProviderMetadataFake{}, &core.CloudControllerManagerFake{}, &core.CloudNodeManagerFake{}, &core.ClusterAutoscalerFake{}, logger, vtpm.OpenSimulatedTPM, fakeStoreFactory{})
require.NoError(err)
getPublicAddr := func() (string, error) {

View File

@ -76,6 +76,18 @@ type CloudControllerManager interface {
Supported() bool
}
// CloudNodeManager implementers provide configuration for the k8s cloud-node-manager.
type CloudNodeManager interface {
// Image returns the container image used to provide cloud-node-manager for the cloud-provider.
Image() string
// Path returns the path of the cloud-node-manager executable within the container image.
Path() string
// ExtraArgs returns a list of arguments to append to the cloud-node-manager command.
ExtraArgs() []string
// Supported is used to determine if cloud node manager is implemented for this cloud provider.
Supported() bool
}
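
For illustration, a cloud provider satisfies this interface with a handful of static values. The sketch below is hypothetical — the provider package, image reference, and extra flag are placeholders chosen for the example, not values taken from this commit:

package azure // hypothetical provider package

// CloudNodeManager provides the cloud-node-manager configuration for this (illustrative) provider.
type CloudNodeManager struct{}

// Image returns the container image used to provide cloud-node-manager for the cloud provider.
func (m CloudNodeManager) Image() string {
    return "mcr.microsoft.com/oss/kubernetes/azure-cloud-node-manager:v1.23.4" // placeholder tag
}

// Path returns the path of the cloud-node-manager executable within the container image.
func (m CloudNodeManager) Path() string {
    return "cloud-node-manager"
}

// ExtraArgs returns a list of arguments to append to the cloud-node-manager command.
func (m CloudNodeManager) ExtraArgs() []string {
    return []string{"--wait-routes=true"} // placeholder flag
}

// Supported reports that cloud-node-manager deployment is implemented for this provider.
func (m CloudNodeManager) Supported() bool {
    return true
}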
// ClusterAutoscaler implementers provide configuration for the k8s cluster-autoscaler.
type ClusterAutoscaler interface {
// Name returns the cloud-provider name as used by k8s cluster-autoscaler.
@ -202,6 +214,24 @@ func (f *CloudControllerManagerFake) Supported() bool {
return false
}
type CloudNodeManagerFake struct{}
func (f *CloudNodeManagerFake) Image() string {
return "fake-image:latest"
}
func (f *CloudNodeManagerFake) Path() string {
return "/fake-cloud-node-manager"
}
func (f *CloudNodeManagerFake) ExtraArgs() []string {
return []string{}
}
func (f *CloudNodeManagerFake) Supported() bool {
return false
}
type ClusterAutoscalerFake struct{}
func (f *ClusterAutoscalerFake) Name() string {

View File

@ -25,7 +25,6 @@ func (c *Core) InitCluster(autoscalingNodeGroups []string, cloudServiceAccountUR
var ccmSecrets resources.Secrets
var err error
nodeIP := coordinatorVPNIP.String()
var err error
if c.metadata.Supported() {
instance, err = c.metadata.Self(context.TODO())
if err != nil {
@ -74,6 +73,10 @@ func (c *Core) InitCluster(autoscalingNodeGroups []string, cloudServiceAccountUR
CloudControllerManagerVolumes: c.cloudControllerManager.Volumes(),
CloudControllerManagerVolumeMounts: c.cloudControllerManager.VolumeMounts(),
CloudControllerManagerEnv: c.cloudControllerManager.Env(),
SupportsCloudNodeManager: c.cloudNodeManager.Supported(),
CloudNodeManagerImage: c.cloudNodeManager.Image(),
CloudNodeManagerPath: c.cloudNodeManager.Path(),
CloudNodeManagerExtraArgs: c.cloudNodeManager.ExtraArgs(),
})
if err != nil {
c.zaplogger.Error("Initializing cluster failed", zap.Error(err))

View File

@ -22,6 +22,7 @@ func TestInitCluster(t *testing.T) {
cluster clusterStub
metadata stubMetadata
cloudControllerManager stubCloudControllerManager
cloudNodeManager stubCloudNodeManager
clusterAutoscaler stubClusterAutoscaler
autoscalingNodeGroups []string
expectErr bool
@ -166,7 +167,7 @@ func TestInitCluster(t *testing.T) {
zapLogger, err := zap.NewDevelopment()
require.NoError(err)
core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
require.NoError(err)
kubeconfig, err := core.InitCluster(tc.autoscalingNodeGroups, "cloud-service-account-uri")
@ -190,6 +191,7 @@ func TestJoinCluster(t *testing.T) {
cluster clusterStub
metadata stubMetadata
cloudControllerManager stubCloudControllerManager
cloudNodeManager stubCloudNodeManager
clusterAutoscaler stubClusterAutoscaler
vpn stubVPN
expectErr bool
@ -280,7 +282,7 @@ func TestJoinCluster(t *testing.T) {
zapLogger, err := zap.NewDevelopment()
require.NoError(err)
core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
require.NoError(err)
joinReq := kubeadm.BootstrapTokenDiscovery{
@ -435,6 +437,39 @@ func (s *stubCloudControllerManager) Supported() bool {
return s.supportedRes
}
type stubCloudNodeManager struct {
imageRes string
pathRes string
nameRes string
prepareInstanceRes error
extraArgsRes []string
configMapsRes resources.ConfigMaps
configMapsErr error
secretsRes resources.Secrets
secretsErr error
volumesRes []k8s.Volume
volumeMountRes []k8s.VolumeMount
supportedRes bool
prepareInstanceRequests []prepareInstanceRequest
}
func (s *stubCloudNodeManager) Image() string {
return s.imageRes
}
func (s *stubCloudNodeManager) Path() string {
return s.pathRes
}
func (s *stubCloudNodeManager) ExtraArgs() []string {
return s.extraArgsRes
}
func (s *stubCloudNodeManager) Supported() bool {
return s.supportedRes
}
type stubClusterAutoscaler struct {
nameRes string
supportedRes bool

View File

@ -29,6 +29,7 @@ type Core struct {
kube Cluster
metadata ProviderMetadata
cloudControllerManager CloudControllerManager
cloudNodeManager CloudNodeManager
clusterAutoscaler ClusterAutoscaler
kms kms.CloudKMS
zaplogger *zap.Logger
@ -38,7 +39,7 @@ type Core struct {
// NewCore creates and initializes a new Core object.
func NewCore(vpn VPN, kube Cluster,
metadata ProviderMetadata, cloudControllerManager CloudControllerManager, clusterAutoscaler ClusterAutoscaler,
metadata ProviderMetadata, cloudControllerManager CloudControllerManager, cloudNodeManager CloudNodeManager, clusterAutoscaler ClusterAutoscaler,
zapLogger *zap.Logger, openTPM vtpm.TPMOpenFunc, persistentStoreFactory PersistentStoreFactory,
) (*Core, error) {
stor := store.NewStdStore()
@ -48,6 +49,7 @@ func NewCore(vpn VPN, kube Cluster,
vpn: vpn,
kube: kube,
metadata: metadata,
cloudNodeManager: cloudNodeManager,
cloudControllerManager: cloudControllerManager,
clusterAutoscaler: clusterAutoscaler,
zaplogger: zapLogger,

View File

@ -28,7 +28,7 @@ func TestAddAdmin(t *testing.T) {
require := require.New(t)
vpn := &stubVPN{}
core, err := NewCore(vpn, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
pubKey := []byte{2, 3, 4}
@ -43,7 +43,7 @@ func TestGenerateNextIP(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
ip, err := core.GenerateNextIP()
@ -81,7 +81,7 @@ func TestSwitchToPersistentStore(t *testing.T) {
require := require.New(t)
storeFactory := &fakeStoreFactory{}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory)
require.NoError(err)
require.NoError(core.SwitchToPersistentStore())
@ -95,7 +95,7 @@ func TestGetIDs(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
_, _, err = core.GetIDs(nil)
@ -119,7 +119,7 @@ func TestNotifyNodeHeartbeat(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
const ip = "192.0.2.1"
@ -132,7 +132,7 @@ func TestDeriveKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
// error when no kms is set up

View File

@ -140,12 +140,13 @@ func newMockCoreWithDialer(dialer *bufconnDialer) (*Core, *pubapi.API, error) {
kubeFake := &ClusterFake{}
metadataFake := &ProviderMetadataFake{}
ccmFake := &CloudControllerManagerFake{}
cnmFake := &CloudNodeManagerFake{}
autoscalerFake := &ClusterAutoscalerFake{}
getPublicAddr := func() (string, error) {
return "192.0.2.1", nil
}
core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{})
core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, cnmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{})
if err != nil {
return nil, nil, err
}

View File

@ -51,7 +51,7 @@ func TestGetPeers(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
// prepare store
@ -119,7 +119,7 @@ func TestAddPeer(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&tc.vpn, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&tc.vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
require.NoError(err)
err = core.AddPeer(tc.peer)

View File

@ -63,7 +63,7 @@ func TestAdvanceState(t *testing.T) {
return vtpm.OpenSimulatedTPM()
}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil)
require.NoError(err)
assert.Equal(state.AcceptingInit, core.GetState())
core.state = tc.initialState

View File

@ -24,4 +24,8 @@ type InitClusterInput struct {
CloudControllerManagerVolumes []k8s.Volume
CloudControllerManagerVolumeMounts []k8s.VolumeMount
CloudControllerManagerEnv []k8s.EnvVar
SupportsCloudNodeManager bool
CloudNodeManagerImage string
CloudNodeManagerPath string
CloudNodeManagerExtraArgs []string
}
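
These four fields mirror the CloudNodeManager interface one-to-one; when SupportsCloudNodeManager is false, the kubewrapper skips the deployment and the remaining three values are ignored (see the InitCluster change further below). A minimal wiring sketch, with all pre-existing fields elided and cloudNodeManager standing in for any core.CloudNodeManager implementation:

in := InitClusterInput{
    // ... pre-existing cloud-controller-manager and autoscaler fields elided ...
    SupportsCloudNodeManager:  cloudNodeManager.Supported(),
    CloudNodeManagerImage:     cloudNodeManager.Image(),
    CloudNodeManagerPath:      cloudNodeManager.Path(),
    CloudNodeManagerExtraArgs: cloudNodeManager.ExtraArgs(),
}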

View File

@ -0,0 +1,175 @@
package resources
import (
apps "k8s.io/api/apps/v1"
k8s "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type cloudNodeManagerDeployment struct {
ServiceAccount k8s.ServiceAccount
ClusterRole rbac.ClusterRole
ClusterRoleBinding rbac.ClusterRoleBinding
DaemonSet apps.DaemonSet
}
// NewDefaultCloudNodeManagerDeployment creates a new *cloudNodeManagerDeployment, customized for the CSP.
func NewDefaultCloudNodeManagerDeployment(image, path string, extraArgs []string) *cloudNodeManagerDeployment {
command := []string{
path,
"--node-name=$(NODE_NAME)",
}
command = append(command, extraArgs...)
return &cloudNodeManagerDeployment{
ServiceAccount: k8s.ServiceAccount{
TypeMeta: meta.TypeMeta{
APIVersion: "v1",
Kind: "ServiceAccount",
},
ObjectMeta: meta.ObjectMeta{
Name: "cloud-node-manager",
Namespace: "kube-system",
Labels: map[string]string{
"k8s-app": "cloud-node-manager",
"kubernetes.io/cluster-service": "true",
"addonmanager.kubernetes.io/mode": "Reconcile",
},
},
},
ClusterRole: rbac.ClusterRole{
TypeMeta: meta.TypeMeta{
APIVersion: "rbac.authorization.k8s.io/v1",
Kind: "ClusterRole",
},
ObjectMeta: meta.ObjectMeta{
Name: "cloud-node-manager",
Labels: map[string]string{
"k8s-app": "cloud-node-manager",
"kubernetes.io/cluster-service": "true",
"addonmanager.kubernetes.io/mode": "Reconcile",
},
},
Rules: []rbac.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"nodes"},
Verbs: []string{"watch", "list", "get", "update", "patch"},
},
{
APIGroups: []string{""},
Resources: []string{"nodes/status"},
Verbs: []string{"patch"},
},
},
},
ClusterRoleBinding: rbac.ClusterRoleBinding{
TypeMeta: meta.TypeMeta{
APIVersion: "rbac.authorization.k8s.io/v1",
Kind: "ClusterRoleBinding",
},
ObjectMeta: meta.ObjectMeta{
Name: "cloud-node-manager",
Labels: map[string]string{
"k8s-app": "cloud-node-manager",
"kubernetes.io/cluster-service": "true",
"addonmanager.kubernetes.io/mode": "Reconcile",
},
},
RoleRef: rbac.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "ClusterRole",
Name: "cloud-node-manager",
},
Subjects: []rbac.Subject{
{
Kind: "ServiceAccount",
Name: "cloud-node-manager",
Namespace: "kube-system",
},
},
},
DaemonSet: apps.DaemonSet{
TypeMeta: meta.TypeMeta{
APIVersion: "apps/v1",
Kind: "DaemonSet",
},
ObjectMeta: meta.ObjectMeta{
Name: "cloud-node-manager",
Namespace: "kube-system",
Labels: map[string]string{
"component": "cloud-node-manager",
"kubernetes.io/cluster-service": "true",
"addonmanager.kubernetes.io/mode": "Reconcile",
},
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{"k8s-app": "cloud-node-manager"},
},
Template: k8s.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{"k8s-app": "cloud-node-manager"},
Annotations: map[string]string{"cluster-autoscaler.kubernetes.io/daemonset-pod": "true"},
},
Spec: k8s.PodSpec{
PriorityClassName: "system-node-critical",
ServiceAccountName: "cloud-node-manager",
HostNetwork: true,
NodeSelector: map[string]string{"kubernetes.io/os": "linux"},
Tolerations: []k8s.Toleration{
{
Key: "CriticalAddonsOnly",
Operator: k8s.TolerationOpExists,
},
{
Key: "node-role.kubernetes.io/master",
Operator: k8s.TolerationOpEqual,
Value: "true",
Effect: k8s.TaintEffectNoSchedule,
},
{
Operator: k8s.TolerationOpExists,
Effect: k8s.TaintEffectNoExecute,
},
{
Operator: k8s.TolerationOpExists,
Effect: k8s.TaintEffectNoSchedule,
},
},
Containers: []k8s.Container{
{
Name: "cloud-node-manager",
Image: image,
ImagePullPolicy: k8s.PullIfNotPresent,
Command: command,
Env: []k8s.EnvVar{
{
Name: "NODE_NAME",
ValueFrom: &k8s.EnvVarSource{
FieldRef: &k8s.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
},
Resources: k8s.ResourceRequirements{
Requests: k8s.ResourceList{
k8s.ResourceCPU: resource.MustParse("50m"),
k8s.ResourceMemory: resource.MustParse("50Mi"),
},
},
},
},
},
},
},
},
}
}
// Marshal marshals the cloud-node-manager deployment as YAML documents.
func (c *cloudNodeManagerDeployment) Marshal() ([]byte, error) {
return MarshalK8SResources(c)
}
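
As a usage sketch (image, path, and extra flag are illustrative placeholders): the DaemonSet container command is composed as the executable path, the fixed --node-name flag, then the extra args, and Marshal emits the ServiceAccount, ClusterRole, ClusterRoleBinding, and DaemonSet as one multi-document YAML stream. The sketch sits in the same resources package so no module import path has to be assumed:

package resources

import (
    "fmt"
    "log"
)

// demoCloudNodeManagerDeployment builds and marshals a cloud-node-manager deployment
// with placeholder values; the resulting container command is
// ["cloud-node-manager", "--node-name=$(NODE_NAME)", "--wait-routes=false"].
func demoCloudNodeManagerDeployment() {
    depl := NewDefaultCloudNodeManagerDeployment(
        "cloud-node-manager:v1",         // hypothetical image reference
        "cloud-node-manager",            // hypothetical executable path inside the image
        []string{"--wait-routes=false"}, // hypothetical extra flag
    )
    data, err := depl.Marshal()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(data)) // YAML documents ready for kubectl apply
}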

View File

@ -0,0 +1,21 @@
package resources
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCloudNodeManagerMarshalUnmarshal(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
cloudNodeManagerDepl := NewDefaultCloudNodeManagerDeployment("image", "path", []string{})
data, err := cloudNodeManagerDepl.Marshal()
require.NoError(err)
var recreated cloudNodeManagerDeployment
require.NoError(UnmarshalK8SResources(data, &recreated))
assert.Equal(cloudNodeManagerDepl, &recreated)
}
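
A follow-up assertion one might place alongside this test (a sketch, not part of this commit) to pin down how the container command is composed from the path and extra args:

func TestCloudNodeManagerCommand(t *testing.T) {
    assert := assert.New(t)

    depl := NewDefaultCloudNodeManagerDeployment("image", "path", []string{"--extra-arg=value"})
    // Command is the executable path, the fixed node-name flag, then the extra args.
    assert.Equal(
        []string{"path", "--node-name=$(NODE_NAME)", "--extra-arg=value"},
        depl.DaemonSet.Spec.Template.Spec.Containers[0].Command,
    )
}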

View File

@ -27,6 +27,7 @@ type ClusterUtil interface {
SetupPodNetwork(kubectl Client, podNetworkConfiguration resources.Marshaler) error
SetupAutoscaling(kubectl Client, clusterAutoscalerConfiguration resources.Marshaler) error
SetupCloudControllerManager(kubectl Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error
SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error
RestartKubelet() error
}
@ -114,6 +115,10 @@ func (k *KubernetesUtil) SetupCloudControllerManager(kubectl Client, cloudContro
}
return nil
}
// SetupCloudNodeManager deploys the k8s cloud-node-manager.
func (k *KubernetesUtil) SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error {
return kubectl.Apply(cloudNodeManagerConfiguration, true)
}
// JoinCluster joins an existing Kubernetes cluster using kubeadm join.

View File

@ -81,6 +81,16 @@ func (k *KubeWrapper) InitCluster(in InitClusterInput) (*kubeadm.BootstrapTokenD
return nil, fmt.Errorf("failed to setup cloud-controller-manager: %w", err)
}
}
if in.SupportsCloudNodeManager {
cloudNodeManagerConfiguration := resources.NewDefaultCloudNodeManagerDeployment(
in.CloudNodeManagerImage, in.CloudNodeManagerPath, in.CloudNodeManagerExtraArgs,
)
if err := k.clusterUtil.SetupCloudNodeManager(k.client, cloudNodeManagerConfiguration); err != nil {
return nil, fmt.Errorf("failed to setup cloud-node-manager: %w", err)
}
}
if in.SupportClusterAutoscaler {
clusterAutoscalerConfiguration := resources.NewDefaultAutoscalerDeployment()
clusterAutoscalerConfiguration.SetAutoscalerCommand(in.AutoscalingCloudprovider, in.AutoscalingNodeGroups)

View File

@ -26,6 +26,7 @@ type stubClusterUtil struct {
setupPodNetworkErr error
setupAutoscalingError error
setupCloudControllerManagerError error
setupCloudNodeManagerError error
joinClusterErr error
restartKubeletErr error
@ -49,6 +50,9 @@ func (s *stubClusterUtil) SetupAutoscaling(kubectl k8sapi.Client, clusterAutosca
func (s *stubClusterUtil) SetupCloudControllerManager(kubectl k8sapi.Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error {
return s.setupCloudControllerManagerError
}
func (s *stubClusterUtil) SetupCloudNodeManager(kubectl k8sapi.Client, cloudNodeManagerConfiguration resources.Marshaler) error {
return s.setupCloudNodeManagerError
}
func (s *stubClusterUtil) JoinCluster(joinConfig []byte) error {