Install kubernetes on init / join and restart kubelet after reboot

Signed-off-by: Malte Poll <mp@edgeless.systems>
This commit is contained in:
Malte Poll 2022-05-19 17:18:22 +02:00 committed by Malte Poll
parent f67cf2d31f
commit 1331ee4077
9 changed files with 141 additions and 18 deletions

View File

@ -87,7 +87,7 @@ func main() {
issuer = gcp.NewIssuer()
validator = gcp.NewValidator(pcrs)
kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New())
kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New())
gcpClient, err := gcpcloud.NewClient(context.Background())
if err != nil {
log.Fatalf("creating GCP client failed: %v\n", err)
@ -112,7 +112,7 @@ func main() {
issuer = azure.NewIssuer()
validator = azure.NewValidator(pcrs)
kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New())
kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New())
metadata, err = azurecloud.NewMetadata(context.Background())
if err != nil {
log.Fatal(err)
@ -136,7 +136,7 @@ func main() {
issuer = qemu.NewIssuer()
validator = qemu.NewValidator(pcrs)
kube = kubernetes.New(&k8sapi.KubernetesUtil{}, &k8sapi.CoreOSConfiguration{}, kubectl.New())
kube = kubernetes.New(k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New())
// no support for cloud services in qemu
metadata = &qemucloud.Metadata{}

View File

@ -185,6 +185,8 @@ type Cluster interface {
GetKubeadmCertificateKey() (string, error)
// GetJoinToken returns a bootstrap (join) token.
GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error)
// StartKubelet starts the kubelet service.
StartKubelet() error
}
// ClusterFake behaves like a real cluster, but does not actually initialize or join Kubernetes.
@ -219,6 +221,11 @@ func (c *ClusterFake) GetJoinToken(_ time.Duration) (*kubeadm.BootstrapTokenDisc
}, nil
}
// StartKubelet starts the kubelet service.
func (c *ClusterFake) StartKubelet() error {
return nil
}
// k8sCompliantHostname transforms a hostname to an RFC 1123 compliant, lowercase subdomain as required by Kubernetes node names.
// The following regex is used by k8s for validation: /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/ .
// Only a simple heuristic is used for now (to lowercase, replace underscores).

View File

@ -349,6 +349,7 @@ type clusterStub struct {
getKubeconfigErr error
getJoinTokenResponse *kubeadm.BootstrapTokenDiscovery
getJoinTokenErr error
startKubeletErr error
initInputs []kubernetes.InitClusterInput
joinClusterArgs []joinClusterArgs
@ -383,6 +384,10 @@ func (c *clusterStub) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDi
return c.getJoinTokenResponse, c.getJoinTokenErr
}
func (c *clusterStub) StartKubelet() error {
return c.startKubeletErr
}
type prepareInstanceRequest struct {
instance Instance
vpnIP string

View File

@ -206,6 +206,12 @@ func (c *Core) Initialize(ctx context.Context, dialer Dialer, api PubAPI) (nodeA
if err := c.vpn.Setup(nodeState.VPNPrivKey); err != nil {
return false, fmt.Errorf("failed to setup VPN: %w", err)
}
// restart kubernetes
if err := c.kube.StartKubelet(); err != nil {
return false, fmt.Errorf("failed to start kubelet service: %w", err)
}
var initialState state.State
switch nodeState.Role {
case role.Coordinator:

View File

@ -214,7 +214,7 @@ func TestInitialize(t *testing.T) {
VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
}).ToFile(fileHandler))
}
core, err := NewCore(&stubVPN{}, nil, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, fileHandler, user.NewLinuxUserManagerFake(fs))
core, err := NewCore(&stubVPN{}, &clusterStub{}, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, fileHandler, user.NewLinuxUserManagerFake(fs))
require.NoError(err)
core.initialVPNPeersRetriever = fakeInitializeVPNPeersRetriever
// prepare store to emulate initialized KMS

View File

@ -7,8 +7,7 @@ import (
"github.com/coreos/go-systemd/v22/dbus"
)
func RestartSystemdUnit(unit string) error {
ctx := context.Background()
func restartSystemdUnit(ctx context.Context, unit string) error {
conn, err := dbus.NewSystemdConnectionContext(ctx)
if err != nil {
return fmt.Errorf("establishing systemd connection failed: %w", err)
@ -16,7 +15,7 @@ func RestartSystemdUnit(unit string) error {
restartChan := make(chan string)
if _, err := conn.RestartUnitContext(ctx, unit, "replace", restartChan); err != nil {
return fmt.Errorf("restarting systemd unit \"%v\" failed: %w", unit, err)
return fmt.Errorf("restarting systemd unit %q failed: %w", unit, err)
}
// Wait for the restart to finish and actually check if it was
@ -28,6 +27,42 @@ func RestartSystemdUnit(unit string) error {
return nil
default:
return fmt.Errorf("restarting systemd unit \"%v\" failed: expected %v but received %v", unit, "done", result)
return fmt.Errorf("restarting systemd unit %q failed: expected %v but received %v", unit, "done", result)
}
}
func startSystemdUnit(ctx context.Context, unit string) error {
conn, err := dbus.NewSystemdConnectionContext(ctx)
if err != nil {
return fmt.Errorf("establishing systemd connection failed: %w", err)
}
startChan := make(chan string)
if _, err := conn.StartUnitContext(ctx, unit, "replace", startChan); err != nil {
return fmt.Errorf("starting systemd unit %q failed: %w", unit, err)
}
// Wait for the enable to finish and actually check if it was
// successful or not.
result := <-startChan
switch result {
case "done":
return nil
default:
return fmt.Errorf("starting systemd unit %q failed: expected %v but received %v", unit, "done", result)
}
}
func enableSystemdUnit(ctx context.Context, unitPath string) error {
conn, err := dbus.NewSystemdConnectionContext(ctx)
if err != nil {
return fmt.Errorf("establishing systemd connection failed: %w", err)
}
if _, _, err := conn.EnableUnitFilesContext(ctx, []string{unitPath}, true, true); err != nil {
return fmt.Errorf("enabling systemd unit %q failed: %w", unitPath, err)
}
return nil
}

View File

@ -1,6 +1,7 @@
package k8sapi
import (
"context"
"errors"
"fmt"
"os"
@ -12,8 +13,12 @@ import (
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
const (
// kubeConfig is the path to the Kubernetes admin config (used for authentication).
const kubeConfig = "/etc/kubernetes/admin.conf"
kubeConfig = "/etc/kubernetes/admin.conf"
// kubeletStartTimeout is the maximum time given to the kubelet service to (re)start.
kubeletStartTimeout = 10 * time.Minute
)
// Client provides the functionality of `kubectl apply`.
type Client interface {
@ -23,18 +28,44 @@ type Client interface {
}
type ClusterUtil interface {
InstallComponents(ctx context.Context, version string) error
InitCluster(initConfig []byte) error
JoinCluster(joinConfig []byte) error
SetupPodNetwork(kubectl Client, podNetworkConfiguration resources.Marshaler) error
SetupAutoscaling(kubectl Client, clusterAutoscalerConfiguration resources.Marshaler, secrets resources.Marshaler) error
SetupCloudControllerManager(kubectl Client, cloudControllerManagerConfiguration resources.Marshaler, configMaps resources.Marshaler, secrets resources.Marshaler) error
SetupCloudNodeManager(kubectl Client, cloudNodeManagerConfiguration resources.Marshaler) error
StartKubelet() error
RestartKubelet() error
GetControlPlaneJoinCertificateKey() (string, error)
CreateJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error)
}
type KubernetesUtil struct{}
// KubernetesUtil provides low level management of the kubernetes cluster.
type KubernetesUtil struct {
inst installer
}
// NewKubernetesUtils creates a new KubernetesUtil.
func NewKubernetesUtil() *KubernetesUtil {
return &KubernetesUtil{
inst: newOSInstaller(),
}
}
// InstallComponents installs kubernetes components in the version specified.
func (k *KubernetesUtil) InstallComponents(ctx context.Context, version string) error {
var versionConf kubernetesVersion
var ok bool
if versionConf, ok = versionConfigs[version]; !ok {
return fmt.Errorf("unsupported kubernetes version %q", version)
}
if err := versionConf.installK8sComponents(ctx, k.inst); err != nil {
return err
}
return enableSystemdUnit(ctx, kubeletServiceEtcPath)
}
func (k *KubernetesUtil) InitCluster(initConfig []byte) error {
// TODO: audit policy should be user input
@ -56,7 +87,7 @@ func (k *KubernetesUtil) InitCluster(initConfig []byte) error {
return fmt.Errorf("writing kubeadm init yaml config %v failed: %w", initConfigFile.Name(), err)
}
cmd := exec.Command("kubeadm", "init", "--config", initConfigFile.Name())
cmd := exec.Command(kubeadmPath, "init", "--config", initConfigFile.Name())
_, err = cmd.Output()
if err != nil {
var exitErr *exec.ExitError
@ -75,11 +106,11 @@ func (k *KubernetesUtil) SetupPodNetwork(kubectl Client, podNetworkConfiguration
}
// allow coredns to run on uninitialized nodes (required by cloud-controller-manager)
err := exec.Command("kubectl", "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.cloudprovider.kubernetes.io/uninitialized\",\"value\":\"true\",\"effect\":\"NoSchedule\"}}]").Run()
err := exec.Command(kubectlPath, "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.cloudprovider.kubernetes.io/uninitialized\",\"value\":\"true\",\"effect\":\"NoSchedule\"}}]").Run()
if err != nil {
return err
}
return exec.Command("kubectl", "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.kubernetes.io/network-unavailable\",\"value\":\"\",\"effect\":\"NoSchedule\"}}]").Run()
return exec.Command(kubectlPath, "--kubeconfig", kubeConfig, "-n", "kube-system", "patch", "deployment", "coredns", "--type", "json", "-p", "[{\"op\":\"add\",\"path\":\"/spec/template/spec/tolerations/-\",\"value\":{\"key\":\"node.kubernetes.io/network-unavailable\",\"value\":\"\",\"effect\":\"NoSchedule\"}}]").Run()
}
// SetupAutoscaling deploys the k8s cluster autoscaler.
@ -131,7 +162,7 @@ func (k *KubernetesUtil) JoinCluster(joinConfig []byte) error {
}
// run `kubeadm join` to join a worker node to an existing Kubernetes cluster
cmd := exec.Command("kubeadm", "join", "--config", joinConfigFile.Name())
cmd := exec.Command(kubeadmPath, "join", "--config", joinConfigFile.Name())
if _, err := cmd.Output(); err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
@ -142,9 +173,21 @@ func (k *KubernetesUtil) JoinCluster(joinConfig []byte) error {
return nil
}
// StartKubelet enables and starts the kubelet systemd unit.
func (k *KubernetesUtil) StartKubelet() error {
ctx, cancel := context.WithTimeout(context.TODO(), kubeletStartTimeout)
defer cancel()
if err := enableSystemdUnit(ctx, kubeletServiceEtcPath); err != nil {
return fmt.Errorf("enabling kubelet systemd unit failed: %w", err)
}
return startSystemdUnit(ctx, "kubelet.service")
}
// RestartKubelet restarts a kubelet.
func (k *KubernetesUtil) RestartKubelet() error {
return RestartSystemdUnit("kubelet.service")
ctx, cancel := context.WithTimeout(context.TODO(), kubeletStartTimeout)
defer cancel()
return restartSystemdUnit(ctx, "kubelet.service")
}
// GetControlPlaneJoinCertificateKey return the key which can be used in combination with the joinArgs
@ -152,7 +195,7 @@ func (k *KubernetesUtil) RestartKubelet() error {
func (k *KubernetesUtil) GetControlPlaneJoinCertificateKey() (string, error) {
// Key will be valid for 1h (no option to reduce the duration).
// https://kubernetes.io/docs/reference/setup-tools/kubeadm/kubeadm-init-phase/#cmd-phase-upload-certs
output, err := exec.Command("kubeadm", "init", "phase", "upload-certs", "--upload-certs").Output()
output, err := exec.Command(kubeadmPath, "init", "phase", "upload-certs", "--upload-certs").Output()
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
@ -175,7 +218,7 @@ func (k *KubernetesUtil) GetControlPlaneJoinCertificateKey() (string, error) {
// CreateJoinToken creates a new bootstrap (join) token.
func (k *KubernetesUtil) CreateJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) {
output, err := exec.Command("kubeadm", "token", "create", "--ttl", ttl.String(), "--print-join-command").Output()
output, err := exec.Command(kubeadmPath, "token", "create", "--ttl", ttl.String(), "--print-join-command").Output()
if err != nil {
return nil, fmt.Errorf("kubeadm token create failed: %w", err)
}

View File

@ -1,6 +1,7 @@
package kubernetes
import (
"context"
"fmt"
"strings"
"time"
@ -49,6 +50,11 @@ func New(clusterUtil k8sapi.ClusterUtil, configProvider configurationProvider, c
// InitCluster initializes a new Kubernetes cluster and applies pod network provider.
func (k *KubeWrapper) InitCluster(in InitClusterInput) error {
// TODO: k8s version should be user input
if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil {
return err
}
initConfig := k.configProvider.InitConfiguration(in.SupportsCloudControllerManager)
initConfig.SetApiServerAdvertiseAddress(in.APIServerAdvertiseIP)
initConfig.SetNodeIP(in.NodeIP)
@ -105,6 +111,11 @@ func (k *KubeWrapper) InitCluster(in InitClusterInput) error {
// JoinCluster joins existing Kubernetes cluster.
func (k *KubeWrapper) JoinCluster(args *kubeadm.BootstrapTokenDiscovery, nodeName, nodeInternalIP, nodeVPNIP, providerID, certKey string, ccmSupported bool, peerRole role.Role) error {
// TODO: k8s version should be user input
if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil {
return err
}
joinConfig := k.configProvider.JoinConfiguration(ccmSupported)
joinConfig.SetApiServerEndpoint(args.APIServerEndpoint)
joinConfig.SetToken(args.Token)
@ -149,6 +160,11 @@ func (k *KubeWrapper) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDi
return k.clusterUtil.CreateJoinToken(ttl)
}
// StartKubelet starts the kubelet service.
func (k *KubeWrapper) StartKubelet() error {
return k.clusterUtil.StartKubelet()
}
type fakeK8SClient struct {
kubeconfig []byte
}

View File

@ -1,6 +1,7 @@
package kubernetes
import (
"context"
"errors"
"testing"
"time"
@ -20,12 +21,14 @@ func TestMain(m *testing.M) {
}
type stubClusterUtil struct {
installComponentsErr error
initClusterErr error
setupPodNetworkErr error
setupAutoscalingError error
setupCloudControllerManagerError error
setupCloudNodeManagerError error
joinClusterErr error
startKubeletErr error
restartKubeletErr error
createJoinTokenResponse *kubeadm.BootstrapTokenDiscovery
createJoinTokenErr error
@ -34,6 +37,10 @@ type stubClusterUtil struct {
joinConfigs [][]byte
}
func (s *stubClusterUtil) InstallComponents(ctx context.Context, version string) error {
return s.installComponentsErr
}
func (s *stubClusterUtil) InitCluster(initConfig []byte) error {
s.initConfigs = append(s.initConfigs, initConfig)
return s.initClusterErr
@ -60,6 +67,10 @@ func (s *stubClusterUtil) JoinCluster(joinConfig []byte) error {
return s.joinClusterErr
}
func (s *stubClusterUtil) StartKubelet() error {
return s.startKubeletErr
}
func (s *stubClusterUtil) RestartKubelet() error {
return s.restartKubeletErr
}