bootstrapper: prioritize etcd disk I/O (#3114)

This commit is contained in:
Moritz Sanft 2024-05-22 16:12:53 +02:00 committed by GitHub
parent 902b7f49a8
commit 9c100a542c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 334 additions and 120 deletions

View File

@ -86,7 +86,7 @@ func main() {
clusterInitJoiner = kubernetes.New(
"aws", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -102,7 +102,7 @@ func main() {
metadataAPI = metadata
clusterInitJoiner = kubernetes.New(
"gcp", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -122,7 +122,7 @@ func main() {
metadataAPI = metadata
clusterInitJoiner = kubernetes.New(
"azure", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
openDevice = vtpm.OpenVTPM
@ -132,7 +132,7 @@ func main() {
metadata := qemucloud.New()
clusterInitJoiner = kubernetes.New(
"qemu", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
metadataAPI = metadata
@ -155,7 +155,7 @@ func main() {
}
clusterInitJoiner = kubernetes.New(
"openstack", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.NewUninitialized(),
metadata, &kubewaiter.CloudKubeAPIWaiter{},
metadata, &kubewaiter.CloudKubeAPIWaiter{}, log,
)
metadataAPI = metadata
openDevice = vtpm.OpenVTPM

View File

@ -8,7 +8,6 @@ package main
import (
"context"
"log/slog"
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
"github.com/edgelesssys/constellation/v2/internal/role"
@ -22,13 +21,13 @@ type clusterFake struct{}
// InitCluster fakes bootstrapping a new cluster with the current node being the master, returning the arguments required to join the cluster.
func (c *clusterFake) InitCluster(
context.Context, string, string,
bool, components.Components, []string, string, *slog.Logger,
bool, components.Components, []string, string,
) ([]byte, error) {
return []byte{}, nil
}
// JoinCluster will fake joining the current node to an existing cluster.
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error {
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error {
return nil
}

View File

@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "etcdio",
srcs = ["etcdio.go"],
importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio",
visibility = ["//bootstrapper:__subpackages__"],
deps = ["@org_golang_x_sys//unix"],
)

View File

@ -0,0 +1,156 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// The etcdio package provides utilities to manage etcd I/O.
package etcdio
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"path"
"strconv"
"time"
"golang.org/x/sys/unix"
)
var (
// ErrNoEtcdProcess is returned when no etcd process is found on the node.
ErrNoEtcdProcess = errors.New("no etcd process found on node")
// ErrMultipleEtcdProcesses is returned when multiple etcd processes are found on the node.
ErrMultipleEtcdProcesses = errors.New("multiple etcd processes found on node")
)
const (
// Tells the syscall that a process' priority is going to be set.
// See https://elixir.bootlin.com/linux/v6.9.1/source/include/uapi/linux/ioprio.h#L54.
ioPrioWhoProcess = 1
// See https://elixir.bootlin.com/linux/v6.9.1/source/include/uapi/linux/ioprio.h#L11.
ioPrioClassShift = 13
ioPrioNrClasses = 8
ioPrioClassMask = ioPrioNrClasses - 1
ioPrioPrioMask = (1 << ioPrioClassShift) - 1
targetClass = 1 // Realtime IO class for best scheduling prio
targetPrio = 0 // Highest priority within the class
)
// Client is a client for managing etcd I/O.
type Client struct {
log *slog.Logger
}
// NewClient creates a new etcd I/O management client.
func NewClient(log *slog.Logger) *Client {
return &Client{log: log}
}
// PrioritizeIO tries to prioritize the I/O of the etcd process.
// Since it might be possible that the process just started (if this method is called
// right after the kubelet started), it retries to do its work each second
// until it succeeds or the timeout of 10 seconds is reached.
func (c *Client) PrioritizeIO() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
c.log.Info("Prioritizing etcd I/O")
err := c.setIOPriority()
if err == nil {
// Success, return directly
return
} else if errors.Is(err, ErrNoEtcdProcess) {
c.log.Info("No etcd process found, retrying")
} else {
c.log.Warn("Prioritizing etcd I/O failed", "error", err)
return
}
select {
case <-ticker.C:
case <-timeout.Done():
c.log.Warn("Timed out waiting for etcd to start")
return
}
}
}
// setIOPriority tries to find the etcd process on the node and prioritizes its I/O.
func (c *Client) setIOPriority() error {
// find etcd process(es)
pid, err := c.findEtcdProcess()
if err != nil {
return fmt.Errorf("finding etcd process: %w", err)
}
// Highest realtime priority value for the etcd process, see https://elixir.bootlin.com/linux/v6.9.1/source/include/uapi/linux/ioprio.h
// for the calculation details.
prioVal := ((targetClass & ioPrioClassMask) << ioPrioClassShift) | (targetPrio & ioPrioPrioMask)
// see https://man7.org/linux/man-pages/man2/ioprio_set.2.html
ret, _, errno := unix.Syscall(unix.SYS_IOPRIO_SET, ioPrioWhoProcess, uintptr(pid), uintptr(prioVal))
if ret != 0 {
return fmt.Errorf("setting I/O priority for etcd: %w", errno)
}
return nil
}
// findEtcdProcess tries to find the etcd process on the node.
func (c *Client) findEtcdProcess() (int, error) {
procDir, err := os.Open("/proc")
if err != nil {
return 0, fmt.Errorf("opening /proc: %w", err)
}
defer procDir.Close()
procEntries, err := procDir.Readdirnames(0)
if err != nil {
return 0, fmt.Errorf("reading /proc: %w", err)
}
// find etcd process(es)
etcdPIDs := []int{}
for _, f := range procEntries {
// exclude non-pid dirs
if f[0] < '0' || f[0] > '9' {
continue
}
exe, err := os.Readlink(fmt.Sprintf("/proc/%s/exe", f))
if err != nil {
continue
}
if path.Base(exe) != "etcd" {
continue
}
pid, err := strconv.Atoi(f)
if err != nil {
continue
}
// add the PID to the list of etcd PIDs
etcdPIDs = append(etcdPIDs, pid)
}
if len(etcdPIDs) == 0 {
return 0, ErrNoEtcdProcess
}
if len(etcdPIDs) > 1 {
return 0, ErrMultipleEtcdProcesses
}
return etcdPIDs[0], nil
}

View File

@ -234,7 +234,6 @@ func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServe
req.KubernetesComponents,
req.ApiserverCertSans,
req.ServiceCidr,
s.log,
)
if err != nil {
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "initializing cluster: %s", err)); e != nil {
@ -357,7 +356,6 @@ type ClusterInitializer interface {
kubernetesComponents components.Components,
apiServerCertSANs []string,
serviceCIDR string,
log *slog.Logger,
) ([]byte, error)
}

View File

@ -11,7 +11,6 @@ import (
"context"
"errors"
"io"
"log/slog"
"net"
"strings"
"sync"
@ -420,7 +419,7 @@ type stubClusterInitializer struct {
func (i *stubClusterInitializer) InitCluster(
context.Context, string, string,
bool, components.Components, []string, string, *slog.Logger,
bool, components.Components, []string, string,
) ([]byte, error) {
return i.initClusterKubeconfig, i.initClusterErr
}

View File

@ -288,7 +288,7 @@ func (c *JoinClient) startNodeAndJoin(ticket *joinproto.IssueJoinTicketResponse,
// We currently cannot recover from any failure in this function. Joining the k8s cluster
// sometimes fails transiently, and we don't want to brick the node because of that.
for i := range 3 {
err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents, c.log)
err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents)
if err == nil {
break
}
@ -399,7 +399,6 @@ type ClusterJoiner interface {
args *kubeadm.BootstrapTokenDiscovery,
peerRole role.Role,
k8sComponents components.Components,
log *slog.Logger,
) error
}

View File

@ -8,7 +8,6 @@ package joinclient
import (
"context"
"log/slog"
"net"
"strconv"
"sync"
@ -350,7 +349,7 @@ type stubClusterJoiner struct {
joinClusterErr error
}
func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components, *slog.Logger) error {
func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, components.Components) error {
j.joinClusterCalled++
if j.numBadCalls == 0 {
return nil

View File

@ -11,6 +11,7 @@ go_library(
importpath = "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes",
visibility = ["//bootstrapper:__subpackages__"],
deps = [
"//bootstrapper/internal/etcdio",
"//bootstrapper/internal/kubernetes/k8sapi",
"//bootstrapper/internal/kubernetes/kubewaiter",
"//internal/cloud/cloudprovider",

View File

@ -16,6 +16,7 @@ import (
"strings"
"time"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/etcdio"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubewaiter"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
@ -40,37 +41,48 @@ type kubeAPIWaiter interface {
Wait(ctx context.Context, kubernetesClient kubewaiter.KubernetesClient) error
}
type etcdIOPrioritizer interface {
PrioritizeIO()
}
// KubeWrapper implements Cluster interface.
type KubeWrapper struct {
cloudProvider string
clusterUtil clusterUtil
kubeAPIWaiter kubeAPIWaiter
configProvider configurationProvider
client k8sapi.Client
providerMetadata ProviderMetadata
getIPAddr func() (string, error)
cloudProvider string
clusterUtil clusterUtil
kubeAPIWaiter kubeAPIWaiter
configProvider configurationProvider
client k8sapi.Client
providerMetadata ProviderMetadata
etcdIOPrioritizer etcdIOPrioritizer
getIPAddr func() (string, error)
log *slog.Logger
}
// New creates a new KubeWrapper with real values.
func New(cloudProvider string, clusterUtil clusterUtil, configProvider configurationProvider, client k8sapi.Client,
providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter,
providerMetadata ProviderMetadata, kubeAPIWaiter kubeAPIWaiter, log *slog.Logger,
) *KubeWrapper {
etcdIOPrioritizer := etcdio.NewClient(log)
return &KubeWrapper{
cloudProvider: cloudProvider,
clusterUtil: clusterUtil,
kubeAPIWaiter: kubeAPIWaiter,
configProvider: configProvider,
client: client,
providerMetadata: providerMetadata,
getIPAddr: getIPAddr,
cloudProvider: cloudProvider,
clusterUtil: clusterUtil,
kubeAPIWaiter: kubeAPIWaiter,
configProvider: configProvider,
client: client,
providerMetadata: providerMetadata,
getIPAddr: getIPAddr,
log: log,
etcdIOPrioritizer: etcdIOPrioritizer,
}
}
// InitCluster initializes a new Kubernetes cluster and applies pod network provider.
func (k *KubeWrapper) InitCluster(
ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string, log *slog.Logger,
ctx context.Context, versionString, clusterName string, conformanceMode bool, kubernetesComponents components.Components, apiServerCertSANs []string, serviceCIDR string,
) ([]byte, error) {
log.With(slog.String("version", versionString)).Info("Installing Kubernetes components")
k.log.With(slog.String("version", versionString)).Info("Installing Kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, kubernetesComponents); err != nil {
return nil, err
}
@ -78,7 +90,7 @@ func (k *KubeWrapper) InitCluster(
var validIPs []net.IP
// Step 1: retrieve cloud metadata for Kubernetes configuration
log.Info("Retrieving node metadata")
k.log.Info("Retrieving node metadata")
instance, err := k.providerMetadata.Self(ctx)
if err != nil {
return nil, fmt.Errorf("retrieving own instance metadata: %w", err)
@ -106,7 +118,7 @@ func (k *KubeWrapper) InitCluster(
certSANs := []string{nodeIP}
certSANs = append(certSANs, apiServerCertSANs...)
log.With(
k.log.With(
slog.String("nodeName", nodeName),
slog.String("providerID", instance.ProviderID),
slog.String("nodeIP", nodeIP),
@ -132,12 +144,16 @@ func (k *KubeWrapper) InitCluster(
if err != nil {
return nil, fmt.Errorf("encoding kubeadm init configuration as YAML: %w", err)
}
log.Info("Initializing Kubernetes cluster")
kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, log)
k.log.Info("Initializing Kubernetes cluster")
kubeConfig, err := k.clusterUtil.InitCluster(ctx, initConfigYAML, nodeName, clusterName, validIPs, conformanceMode, k.log)
if err != nil {
return nil, fmt.Errorf("kubeadm init: %w", err)
}
k.log.Info("Prioritizing etcd I/O")
k.etcdIOPrioritizer.PrioritizeIO()
err = k.client.Initialize(kubeConfig)
if err != nil {
return nil, fmt.Errorf("initializing kubectl client: %w", err)
@ -177,22 +193,23 @@ func (k *KubeWrapper) InitCluster(
return nil, fmt.Errorf("annotating node with Kubernetes components hash: %w", err)
}
log.Info("Setting up internal-config ConfigMap")
k.log.Info("Setting up internal-config ConfigMap")
if err := k.setupInternalConfigMap(ctx); err != nil {
return nil, fmt.Errorf("failed to setup internal ConfigMap: %w", err)
}
return kubeConfig, nil
}
// JoinCluster joins existing Kubernetes cluster.
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components, log *slog.Logger) error {
log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components")
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, k8sComponents components.Components) error {
k.log.With("k8sComponents", k8sComponents).Info("Installing provided kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, k8sComponents); err != nil {
return fmt.Errorf("installing kubernetes components: %w", err)
}
// Step 1: retrieve cloud metadata for Kubernetes configuration
log.Info("Retrieving node metadata")
k.log.Info("Retrieving node metadata")
instance, err := k.providerMetadata.Self(ctx)
if err != nil {
return fmt.Errorf("retrieving own instance metadata: %w", err)
@ -212,7 +229,7 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
// override join endpoint to go over lb
args.APIServerEndpoint = net.JoinHostPort(loadBalancerHost, loadBalancerPort)
log.With(
k.log.With(
slog.String("nodeName", nodeName),
slog.String("providerID", providerID),
slog.String("nodeIP", nodeInternalIP),
@ -237,11 +254,18 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
if err != nil {
return fmt.Errorf("encoding kubeadm join configuration as YAML: %w", err)
}
log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster")
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, log); err != nil {
k.log.With(slog.String("apiServerEndpoint", args.APIServerEndpoint)).Info("Joining Kubernetes cluster")
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, k.log); err != nil {
return fmt.Errorf("joining cluster: %v; %w ", string(joinConfigYAML), err)
}
// If on control plane (and thus with etcd), try to prioritize etcd I/O.
if peerRole == role.ControlPlane {
k.log.Info("Prioritizing etcd I/O")
k.etcdIOPrioritizer.PrioritizeIO()
}
return nil
}
@ -301,6 +325,8 @@ func (k *KubeWrapper) StartKubelet() error {
return fmt.Errorf("starting kubelet: %w", err)
}
k.etcdIOPrioritizer.PrioritizeIO()
return nil
}

View File

@ -42,17 +42,19 @@ func TestInitCluster(t *testing.T) {
aliasIPRange := "192.0.2.0/24"
testCases := map[string]struct {
clusterUtil stubClusterUtil
kubectl stubKubectl
kubeAPIWaiter stubKubeAPIWaiter
providerMetadata ProviderMetadata
wantConfig k8sapi.KubeadmInitYAML
wantErr bool
k8sVersion versions.ValidK8sVersion
clusterUtil stubClusterUtil
kubectl stubKubectl
kubeAPIWaiter stubKubeAPIWaiter
providerMetadata ProviderMetadata
wantConfig k8sapi.KubeadmInitYAML
etcdIOPrioritizer stubEtcdIOPrioritizer
wantErr bool
k8sVersion versions.ValidK8sVersion
}{
"kubeadm init works with metadata and loadbalancer": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
Name: nodeName,
@ -85,8 +87,9 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when annotating itself": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
Name: nodeName,
@ -102,8 +105,9 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when retrieving metadata self": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfErr: assert.AnError,
},
@ -111,7 +115,8 @@ func TestInitCluster(t *testing.T) {
k8sVersion: versions.Default,
},
"kubeadm init fails when retrieving metadata loadbalancer ip": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
getLoadBalancerEndpointErr: assert.AnError,
},
@ -123,51 +128,58 @@ func TestInitCluster(t *testing.T) {
initClusterErr: assert.AnError,
kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when deploying cilium": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when setting up constellation-services chart": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when reading kubeconfig": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when setting up verification service": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when waiting for kubeAPI server": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError},
providerMetadata: &stubProviderMetadata{},
k8sVersion: versions.Default,
wantErr: true,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{waitErr: assert.AnError},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: versions.Default,
wantErr: true,
},
"unsupported k8sVersion fails cluster creation": {
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: "1.19",
wantErr: true,
clusterUtil: stubClusterUtil{kubeconfig: []byte("someKubeconfig")},
kubeAPIWaiter: stubKubeAPIWaiter{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
k8sVersion: "1.19",
wantErr: true,
},
}
@ -177,18 +189,20 @@ func TestInitCluster(t *testing.T) {
require := require.New(t)
kube := KubeWrapper{
cloudProvider: "aws", // provide a valid cloud provider for cilium installation
clusterUtil: &tc.clusterUtil,
providerMetadata: tc.providerMetadata,
kubeAPIWaiter: &tc.kubeAPIWaiter,
configProvider: &stubConfigProvider{initConfig: k8sapi.KubeadmInitYAML{}},
client: &tc.kubectl,
getIPAddr: func() (string, error) { return privateIP, nil },
cloudProvider: "aws", // provide a valid cloud provider for cilium installation
clusterUtil: &tc.clusterUtil,
providerMetadata: tc.providerMetadata,
kubeAPIWaiter: &tc.kubeAPIWaiter,
configProvider: &stubConfigProvider{initConfig: k8sapi.KubeadmInitYAML{}},
client: &tc.kubectl,
getIPAddr: func() (string, error) { return privateIP, nil },
etcdIOPrioritizer: &tc.etcdIOPrioritizer,
log: logger.NewTest(t),
}
_, err := kube.InitCluster(
context.Background(), string(tc.k8sVersion), "kubernetes",
false, nil, nil, "", logger.NewTest(t),
false, nil, nil, "",
)
if tc.wantErr {
@ -224,15 +238,17 @@ func TestJoinCluster(t *testing.T) {
}
testCases := map[string]struct {
clusterUtil stubClusterUtil
providerMetadata ProviderMetadata
wantConfig kubeadm.JoinConfiguration
role role.Role
k8sComponents components.Components
wantErr bool
clusterUtil stubClusterUtil
providerMetadata ProviderMetadata
wantConfig kubeadm.JoinConfiguration
role role.Role
k8sComponents components.Components
etcdIOPrioritizer stubEtcdIOPrioritizer
wantErr bool
}{
"kubeadm join worker works with metadata and remote Kubernetes Components": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -253,7 +269,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker works with metadata and local Kubernetes components": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -273,7 +290,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker works with metadata and cloud controller manager": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -293,7 +311,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join control-plane node works with metadata": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -320,7 +339,8 @@ func TestJoinCluster(t *testing.T) {
},
},
"kubeadm join worker fails when installing remote Kubernetes components": {
clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")},
clusterUtil: stubClusterUtil{installComponentsErr: errors.New("error")},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfResp: metadata.InstanceMetadata{
ProviderID: "provider-id",
@ -333,7 +353,8 @@ func TestJoinCluster(t *testing.T) {
wantErr: true,
},
"kubeadm join worker fails when retrieving self metadata": {
clusterUtil: stubClusterUtil{},
clusterUtil: stubClusterUtil{},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{
selfErr: assert.AnError,
},
@ -341,10 +362,11 @@ func TestJoinCluster(t *testing.T) {
wantErr: true,
},
"kubeadm join worker fails when applying the join config": {
clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError},
providerMetadata: &stubProviderMetadata{},
role: role.Worker,
wantErr: true,
clusterUtil: stubClusterUtil{joinClusterErr: assert.AnError},
etcdIOPrioritizer: stubEtcdIOPrioritizer{},
providerMetadata: &stubProviderMetadata{},
role: role.Worker,
wantErr: true,
},
}
@ -354,13 +376,15 @@ func TestJoinCluster(t *testing.T) {
require := require.New(t)
kube := KubeWrapper{
clusterUtil: &tc.clusterUtil,
providerMetadata: tc.providerMetadata,
configProvider: &stubConfigProvider{},
getIPAddr: func() (string, error) { return privateIP, nil },
clusterUtil: &tc.clusterUtil,
providerMetadata: tc.providerMetadata,
configProvider: &stubConfigProvider{},
getIPAddr: func() (string, error) { return privateIP, nil },
etcdIOPrioritizer: &tc.etcdIOPrioritizer,
log: logger.NewTest(t),
}
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents, logger.NewTest(t))
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, tc.k8sComponents)
if tc.wantErr {
assert.Error(err)
return
@ -545,3 +569,7 @@ type stubKubeAPIWaiter struct {
func (s *stubKubeAPIWaiter) Wait(_ context.Context, _ kubewaiter.KubernetesClient) error {
return s.waitErr
}
type stubEtcdIOPrioritizer struct{}
func (s *stubEtcdIOPrioritizer) PrioritizeIO() {}