From 1331ee4077d0d16fc20b502ac851e762345de155 Mon Sep 17 00:00:00 2001 From: Malte Poll Date: Thu, 19 May 2022 17:18:22 +0200 Subject: [PATCH] Install kubernetes on init / join and restart kubelet after reboot Signed-off-by: Malte Poll --- coordinator/cmd/coordinator/main.go | 6 +-- coordinator/core/cluster.go | 7 +++ coordinator/core/cluster_test.go | 5 ++ coordinator/core/core.go | 6 +++ coordinator/core/core_test.go | 2 +- coordinator/kubernetes/k8sapi/systemd.go | 43 ++++++++++++++-- coordinator/kubernetes/k8sapi/util.go | 63 +++++++++++++++++++---- coordinator/kubernetes/kubernetes.go | 16 ++++++ coordinator/kubernetes/kubernetes_test.go | 11 ++++ 9 files changed, 141 insertions(+), 18 deletions(-) diff --git a/coordinator/cmd/coordinator/main.go b/coordinator/cmd/coordinator/main.go index b784c7426..1c91c9085 100644 --- a/coordinator/cmd/coordinator/main.go +++ b/coordinator/cmd/coordinator/main.go @@ -87,7 +87,7 @@ func main() { issuer = gcp.NewIssuer() validator = gcp.NewValidator(pcrs) - kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New()) + kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New()) gcpClient, err := gcpcloud.NewClient(context.Background()) if err != nil { log.Fatalf("creating GCP client failed: %v\n", err) @@ -112,7 +112,7 @@ func main() { issuer = azure.NewIssuer() validator = azure.NewValidator(pcrs) - kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New()) + kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New()) metadata, err = azurecloud.NewMetadata(context.Background()) if err != nil { log.Fatal(err) @@ -136,7 +136,7 @@ func main() { issuer = qemu.NewIssuer() validator = qemu.NewValidator(pcrs) - kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New()) + kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, 
kubectl.New()) // no support for cloud services in qemu metadata = &qemucloud.Metadata{} diff --git a/coordinator/core/cluster.go b/coordinator/core/cluster.go index 9ea67c9be..9917e709b 100644 --- a/coordinator/core/cluster.go +++ b/coordinator/core/cluster.go @@ -185,6 +185,8 @@ type Cluster interface { GetKubeadmCertificateKey() (string, error) // GetJoinToken returns a bootstrap (join) token. GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) + // StartKubelet starts the kubelet service. + StartKubelet() error } // ClusterFake behaves like a real cluster, but does not actually initialize or join Kubernetes. @@ -219,6 +221,11 @@ func (c *ClusterFake) GetJoinToken(_ time.Duration) (*kubeadm.BootstrapTokenDisc }, nil } +// StartKubelet starts the kubelet service. +func (c *ClusterFake) StartKubelet() error { + return nil +} + // k8sCompliantHostname transforms a hostname to an RFC 1123 compliant, lowercase subdomain as required by Kubernetes node names. // The following regex is used by k8s for validation: /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/ . // Only a simple heuristic is used for now (to lowercase, replace underscores). 
diff --git a/coordinator/core/cluster_test.go b/coordinator/core/cluster_test.go index 8ec81c7e0..63b94fe23 100644 --- a/coordinator/core/cluster_test.go +++ b/coordinator/core/cluster_test.go @@ -349,6 +349,7 @@ type clusterStub struct { getKubeconfigErr error getJoinTokenResponse *kubeadm.BootstrapTokenDiscovery getJoinTokenErr error + startKubeletErr error initInputs []kubernetes.InitClusterInput joinClusterArgs []joinClusterArgs @@ -383,6 +384,10 @@ func (c *clusterStub) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDi return c.getJoinTokenResponse, c.getJoinTokenErr } +func (c *clusterStub) StartKubelet() error { + return c.startKubeletErr +} + type prepareInstanceRequest struct { instance Instance vpnIP string diff --git a/coordinator/core/core.go b/coordinator/core/core.go index 97ff440be..db35ee8ba 100644 --- a/coordinator/core/core.go +++ b/coordinator/core/core.go @@ -206,6 +206,12 @@ func (c *Core) Initialize(ctx context.Context, dialer Dialer, api PubAPI) (nodeA if err := c.vpn.Setup(nodeState.VPNPrivKey); err != nil { return false, fmt.Errorf("failed to setup VPN: %w", err) } + + // restart kubernetes + if err := c.kube.StartKubelet(); err != nil { + return false, fmt.Errorf("failed to start kubelet service: %w", err) + } + var initialState state.State switch nodeState.Role { case role.Coordinator: diff --git a/coordinator/core/core_test.go b/coordinator/core/core_test.go index aaccf12df..7a7f4a6d7 100644 --- a/coordinator/core/core_test.go +++ b/coordinator/core/core_test.go @@ -214,7 +214,7 @@ func TestInitialize(t *testing.T) { VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, }).ToFile(fileHandler)) } - core, err := NewCore(&stubVPN{}, nil, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, fileHandler, user.NewLinuxUserManagerFake(fs)) + core, err := NewCore(&stubVPN{}, &clusterStub{}, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, 
fileHandler, user.NewLinuxUserManagerFake(fs)) require.NoError(err) core.initialVPNPeersRetriever = fakeInitializeVPNPeersRetriever // prepare store to emulate initialized KMS diff --git a/coordinator/kubernetes/k8sapi/systemd.go b/coordinator/kubernetes/k8sapi/systemd.go index a169d62bd..867af36d2 100644 --- a/coordinator/kubernetes/k8sapi/systemd.go +++ b/coordinator/kubernetes/k8sapi/systemd.go @@ -7,8 +7,7 @@ import ( "github.com/coreos/go-systemd/v22/dbus" ) -func RestartSystemdUnit(unit string) error { - ctx := context.Background() +func restartSystemdUnit(ctx context.Context, unit string) error { conn, err := dbus.NewSystemdConnectionContext(ctx) if err != nil { return fmt.Errorf("establishing systemd connection failed: %w", err) @@ -16,7 +15,7 @@ func RestartSystemdUnit(unit string) error { restartChan := make(chan string) if _, err := conn.RestartUnitContext(ctx, unit, "replace", restartChan); err != nil { - return fmt.Errorf("restarting systemd unit \"%v\" failed: %w", unit, err) + return fmt.Errorf("restarting systemd unit %q failed: %w", unit, err) } // Wait for the restart to finish and actually check if it was @@ -28,6 +27,42 @@ func RestartSystemdUnit(unit string) error { return nil default: - return fmt.Errorf("restarting systemd unit \"%v\" failed: expected %v but received %v", unit, "done", result) + return fmt.Errorf("restarting systemd unit %q failed: expected %v but received %v", unit, "done", result) } } + +func startSystemdUnit(ctx context.Context, unit string) error { + conn, err := dbus.NewSystemdConnectionContext(ctx) + if err != nil { + return fmt.Errorf("establishing systemd connection failed: %w", err) + } + + startChan := make(chan string) + if _, err := conn.StartUnitContext(ctx, unit, "replace", startChan); err != nil { + return fmt.Errorf("starting systemd unit %q failed: %w", unit, err) + } + + // Wait for the start to finish and actually check if it was + // successful or not. 
+ result := <-startChan + + switch result { + case "done": + return nil + + default: + return fmt.Errorf("starting systemd unit %q failed: expected %v but received %v", unit, "done", result) + } +} + +func enableSystemdUnit(ctx context.Context, unitPath string) error { + conn, err := dbus.NewSystemdConnectionContext(ctx) + if err != nil { + return fmt.Errorf("establishing systemd connection failed: %w", err) + } + + if _, _, err := conn.EnableUnitFilesContext(ctx, []string{unitPath}, true, true); err != nil { + return fmt.Errorf("enabling systemd unit %q failed: %w", unitPath, err) + } + return nil +} diff --git a/coordinator/kubernetes/k8sapi/util.go b/coordinator/kubernetes/k8sapi/util.go index 2b9b24a33..5b33d0904 100644 --- a/coordinator/kubernetes/k8sapi/util.go +++ b/coordinator/kubernetes/k8sapi/util.go @@ -1,6 +1,7 @@ package k8sapi import ( + "context" "errors" "fmt" "os" @@ -12,8 +13,12 @@ import ( kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3" ) -// kubeConfig is the path to the Kubernetes admin config (used for authentication). -const kubeConfig = "/etc/kubernetes/admin.conf" +const ( + // kubeConfig is the path to the Kubernetes admin config (used for authentication). + kubeConfig = "/etc/kubernetes/admin.conf" + // kubeletStartTimeout is the maximum time given to the kubelet service to (re)start. + kubeletStartTimeout = 10 * time.Minute +) // Client provides the functionality of `kubectl apply`. 
type Client interface { @@ -23,18 +28,44 @@ type Client interface { } type ClusterUtil interface { + InstallComponents(ctx context.Context, version string) error InitCluster(initConfig []byte) error JoinCluster(joinConfig []byte) error SetupPodNetwork(kubectl Client, podNetworkConfiguration resources.Marshaler) error SetupAutoscaling(kubectl Client, clusterAutoscalerConfiguration resources.Marshaler, secrets resources.Marshaler) error SetupCloudControllerManager(kubectl Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error + StartKubelet() error RestartKubelet() error GetControlPlaneJoinCertificateKey() (string, error) CreateJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) } -type KubernetesUtil struct{} +// KubernetesUtil provides low level management of the kubernetes cluster. +type KubernetesUtil struct { + inst installer +} + +// NewKubernetesUtil creates a new KubernetesUtil. +func NewKubernetesUtil() *KubernetesUtil { + return &KubernetesUtil{ + inst: newOSInstaller(), + } +} + +// InstallComponents installs kubernetes components in the version specified. 
+func (k *KubernetesUtil) InstallComponents(ctx context.Context, version string) error { + var versionConf kubernetesVersion + var ok bool + if versionConf, ok = versionConfigs[version]; !ok { + return fmt.Errorf("unsupported kubernetes version %q", version) + } + if err := versionConf.installK8sComponents(ctx, k.inst); err != nil { + return err + } + + return enableSystemdUnit(ctx, kubeletServiceEtcPath) +} func (k *KubernetesUtil) InitCluster(initConfig []byte) error { // TODO: audit policy should be user input @@ -56,7 +87,7 @@ func (k *KubernetesUtil) InitCluster(initConfig []byte) error { return fmt.Errorf("writing kubeadm init yaml config %v failed: %w", initConfigFile.Name(), err) } - cmd := exec.Command("kubeadm", "init", "--config", initConfigFile.Name()) + cmd := exec.Command(kubeadmPath, "init", "--config", initConfigFile.Name()) _, err = cmd.Output() if err != nil { var exitErr *exec.ExitError @@ -75,11 +106,11 @@ func (k *KubernetesUtil) SetupPodNetwork(kubectl Client, podNetworkConfiguration } // allow coredns to run on uninitialized nodes (required by cloud-controller-manager) - err := exec.Command("kubectl", "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.cloudprovider.kubernetes.io/uninitialized\",\"value\":\"true\",\"effect\":\"NoSchedule\"}}]").Run() + err := exec.Command(kubectlPath, "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.cloudprovider.kubernetes.io/uninitialized\",\"value\":\"true\",\"effect\":\"NoSchedule\"}}]").Run() if err != nil { return err } - return exec.Command("kubectl", "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", 
"[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.kubernetes.io/network-unavailable\",\"value\":\"\",\"effect\":\"NoSchedule\"}}]").Run() + return exec.Command(kubectlPath, "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.kubernetes.io/network-unavailable\",\"value\":\"\",\"effect\":\"NoSchedule\"}}]").Run() } // SetupAutoscaling deploys the k8s cluster autoscaler. @@ -131,7 +162,7 @@ func (k *KubernetesUtil) JoinCluster(joinConfig []byte) error { } // run `kubeadm join` to join a worker node to an existing Kubernetes cluster - cmd := exec.Command("kubeadm", "join", "--config", joinConfigFile.Name()) + cmd := exec.Command(kubeadmPath, "join", "--config", joinConfigFile.Name()) if _, err := cmd.Output(); err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) { @@ -142,9 +173,21 @@ func (k *KubernetesUtil) JoinCluster(joinConfig []byte) error { return nil } +// StartKubelet enables and starts the kubelet systemd unit. +func (k *KubernetesUtil) StartKubelet() error { + ctx, cancel := context.WithTimeout(context.TODO(), kubeletStartTimeout) + defer cancel() + if err := enableSystemdUnit(ctx, kubeletServiceEtcPath); err != nil { + return fmt.Errorf("enabling kubelet systemd unit failed: %w", err) + } + return startSystemdUnit(ctx, "kubelet.service") +} + // RestartKubelet restarts a kubelet. 
func (k *KubernetesUtil) RestartKubelet() error { - return RestartSystemdUnit("kubelet.service") + ctx, cancel := context.WithTimeout(context.TODO(), kubeletStartTimeout) + defer cancel() + return restartSystemdUnit(ctx, "kubelet.service") } // GetControlPlaneJoinCertificateKey return the key which can be used in combination with the joinArgs @@ -152,7 +195,7 @@ func (k *KubernetesUtil) RestartKubelet() error { func (k *KubernetesUtil) GetControlPlaneJoinCertificateKey() (string, error) { // Key will be valid for 1h (no option to reduce the duration). // https://kubernetes.io/docs/reference/setup-tools/kubeadm/kubeadm-init-phase/#cmd-phase-upload-certs - output, err := exec.Command("kubeadm", "init", "phase", "upload-certs", "--upload-certs").Output() + output, err := exec.Command(kubeadmPath, "init", "phase", "upload-certs", "--upload-certs").Output() if err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) { @@ -175,7 +218,7 @@ func (k *KubernetesUtil) GetControlPlaneJoinCertificateKey() (string, error) { // CreateJoinToken creates a new bootstrap (join) token. func (k *KubernetesUtil) CreateJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) { - output, err := exec.Command("kubeadm", "token", "create", "--ttl", ttl.String(), "--print-join-command").Output() + output, err := exec.Command(kubeadmPath, "token", "create", "--ttl", ttl.String(), "--print-join-command").Output() if err != nil { return nil, fmt.Errorf("kubeadm token create failed: %w", err) } diff --git a/coordinator/kubernetes/kubernetes.go b/coordinator/kubernetes/kubernetes.go index 392d64775..450c1ed0c 100644 --- a/coordinator/kubernetes/kubernetes.go +++ b/coordinator/kubernetes/kubernetes.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "fmt" "strings" "time" @@ -49,6 +50,11 @@ func New(clusterUtil k8sapi.ClusterUtil, configProvider configurationProvider, c // InitCluster initializes a new Kubernetes cluster and applies pod network provider. 
func (k *KubeWrapper) InitCluster(in InitClusterInput) error { + // TODO: k8s version should be user input + if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil { + return err + } + initConfig := k.configProvider.InitConfiguration(in.SupportsCloudControllerManager) initConfig.SetApiServerAdvertiseAddress(in.APIServerAdvertiseIP) initConfig.SetNodeIP(in.NodeIP) @@ -105,6 +111,11 @@ func (k *KubeWrapper) InitCluster(in InitClusterInput) error { // JoinCluster joins existing Kubernetes cluster. func (k *KubeWrapper) JoinCluster(args *kubeadm.BootstrapTokenDiscovery, nodeName, nodeInternalIP, nodeVPNIP, providerID, certKey string, ccmSupported bool, peerRole role.Role) error { + // TODO: k8s version should be user input + if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil { + return err + } + joinConfig := k.configProvider.JoinConfiguration(ccmSupported) joinConfig.SetApiServerEndpoint(args.APIServerEndpoint) joinConfig.SetToken(args.Token) @@ -149,6 +160,11 @@ func (k *KubeWrapper) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDi return k.clusterUtil.CreateJoinToken(ttl) } +// StartKubelet starts the kubelet service. 
+func (k *KubeWrapper) StartKubelet() error { + return k.clusterUtil.StartKubelet() +} + type fakeK8SClient struct { kubeconfig []byte } diff --git a/coordinator/kubernetes/kubernetes_test.go b/coordinator/kubernetes/kubernetes_test.go index 50d3b95cd..b30579751 100644 --- a/coordinator/kubernetes/kubernetes_test.go +++ b/coordinator/kubernetes/kubernetes_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "errors" "testing" "time" @@ -20,12 +21,14 @@ func TestMain(m *testing.M) { } type stubClusterUtil struct { + installComponentsErr error initClusterErr error setupPodNetworkErr error setupAutoscalingError error setupCloudControllerManagerError error setupCloudNodeManagerError error joinClusterErr error + startKubeletErr error restartKubeletErr error createJoinTokenResponse *kubeadm.BootstrapTokenDiscovery createJoinTokenErr error @@ -34,6 +37,10 @@ type stubClusterUtil struct { joinConfigs [][]byte } +func (s *stubClusterUtil) InstallComponents(ctx context.Context, version string) error { + return s.installComponentsErr +} + func (s *stubClusterUtil) InitCluster(initConfig []byte) error { s.initConfigs = append(s.initConfigs, initConfig) return s.initClusterErr @@ -60,6 +67,10 @@ func (s *stubClusterUtil) JoinCluster(joinConfig []byte) error { return s.joinClusterErr } +func (s *stubClusterUtil) StartKubelet() error { + return s.startKubeletErr +} + func (s *stubClusterUtil) RestartKubelet() error { return s.restartKubeletErr }