From 52a68c42f728ec58d4a47365a565ef0d23ecf69e Mon Sep 17 00:00:00 2001 From: Moritz Sanft <58110325+msanft@users.noreply.github.com> Date: Tue, 21 May 2024 15:18:38 +0200 Subject: [PATCH] bootstrapper: prioritize etcd I/O when bootstrapping --- bootstrapper/cmd/bootstrapper/main.go | 10 +- bootstrapper/cmd/bootstrapper/test.go | 5 +- bootstrapper/internal/etcdio/BUILD.bazel | 9 + .../internal/initserver/initserver.go | 2 - .../internal/initserver/initserver_test.go | 3 +- .../internal/joinclient/joinclient.go | 3 +- .../internal/joinclient/joinclient_test.go | 3 +- bootstrapper/internal/kubernetes/BUILD.bazel | 1 + .../internal/kubernetes/kubernetes.go | 99 ++++++++--- .../internal/kubernetes/kubernetes_test.go | 167 +++++++++++------- 10 files changed, 193 insertions(+), 109 deletions(-) create mode 100644 bootstrapper/internal/etcdio/BUILD.bazel diff --git a/bootstrapper/cmd/bootstrapper/main.go b/bootstrapper/cmd/bootstrapper/main.go index ff99b231f..fd7080c74 100644 --- a/bootstrapper/cmd/bootstrapper/main.go +++ b/bootstrapper/cmd/bootstrapper/main.go @@ -86,7 +86,7 @@ func main() { clusterInitJoiner = kubernetes.New( "aws", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(), - metadata, &kubewaiter.CloudKubeAPIWaiter{}, + metadata, &kubewaiter.CloudKubeAPIWaiter{}, log, ) openDevice = vtpm.OpenVTPM fs = afero.NewOsFs() @@ -102,7 +102,7 @@ func main() { metadataAPI = metadata clusterInitJoiner = kubernetes.New( "gcp", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(), - metadata, &kubewaiter.CloudKubeAPIWaiter{}, + metadata, &kubewaiter.CloudKubeAPIWaiter{}, log, ) openDevice = vtpm.OpenVTPM fs = afero.NewOsFs() @@ -122,7 +122,7 @@ func main() { metadataAPI = metadata clusterInitJoiner = kubernetes.New( "azure", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(), - metadata, &kubewaiter.CloudKubeAPIWaiter{}, + metadata, &kubewaiter.CloudKubeAPIWaiter{}, log, ) openDevice = vtpm.OpenVTPM @@ -132,7 +132,7 @@ func main() { metadata := qemucloud.New() clusterInitJoiner = kubernetes.New( "qemu", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(), - metadata, &kubewaiter.CloudKubeAPIWaiter{}, + metadata, &kubewaiter.CloudKubeAPIWaiter{}, log, ) metadataAPI = metadata @@ -155,7 +155,7 @@ func main() { } clusterInitJoiner = kubernetes.New( "openstack", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(), - metadata, &kubewaiter.CloudKubeAPIWaiter{}, + metadata, &kubewaiter.CloudKubeAPIWaiter{}, log, ) metadataAPI = metadata openDevice = vtpm.OpenVTPM diff --git a/bootstrapper/cmd/bootstrapper/test.go b/bootstrapper/cmd/bootstrapper/test.go index 05840de33..d0132ead8 100644 --- a/bootstrapper/cmd/bootstrapper/test.go +++ b/bootstrapper/cmd/bootstrapper/test.go @@ -8,7 +8,6 @@ package main import ( "context" - "log/slog" "github.com/edgelesssys/constellation/v2/internal/cloud/metadata" "github.com/edgelesssys/constellation/v2/internal/role" @@ -22,13 +21,13 @@ type clusterFake struct{} // InitCluster fakes bootstrapping a new cluster with the current node being the master, returning the arguments required to join the cluster. func (c *clusterFake) InitCluster( context.Context, string, string, - bool, components.Components, []string, string, *slog.Logger, + bool, components.Components, []string, string, ) ([]byte, error) { return []byte{}, nil } // JoinCluster will fake joining the current node to an existing cluster. -func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error { +func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error { return nil } diff --git a/bootstrapper/internal/etcdio/BUILD.bazel b/bootstrapper/internal/etcdio/BUILD.bazel new file mode 100644 index 000000000..b7725d106 --- /dev/null +++ b/bootstrapper/internal/etcdio/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "etcdio", + srcs = ["etcdio.go"], + importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio", + visibility = ["//bootstrapper:__subpackages__"], + deps = ["@org_golang_x_sys//unix"], +) diff --git a/bootstrapper/internal/initserver/initserver.go b/bootstrapper/internal/initserver/initserver.go index a38bdbc8d..9c2ef36a9 100644 --- a/bootstrapper/internal/initserver/initserver.go +++ b/bootstrapper/internal/initserver/initserver.go @@ -234,7 +234,6 @@ func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServe req.KubernetesComponents, req.ApiserverCertSans, req.ServiceCidr, - s.log, ) if err != nil { if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "initializing cluster: %s", err)); e != nil { @@ -357,7 +356,6 @@ type ClusterInitializer interface { kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string, - log *slog.Logger, ) ([]byte, error) } diff --git a/bootstrapper/internal/initserver/initserver_test.go b/bootstrapper/internal/initserver/initserver_test.go index 84d0316d7..0d9f25db4 100644 --- a/bootstrapper/internal/initserver/initserver_test.go +++ b/bootstrapper/internal/initserver/initserver_test.go @@ -11,7 +11,6 @@ import ( "context" "errors" "io" - "log/slog" "net" "strings" "sync" @@ -420,7 +419,7 @@ type stubClusterInitializer struct { func (i *stubClusterInitializer) InitCluster( context.Context, string, string, - bool, components.Components, []string, string, *slog.Logger, + bool, components.Components, []string, string, ) ([]byte, error) { return i.initClusterKubeconfig, i.initClusterErr } diff --git a/bootstrapper/internal/joinclient/joinclient.go b/bootstrapper/internal/joinclient/joinclient.go index 3e2944325..8a2f6986f 100644 --- a/bootstrapper/internal/joinclient/joinclient.go +++ b/bootstrapper/internal/joinclient/joinclient.go @@ -288,7 +288,7 @@ func (c *JoinClient) startNodeAndJoin(ticket *joinproto.IssueJoinTicketResponse, // We currently cannot recover from any failure in this function. Joining the k8s cluster // sometimes fails transiently, and we don't want to brick the node because of that. for i := range 3 { - err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents, c.log) + err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents) if err == nil { break } @@ -399,7 +399,6 @@ type ClusterJoiner interface { args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components, - log *slog.Logger, ) error } diff --git a/bootstrapper/internal/joinclient/joinclient_test.go b/bootstrapper/internal/joinclient/joinclient_test.go index a93ed4b3f..6a0b89f4b 100644 --- a/bootstrapper/internal/joinclient/joinclient_test.go +++ b/bootstrapper/internal/joinclient/joinclient_test.go @@ -8,7 +8,6 @@ package joinclient import ( "context" - "log/slog" "net" "strconv" "sync" @@ -350,7 +349,7 @@ type stubClusterJoiner struct { joinClusterErr error } -func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error { +func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error { j.joinClusterCalled++ if j.numBadCalls == 0 { return nil diff --git a/bootstrapper/internal/kubernetes/BUILD.bazel b/bootstrapper/internal/kubernetes/BUILD.bazel index d6ba14a49..935c3fefd 100644 --- a/bootstrapper/internal/kubernetes/BUILD.bazel +++ b/bootstrapper/internal/kubernetes/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes", visibility = ["//bootstrapper:__subpackages__"], deps = [ + "//bootstrapper/internal/etcdio", "//bootstrapper/internal/kubernetes/k8sapi", "//bootstrapper/internal/kubernetes/kubewaiter", "//internal/cloud/cloudprovider", diff --git a/bootstrapper/internal/kubernetes/kubernetes.go b/bootstrapper/internal/kubernetes/kubernetes.go index 5ef1f4637..cbe58b709 100644 --- a/bootstrapper/internal/kubernetes/kubernetes.go +++ b/bootstrapper/internal/kubernetes/kubernetes.go @@ -9,6 +9,7 @@ package kubernetes import ( "context" + "errors" "fmt" "log/slog" "net" @@ -16,6 +17,7 @@ import ( "strings" "time" + "github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio" "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi" "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubewaiter" "github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider" @@ -40,37 +42,48 @@ type kubeAPIWaiter interface { Wait(ctx context.Context, kubernetesClient kubewaiter.KubernetesClient) error } +type etcdIOPrioritizer interface { + PrioritizeIO() error +} + // KubeWrapper implements Cluster interface. type KubeWrapper struct { - cloudProvider string - clusterUtil clusterUtil - kubeAPIWaiter kubeAPIWaiter - configProvider configurationProvider - client k8sapi.Client - providerMetadata ProviderMetadata - getIPAddr func() (string, error) + cloudProvider string + clusterUtil clusterUtil + kubeAPIWaiter kubeAPIWaiter + configProvider configurationProvider + client k8sapi.Client + providerMetadata ProviderMetadata + etcdIOPrioritizer etcdIOPrioritizer + getIPAddr func() (string, error) + + log *slog.Logger } // New creates a new KubeWrapper with real values. func New(cloudProvider string, clusterUtil clusterUtil, configProvider configurationProvider, client k8sapi.Client, - providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter, + providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter, log *slog.Logger, ) *KubeWrapper { + etcdIOPrioritizer := etcdio.NewClient(log) + return &KubeWrapper{ - cloudProvider: cloudProvider, - clusterUtil: clusterUtil, - kubeAPIWaiter: kubeAPIWaiter, - configProvider: configProvider, - client: client, - providerMetadata: providerMetadata, - getIPAddr: getIPAddr, + cloudProvider: cloudProvider, + clusterUtil: clusterUtil, + kubeAPIWaiter: kubeAPIWaiter, + configProvider: configProvider, + client: client, + providerMetadata: providerMetadata, + getIPAddr: getIPAddr, + log: log, + etcdIOPrioritizer: etcdIOPrioritizer, } } // InitCluster initializes a new Kubernetes cluster and applies pod network provider. func (k *KubeWrapper) InitCluster( - ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string, log *slog.Logger, + ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string, ) ([]byte, error) { - log.With(slog.String("version", versionString)).Info("Installing Kubernetes components") + k.log.With(slog.String("version", versionString)).Info("Installing Kubernetes components") if err := k.clusterUtil.InstallComponents(ctx, kubernetesComponents); err != nil { return nil, err } @@ -78,7 +91,7 @@ func (k *KubeWrapper) InitCluster( var validIPs []net.IP // Step 1: retrieve cloud metadata for Kubernetes configuration - log.Info("Retrieving node metadata") + k.log.Info("Retrieving node metadata") instance, err := k.providerMetadata.Self(ctx) if err != nil { return nil, fmt.Errorf("retrieving own instance metadata: %w", err) @@ -106,7 +119,7 @@ func (k *KubeWrapper) InitCluster( certSANs := []string{nodeIP} certSANs = append(certSANs, apiServerCertSANs...) - log.With( + k.log.With( slog.String("nodeName", nodeName), slog.String("providerID", instance.ProviderID), slog.String("nodeIP", nodeIP), @@ -132,12 +145,22 @@ func (k *KubeWrapper) InitCluster( if err != nil { return nil, fmt.Errorf("encoding kubeadm init configuration as YAML: %w", err) } - log.Info("Initializing Kubernetes cluster") - kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, log) + + k.log.Info("Initializing Kubernetes cluster") + kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, k.log) if err != nil { return nil, fmt.Errorf("kubeadm init: %w", err) } + k.log.Info("Prioritizing etcd I/O") + err = k.etcdIOPrioritizer.PrioritizeIO() + if errors.Is(err, etcdio.ErrNoEtcdProcess) { + k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " + + "This is expected if this node is a non-control-plane node.") + } else if err != nil { + return nil, fmt.Errorf("prioritizing etcd I/O: %w", err) + } + err = k.client.Initialize(kubeConfig) if err != nil { return nil, fmt.Errorf("initializing kubectl client: %w", err) @@ -177,22 +200,23 @@ func (k *KubeWrapper) InitCluster( return nil, fmt.Errorf("annotating node with Kubernetes components hash: %w", err) } - log.Info("Setting up internal-config ConfigMap") + k.log.Info("Setting up internal-config ConfigMap") if err := k.setupInternalConfigMap(ctx); err != nil { return nil, fmt.Errorf("failed to setup internal ConfigMap: %w", err) } + return kubeConfig, nil } // JoinCluster joins existing Kubernetes cluster. -func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components, log *slog.Logger) error { - log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components") +func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components) error { + k.log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components") if err := k.clusterUtil.InstallComponents(ctx, k8sComponents); err != nil { return fmt.Errorf("installing kubernetes components: %w", err) } // Step 1: retrieve cloud metadata for Kubernetes configuration - log.Info("Retrieving node metadata") + k.log.Info("Retrieving node metadata") instance, err := k.providerMetadata.Self(ctx) if err != nil { return fmt.Errorf("retrieving own instance metadata: %w", err) @@ -212,7 +236,7 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo // override join endpoint to go over lb args.APIServerEndpoint = net.JoinHostPort(loadBalancerHost, loadBalancerPort) - log.With( + k.log.With( slog.String("nodeName", nodeName), slog.String("providerID", providerID), slog.String("nodeIP", nodeInternalIP), @@ -237,11 +261,21 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo if err != nil { return fmt.Errorf("encoding kubeadm join configuration as YAML: %w", err) } - log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster") - if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, log); err != nil { + + k.log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster") + if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, k.log); err != nil { return fmt.Errorf("joining cluster: %v; %w ", string(joinConfigYAML), err) } + k.log.Info("Prioritizing etcd I/O") + err = k.etcdIOPrioritizer.PrioritizeIO() + if errors.Is(err, etcdio.ErrNoEtcdProcess) { + k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " + + "This is expected if this node is a non-control-plane node.") + } else if err != nil { + return fmt.Errorf("prioritizing etcd I/O: %w", err) + } + return nil } @@ -301,6 +335,15 @@ func (k *KubeWrapper) StartKubelet() error { return fmt.Errorf("starting kubelet: %w", err) } + k.log.Info("Prioritizing etcd I/O") + err := k.etcdIOPrioritizer.PrioritizeIO() + if errors.Is(err, etcdio.ErrNoEtcdProcess) { + k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " + + "This is expected if this node is a non-control-plane node.") + } else if err != nil { + return fmt.Errorf("prioritizing etcd I/O: %w", err) + } + return nil } diff --git a/bootstrapper/internal/kubernetes/kubernetes_test.go b/bootstrapper/internal/kubernetes/kubernetes_test.go index ccc3a107c..746754676 100644 --- a/bootstrapper/internal/kubernetes/kubernetes_test.go +++ b/bootstrapper/internal/kubernetes/kubernetes_test.go @@ -42,17 +42,19 @@ func TestInitCluster(t *testing.T) { aliasIPRange := "192.0.2.0/24" testCases := map[string]struct { - clusterUtil stubClusterUtil - kubectl stubKubectl - kubeAPIWaiter stubKubeAPIWaiter - providerMetadata ProviderMetadata - wantConfig k8sapi.KubeadmInitYAML - wantErr bool - k8sVersion versions.ValidK8sVersion + clusterUtil stubClusterUtil + kubectl stubKubectl + kubeAPIWaiter stubKubeAPIWaiter + providerMetadata ProviderMetadata + wantConfig k8sapi.KubeadmInitYAML + etcdIOPrioritizer stubEtcdIOPrioritizer + wantErr bool + k8sVersion versions.ValidK8sVersion }{ "kubeadm init works with metadata and loadbalancer": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ Name: nodeName, @@ -85,8 +87,9 @@ func TestInitCluster(t *testing.T) { k8sVersion: versions.Default, }, "kubeadm init fails when annotating itself": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ Name: nodeName, @@ -102,8 +105,9 @@ func TestInitCluster(t *testing.T) { k8sVersion: versions.Default, }, "kubeadm init fails when retrieving metadata self": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfErr: assert.AnError, }, @@ -111,7 +115,8 @@ func TestInitCluster(t *testing.T) { k8sVersion: versions.Default, }, "kubeadm init fails when retrieving metadata loadbalancer ip": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ getLoadBalancerEndpointErr: assert.AnError, }, @@ -123,51 +128,58 @@ func TestInitCluster(t *testing.T) { initClusterErr: assert.AnError, kubeconfig: []byte("someKubeconfig"), }, - kubeAPIWaiter: stubKubeAPIWaiter{}, - providerMetadata: &stubProviderMetadata{}, - wantErr: true, - k8sVersion: versions.Default, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + wantErr: true, + k8sVersion: versions.Default, }, "kubeadm init fails when deploying cilium": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - providerMetadata: &stubProviderMetadata{}, - wantErr: true, - k8sVersion: versions.Default, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + wantErr: true, + k8sVersion: versions.Default, }, "kubeadm init fails when setting up constellation-services chart": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, - providerMetadata: &stubProviderMetadata{}, - wantErr: true, - k8sVersion: versions.Default, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + wantErr: true, + k8sVersion: versions.Default, }, "kubeadm init fails when reading kubeconfig": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, - providerMetadata: &stubProviderMetadata{}, - wantErr: true, - k8sVersion: versions.Default, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + wantErr: true, + k8sVersion: versions.Default, }, "kubeadm init fails when setting up verification service": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, - providerMetadata: &stubProviderMetadata{}, - wantErr: true, - k8sVersion: versions.Default, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + wantErr: true, + k8sVersion: versions.Default, }, "kubeadm init fails when waiting for kubeAPI server": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError}, - providerMetadata: &stubProviderMetadata{}, - k8sVersion: versions.Default, - wantErr: true, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + k8sVersion: versions.Default, + wantErr: true, }, "unsupported k8sVersion fails cluster creation": { - clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, - kubeAPIWaiter: stubKubeAPIWaiter{}, - providerMetadata: &stubProviderMetadata{}, - k8sVersion: "1.19", - wantErr: true, + clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")}, + kubeAPIWaiter: stubKubeAPIWaiter{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + k8sVersion: "1.19", + wantErr: true, }, } @@ -184,11 +196,12 @@ func TestInitCluster(t *testing.T) { configProvider: &stubConfigProvider{initConfig: k8sapi.KubeadmInitYAML{}}, client: &tc.kubectl, getIPAddr: func() (string, error) { return privateIP, nil }, + log: logger.NewTest(t), } _, err := kube.InitCluster( context.Background(), string(tc.k8sVersion), "kubernetes", - false, nil, nil, "", logger.NewTest(t), + false, nil, nil, "", ) if tc.wantErr { @@ -224,15 +237,17 @@ func TestJoinCluster(t *testing.T) { } testCases := map[string]struct { - clusterUtil stubClusterUtil - providerMetadata ProviderMetadata - wantConfig kubeadm.JoinConfiguration - role role.Role - k8sComponents components.Components - wantErr bool + clusterUtil stubClusterUtil + providerMetadata ProviderMetadata + wantConfig kubeadm.JoinConfiguration + role role.Role + k8sComponents components.Components + etcdIOPrioritizer stubEtcdIOPrioritizer + wantErr bool }{ "kubeadm join worker works with metadata and remote Kubernetes Components": { - clusterUtil: stubClusterUtil{}, + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ ProviderID: "provider-id", @@ -253,7 +268,8 @@ func TestJoinCluster(t *testing.T) { }, }, "kubeadm join worker works with metadata and local Kubernetes components": { - clusterUtil: stubClusterUtil{}, + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ ProviderID: "provider-id", @@ -273,7 +289,8 @@ func TestJoinCluster(t *testing.T) { }, }, "kubeadm join worker works with metadata and cloud controller manager": { - clusterUtil: stubClusterUtil{}, + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ ProviderID: "provider-id", @@ -293,7 +310,8 @@ func TestJoinCluster(t *testing.T) { }, }, "kubeadm join control-plane node works with metadata": { - clusterUtil: stubClusterUtil{}, + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ ProviderID: "provider-id", @@ -320,7 +338,8 @@ func TestJoinCluster(t *testing.T) { }, }, "kubeadm join worker fails when installing remote Kubernetes components": { - clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")}, + clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfResp: metadata.InstanceMetadata{ ProviderID: "provider-id", @@ -333,7 +352,8 @@ func TestJoinCluster(t *testing.T) { wantErr: true, }, "kubeadm join worker fails when retrieving self metadata": { - clusterUtil: stubClusterUtil{}, + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, providerMetadata: &stubProviderMetadata{ selfErr: assert.AnError, }, @@ -341,10 +361,18 @@ func TestJoinCluster(t *testing.T) { wantErr: true, }, "kubeadm join worker fails when applying the join config": { - clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError}, - providerMetadata: &stubProviderMetadata{}, - role: role.Worker, - wantErr: true, + clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{}, + providerMetadata: &stubProviderMetadata{}, + role: role.Worker, + wantErr: true, + }, + "etcd prioritizer error fails": { + clusterUtil: stubClusterUtil{}, + etcdIOPrioritizer: stubEtcdIOPrioritizer{assert.AnError}, + providerMetadata: &stubProviderMetadata{}, + role: role.Worker, + wantErr: true, }, } @@ -358,9 +386,10 @@ func TestJoinCluster(t *testing.T) { providerMetadata: tc.providerMetadata, configProvider: &stubConfigProvider{}, getIPAddr: func() (string, error) { return privateIP, nil }, + log: logger.NewTest(t), } - err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents, logger.NewTest(t)) + err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents) if tc.wantErr { assert.Error(err) return @@ -545,3 +574,11 @@ type stubKubeAPIWaiter struct { func (s *stubKubeAPIWaiter) Wait(_ context.Context, _ kubewaiter.KubernetesClient) error { return s.waitErr } + +type stubEtcdIOPrioritizer struct { + prioritizeErr error +} + +func (s *stubEtcdIOPrioritizer) PrioritizeIO() error { + return s.prioritizeErr +}