bootstrapper: prioritize etcd I/O when bootstrapping

This commit is contained in:
Moritz Sanft 2024-05-21 15:18:38 +02:00
parent 0ecb1186f5
commit 52a68c42f7
No known key found for this signature in database
GPG Key ID: 335D28368B1DA615
10 changed files with 193 additions and 109 deletions

View File

@ -86,7 +86,7 @@ func main() {
clusterInitJoiner = kubernetes.New(
"aws", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -102,7 +102,7 @@ func main() {
metadataAPI = metadata
clusterInitJoiner = kubernetes.New(
"gcp", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -122,7 +122,7 @@ func main() {
metadataAPI = metadata
clusterInitJoiner = kubernetes.New(
"azure", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
@ -132,7 +132,7 @@ func main() {
metadata := qemucloud.New()
clusterInitJoiner = kubernetes.New(
"qemu", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
metadataAPI = metadata
@ -155,7 +155,7 @@ func main() {
}
clusterInitJoiner = kubernetes.New(
"openstack", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
metadataAPI = metadata
openDevice = vtpm.OpenVTPM

View File

@ -8,7 +8,6 @@ package main
import (
"context"
"log/slog"
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
"github.com/edgelesssys/constellation/v2/internal/role"
@ -22,13 +21,13 @@ type clusterFake struct{}
// InitCluster fakes bootstrapping a new cluster with the current node being the master, returning the arguments required to join the cluster.
func (c *clusterFake) InitCluster(
context.Context, string, string,
bool, components.Components, []string, string, *slog.Logger,
bool, components.Components, []string, string,
) ([]byte, error) {
return []byte{}, nil
}
// JoinCluster will fake joining the current node to an existing cluster.
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error {
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error {
return nil
}

View File

@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "etcdio",
srcs = ["etcdio.go"],
importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio",
visibility = ["//bootstrapper:__subpackages__"],
deps = ["@org_golang_x_sys//unix"],
)

View File

@ -234,7 +234,6 @@ func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServe
req.KubernetesComponents,
req.ApiserverCertSans,
req.ServiceCidr,
s.log,
)
if err != nil {
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "initializing cluster: %s", err)); e != nil {
@ -357,7 +356,6 @@ type ClusterInitializer interface {
kubernetesComponents components.Components,
apiServerCertSANs []string,
serviceCIDR string,
log *slog.Logger,
) ([]byte, error)
}

View File

@ -11,7 +11,6 @@ import (
"context"
"errors"
"io"
"log/slog"
"net"
"strings"
"sync"
@ -420,7 +419,7 @@ type stubClusterInitializer struct {
func (i *stubClusterInitializer) InitCluster(
context.Context, string, string,
bool, components.Components, []string, string, *slog.Logger,
bool, components.Components, []string, string,
) ([]byte, error) {
return i.initClusterKubeconfig, i.initClusterErr
}

View File

@ -288,7 +288,7 @@ func (c *JoinClient) startNodeAndJoin(ticket *joinproto.IssueJoinTicketResponse,
// We currently cannot recover from any failure in this function. Joining the k8s cluster
// sometimes fails transiently, and we don't want to brick the node because of that.
for i := range 3 {
err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents, c.log)
err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents)
if err == nil {
break
}
@ -399,7 +399,6 @@ type ClusterJoiner interface {
args *kubeadm.BootstrapTokenDiscovery,
peerRole role.Role,
k8sComponents components.Components,
log *slog.Logger,
) error
}

View File

@ -8,7 +8,6 @@ package joinclient
import (
"context"
"log/slog"
"net"
"strconv"
"sync"
@ -350,7 +349,7 @@ type stubClusterJoiner struct {
joinClusterErr error
}
func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error {
func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error {
j.joinClusterCalled++
if j.numBadCalls == 0 {
return nil

View File

@ -11,6 +11,7 @@ go_library(
importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes",
visibility = ["//bootstrapper:__subpackages__"],
deps = [
"//bootstrapper/internal/etcdio",
"//bootstrapper/internal/kubernetes/k8sapi",
"//bootstrapper/internal/kubernetes/kubewaiter",
"//internal/cloud/cloudprovider",

View File

@ -9,6 +9,7 @@ package kubernetes
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
@ -16,6 +17,7 @@ import (
"strings"
"time"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubewaiter"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
@ -40,37 +42,48 @@ type kubeAPIWaiter interface {
Wait(ctx context.Context, kubernetesClient kubewaiter.KubernetesClient) error
}
type etcdIOPrioritizer interface {
PrioritizeIO() error
}
// KubeWrapper implements Cluster interface.
type KubeWrapper struct {
cloudProvider string
clusterUtil clusterUtil
kubeAPIWaiter kubeAPIWaiter
configProvider configurationProvider
client k8sapi.Client
providerMetadata ProviderMetadata
getIPAddr func() (string, error)
cloudProvider string
clusterUtil clusterUtil
kubeAPIWaiter kubeAPIWaiter
configProvider configurationProvider
client k8sapi.Client
providerMetadata ProviderMetadata
etcdIOPrioritizer etcdIOPrioritizer
getIPAddr func() (string, error)
log *slog.Logger
}
// New creates a new KubeWrapper with real values.
func New(cloudProvider string, clusterUtil clusterUtil, configProvider configurationProvider, client k8sapi.Client,
providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter,
providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter, log *slog.Logger,
) *KubeWrapper {
etcdIOPrioritizer := etcdio.NewClient(log)
return &KubeWrapper{
cloudProvider: cloudProvider,
clusterUtil: clusterUtil,
kubeAPIWaiter: kubeAPIWaiter,
configProvider: configProvider,
client: client,
providerMetadata: providerMetadata,
getIPAddr: getIPAddr,
cloudProvider: cloudProvider,
clusterUtil: clusterUtil,
kubeAPIWaiter: kubeAPIWaiter,
configProvider: configProvider,
client: client,
providerMetadata: providerMetadata,
getIPAddr: getIPAddr,
log: log,
etcdIOPrioritizer: etcdIOPrioritizer,
}
}
// InitCluster initializes a new Kubernetes cluster and applies pod network provider.
func (k *KubeWrapper) InitCluster(
ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string, log *slog.Logger,
ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string,
) ([]byte, error) {
log.With(slog.String("version", versionString)).Info("Installing Kubernetes components")
k.log.With(slog.String("version", versionString)).Info("Installing Kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, kubernetesComponents); err != nil {
return nil, err
}
@ -78,7 +91,7 @@ func (k *KubeWrapper) InitCluster(
var validIPs []net.IP
// Step 1: retrieve cloud metadata for Kubernetes configuration
log.Info("Retrieving node metadata")
k.log.Info("Retrieving node metadata")
instance, err := k.providerMetadata.Self(ctx)
if err != nil {
return nil, fmt.Errorf("retrieving own instance metadata: %w", err)
@ -106,7 +119,7 @@ func (k *KubeWrapper) InitCluster(
certSANs := []string{nodeIP}
certSANs = append(certSANs, apiServerCertSANs...)
log.With(
k.log.With(
slog.String("nodeName", nodeName),
slog.String("providerID", instance.ProviderID),
slog.String("nodeIP", nodeIP),
@ -132,12 +145,22 @@ func (k *KubeWrapper) InitCluster(
if err != nil {
return nil, fmt.Errorf("encoding kubeadm init configuration as YAML: %w", err)
}
log.Info("Initializing Kubernetes cluster")
kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, log)
k.log.Info("Initializing Kubernetes cluster")
kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, k.log)
if err != nil {
return nil, fmt.Errorf("kubeadm init: %w", err)
}
k.log.Info("Prioritizing etcd I/O")
err = k.etcdIOPrioritizer.PrioritizeIO()
if errors.Is(err, etcdio.ErrNoEtcdProcess) {
k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " +
"This is expected if this node is a non-control-plane node.")
} else if err != nil {
return nil, fmt.Errorf("prioritizing etcd I/O: %w", err)
}
err = k.client.Initialize(kubeConfig)
if err != nil {
return nil, fmt.Errorf("initializing kubectl client: %w", err)
@ -177,22 +200,23 @@ func (k *KubeWrapper) InitCluster(
return nil, fmt.Errorf("annotating node with Kubernetes components hash: %w", err)
}
log.Info("Setting up internal-config ConfigMap")
k.log.Info("Setting up internal-config ConfigMap")
if err := k.setupInternalConfigMap(ctx); err != nil {
return nil, fmt.Errorf("failed to setup internal ConfigMap: %w", err)
}
return kubeConfig, nil
}
// JoinCluster joins existing Kubernetes cluster.
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components, log *slog.Logger) error {
log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components")
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components) error {
k.log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, k8sComponents); err != nil {
return fmt.Errorf("installing kubernetes components: %w", err)
}
// Step 1: retrieve cloud metadata for Kubernetes configuration
log.Info("Retrieving node metadata")
k.log.Info("Retrieving node metadata")
instance, err := k.providerMetadata.Self(ctx)
if err != nil {
return fmt.Errorf("retrieving own instance metadata: %w", err)
@ -212,7 +236,7 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
// override join endpoint to go over lb
args.APIServerEndpoint = net.JoinHostPort(loadBalancerHost, loadBalancerPort)
log.With(
k.log.With(
slog.String("nodeName", nodeName),
slog.String("providerID", providerID),
slog.String("nodeIP", nodeInternalIP),
@ -237,11 +261,21 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
if err != nil {
return fmt.Errorf("encoding kubeadm join configuration as YAML: %w", err)
}
log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster")
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, log); err != nil {
k.log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster")
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, k.log); err != nil {
return fmt.Errorf("joining cluster: %v; %w ", string(joinConfigYAML), err)
}
k.log.Info("Prioritizing etcd I/O")
err = k.etcdIOPrioritizer.PrioritizeIO()
if errors.Is(err, etcdio.ErrNoEtcdProcess) {
k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " +
"This is expected if this node is a non-control-plane node.")
} else if err != nil {
return fmt.Errorf("prioritizing etcd I/O: %w", err)
}
return nil
}
@ -301,6 +335,15 @@ func (k *KubeWrapper) StartKubelet() error {
return fmt.Errorf("starting kubelet: %w", err)
}
k.log.Info("Prioritizing etcd I/O")
err := k.etcdIOPrioritizer.PrioritizeIO()
if errors.Is(err, etcdio.ErrNoEtcdProcess) {
k.log.Warn("Skipping etcd I/O prioritization as etcd process is not running. " +
"This is expected if this node is a non-control-plane node.")
} else if err != nil {
return fmt.Errorf("prioritizing etcd I/O: %w", err)
}
return nil
}

View File

@ -42,17 +42,19 @@ func TestInitCluster(t *testing.T) {
aliasIPRange := "192.0.2.0/24"
testCases := map[string]struct {
clusterUtil stubClusterUtil
kubectl stubKubectl
kubeAPIWaiter stubKubeAPIWaiter
providerMetadata ProviderMetadata
wantConfig k8sapi.KubeadmInitYAML
wantErr bool
k8sVersion versions.ValidK8sVersion
clusterUtil stubClusterUtil
kubectl stubKubectl
kubeAPIWaiter stubKubeAPIWaiter
providerMetadata ProviderMetadata
wantConfig k8sapi.KubeadmInitYAML
etcdIOPrioritizer stubEtcdIOPrioritizer
wantErr bool
k8sVersion versions.ValidK8sVersion
}{
"kubeadm init works with metadata and loadbalancer": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
Name: nodeName,
@ -85,8 +87,9 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when annotating itself": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
Name: nodeName,
@ -102,8 +105,9 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when retrieving metadata self": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfErr: assert.AnError,
},
@ -111,7 +115,8 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when retrieving metadata loadbalancer ip": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
getLoadBalancerEndpointErr: assert.AnError,
},
@ -123,51 +128,58 @@ func TestInitCluster(t *testing.T) {
initClusterErr: assert.AnError,
kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when deploying cilium": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when setting up constellation-services chart": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when reading kubeconfig": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when setting up verification service": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when waiting for kubeAPI server": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError},
providerMetadata: &stubProviderMetadata{},
k8sVersion: versions.Default,
wantErr: true,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: versions.Default,
wantErr: true,
},
"unsupported k8sVersion fails cluster creation": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: "1.19",
wantErr: true,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: "1.19",
wantErr: true,
},
}
@ -184,11 +196,12 @@ func TestInitCluster(t *testing.T) {
configProvider: &stubConfigProvider{initConfig: k8sapi.KubeadmInitYAML{}},
client: &tc.kubectl,
getIPAddr: func() (string, error) { return privateIP, nil },
log: logger.NewTest(t),
}
_, err := kube.InitCluster(
context.Background(), string(tc.k8sVersion), "kubernetes",
false, nil, nil, "", logger.NewTest(t),
false, nil, nil, "",
)
if tc.wantErr {
@ -224,15 +237,17 @@ func TestJoinCluster(t *testing.T) {
}
testCases := map[string]struct {
clusterUtil stubClusterUtil
providerMetadata ProviderMetadata
wantConfig kubeadm.JoinConfiguration
role role.Role
k8sComponents components.Components
wantErr bool
clusterUtil stubClusterUtil
providerMetadata ProviderMetadata
wantConfig kubeadm.JoinConfiguration
role role.Role
k8sComponents components.Components
etcdIOPrioritizer stubEtcdIOPrioritizer
wantErr bool
}{
"kubeadm join worker works with metadata and remote Kubernetes Components": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -253,7 +268,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker works with metadata and local Kubernetes components": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -273,7 +289,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker works with metadata and cloud controller manager": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -293,7 +310,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join control-plane node works with metadata": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -320,7 +338,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker fails when installing remote Kubernetes components": {
clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")},
clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -333,7 +352,8 @@ func TestJoinCluster(t *testing.T) {
wantErr: true,
},
"kubeadm join worker fails when retrieving self metadata": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfErr: assert.AnError,
},
@ -341,10 +361,18 @@ func TestJoinCluster(t *testing.T) {
wantErr: true,
},
"kubeadm join worker fails when applying the join config": {
clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError},
providerMetadata: &stubProviderMetadata{},
role: role.Worker,
wantErr: true,
clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
role: role.Worker,
wantErr: true,
},
"etcd prioritizer error fails": {
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{assert.AnError},
providerMetadata: &stubProviderMetadata{},
role: role.Worker,
wantErr: true,
},
}
@ -358,9 +386,10 @@ func TestJoinCluster(t *testing.T) {
providerMetadata: tc.providerMetadata,
configProvider: &stubConfigProvider{},
getIPAddr: func() (string, error) { return privateIP, nil },
log: logger.NewTest(t),
}
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents, logger.NewTest(t))
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents)
if tc.wantErr {
assert.Error(err)
return
@ -545,3 +574,11 @@ type stubKubeAPIWaiter struct {
func (s *stubKubeAPIWaiter) Wait(_ context.Context, _ kubewaiter.KubernetesClient) error {
return s.waitErr
}
type stubEtcdIOPrioritizer struct {
prioritizeErr error
}
func (s *stubEtcdIOPrioritizer) PrioritizeIO() error {
return s.prioritizeErr
}