constellation/bootstrapper/internal/joinclient/joinclient.go

434 lines
12 KiB
Go
Raw Normal View History

/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
/*
# JoinClient
The JoinClient is one of the two main components of the bootstrapper.
It is responsible for for the initial setup of a node, and joining an existing Kubernetes cluster.
The JoinClient is started on each node, it then continuously checks for an existing cluster to join,
or for the InitServer to bootstrap a new cluster.
If the JoinClient finds an existing cluster, it will attempt to join it as either a control-plane or a worker node.
*/
2022-06-21 11:59:12 -04:00
package joinclient
import (
"context"
"errors"
"fmt"
2024-02-08 09:20:01 -05:00
"log/slog"
2022-06-21 11:59:12 -04:00
"net"
"path/filepath"
2022-06-21 11:59:12 -04:00
"strconv"
"time"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/certificate"
2022-09-21 07:47:57 -04:00
"github.com/edgelesssys/constellation/v2/internal/attestation"
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/nodestate"
"github.com/edgelesssys/constellation/v2/internal/role"
"github.com/edgelesssys/constellation/v2/internal/versions/components"
2022-09-21 07:47:57 -04:00
"github.com/edgelesssys/constellation/v2/joinservice/joinproto"
2022-06-21 11:59:12 -04:00
"github.com/spf13/afero"
"google.golang.org/grpc"
2022-06-28 12:33:27 -04:00
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
kubeconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
2022-06-21 11:59:12 -04:00
"k8s.io/utils/clock"
)
const (
2022-07-05 08:14:11 -04:00
interval = 30 * time.Second
timeout = 30 * time.Second
joinTimeout = 5 * time.Minute
2022-06-21 11:59:12 -04:00
)
2022-07-05 04:36:31 -04:00
// JoinClient is a client for requesting the needed information and
// joining an existing Kubernetes cluster.
2022-06-21 11:59:12 -04:00
type JoinClient struct {
nodeLock locker
2022-06-21 11:59:12 -04:00
diskUUID string
nodeName string
role role.Role
validIPs []net.IP
2022-06-28 12:33:27 -04:00
disk encryptedDisk
2022-06-21 11:59:12 -04:00
fileHandler file.Handler
2022-07-05 08:14:11 -04:00
timeout time.Duration
joinTimeout time.Duration
interval time.Duration
clock clock.WithTicker
2022-06-21 11:59:12 -04:00
dialer grpcDialer
joiner ClusterJoiner
2022-06-28 12:33:27 -04:00
metadataAPI MetadataAPI
2022-06-21 11:59:12 -04:00
2024-02-08 09:20:01 -05:00
log *slog.Logger
2022-06-21 11:59:12 -04:00
stopC chan struct{}
stopDone chan struct{}
}
2022-07-05 04:36:31 -04:00
// New creates a new JoinClient.
func New(lock locker, dial grpcDialer, joiner ClusterJoiner, meta MetadataAPI, disk encryptedDisk, log *slog.Logger) *JoinClient {
2022-06-21 11:59:12 -04:00
return &JoinClient{
2022-07-05 08:14:11 -04:00
nodeLock: lock,
disk: disk,
2022-06-21 11:59:12 -04:00
fileHandler: file.NewHandler(afero.NewOsFs()),
timeout: timeout,
2022-07-05 08:14:11 -04:00
joinTimeout: joinTimeout,
2022-06-21 11:59:12 -04:00
interval: interval,
clock: clock.RealClock{},
dialer: dial,
joiner: joiner,
metadataAPI: meta,
2024-02-08 09:20:01 -05:00
log: log.WithGroup("join-client"),
stopC: make(chan struct{}, 1),
stopDone: make(chan struct{}, 1),
2022-06-21 11:59:12 -04:00
}
}
2022-07-05 04:36:31 -04:00
// Start starts the client routine. The client will make the needed API calls to join
// the cluster with the role it receives from the metadata API.
// After receiving the needed information, the node will join the cluster.
func (c *JoinClient) Start(cleaner cleaner) error {
c.log.Info("Starting")
ticker := c.clock.NewTicker(c.interval)
defer ticker.Stop()
defer func() { c.stopDone <- struct{}{} }()
defer c.log.Info("Client stopped")
2022-06-21 11:59:12 -04:00
diskUUID, err := c.getDiskUUID()
if err != nil {
c.log.With(slog.Any("error", err)).Error("Failed to get disk UUID")
return err
2022-06-21 11:59:12 -04:00
}
c.diskUUID = diskUUID
2022-06-21 11:59:12 -04:00
for {
err := c.getNodeMetadata()
if err == nil {
c.log.With(slog.String("role", c.role.String()), slog.String("name", c.nodeName)).Info("Received own instance metadata")
break
}
c.log.With(slog.Any("error", err)).Error("Failed to retrieve instance metadata")
2022-06-21 11:59:12 -04:00
c.log.With(slog.Duration("interval", c.interval)).Info("Sleeping")
select {
case <-c.stopC:
return nil
case <-ticker.C():
2022-06-21 11:59:12 -04:00
}
}
var ticket *joinproto.IssueJoinTicketResponse
var kubeletKey []byte
for {
ticket, kubeletKey, err = c.tryJoinWithAvailableServices()
if err == nil {
c.log.Info("Successfully retrieved join ticket, starting Kubernetes node")
break
2022-06-21 11:59:12 -04:00
}
c.log.With(slog.Any("error", err)).Warn("Join failed for all available endpoints")
2022-06-21 11:59:12 -04:00
c.log.With(slog.Duration("interval", c.interval)).Info("Sleeping")
select {
case <-c.stopC:
return nil
case <-ticker.C():
2022-06-21 11:59:12 -04:00
}
}
if err := c.startNodeAndJoin(ticket, kubeletKey, cleaner); err != nil {
c.log.With(slog.Any("error", err)).Error("Failed to start node and join cluster")
return err
}
return nil
2022-06-21 11:59:12 -04:00
}
// Stop stops the client and blocks until the client's routine is stopped.
func (c *JoinClient) Stop() {
2024-02-08 09:20:01 -05:00
c.log.Info("Stopping")
2022-06-21 11:59:12 -04:00
c.stopC <- struct{}{}
<-c.stopDone
2024-02-08 09:20:01 -05:00
c.log.Info("Stopped")
2022-06-21 11:59:12 -04:00
}
func (c *JoinClient) tryJoinWithAvailableServices() (ticket *joinproto.IssueJoinTicketResponse, kubeletKey []byte, err error) {
ctx, cancel := c.timeoutCtx()
defer cancel()
var endpoints []string
2023-10-19 10:28:07 -04:00
endpoint, _, err := c.metadataAPI.GetLoadBalancerEndpoint(ctx)
2022-06-21 11:59:12 -04:00
if err != nil {
return nil, nil, fmt.Errorf("failed to get load balancer endpoint: %w", err)
2022-06-21 11:59:12 -04:00
}
2023-10-19 10:28:07 -04:00
endpoints = append(endpoints, endpoint)
2022-06-21 11:59:12 -04:00
ips, err := c.getControlPlaneIPs(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to get control plane IPs: %w", err)
}
endpoints = append(endpoints, ips...)
if len(endpoints) == 0 {
return nil, nil, errors.New("no control plane IPs found")
2022-06-21 11:59:12 -04:00
}
var joinErrs error
for _, endpoint := range endpoints {
ticket, kubeletKey, err := c.requestJoinTicket(net.JoinHostPort(endpoint, strconv.Itoa(constants.JoinServiceNodePort)))
2022-06-21 11:59:12 -04:00
if err == nil {
return ticket, kubeletKey, nil
2022-07-05 05:41:31 -04:00
}
joinErrs = errors.Join(joinErrs, err)
2022-06-21 11:59:12 -04:00
}
return nil, nil, fmt.Errorf("trying to join on all endpoints %v: %w", endpoints, joinErrs)
2022-06-21 11:59:12 -04:00
}
func (c *JoinClient) requestJoinTicket(serviceEndpoint string) (ticket *joinproto.IssueJoinTicketResponse, kubeletKey []byte, err error) {
2022-06-21 11:59:12 -04:00
ctx, cancel := c.timeoutCtx()
defer cancel()
certificateRequest, kubeletKey, err := certificate.GetKubeletCertificateRequest(c.nodeName, c.validIPs)
if err != nil {
return nil, nil, err
}
2022-06-21 11:59:12 -04:00
conn, err := c.dialer.Dial(ctx, serviceEndpoint)
if err != nil {
2024-02-08 09:20:01 -05:00
c.log.With(slog.String("endpoint", serviceEndpoint), slog.Any("error", err)).Error("Join service unreachable")
return nil, nil, fmt.Errorf("dialing join service endpoint: %w", err)
2022-06-21 11:59:12 -04:00
}
defer conn.Close()
2022-07-05 05:41:31 -04:00
protoClient := joinproto.NewAPIClient(conn)
req := &joinproto.IssueJoinTicketRequest{
DiskUuid: c.diskUUID,
CertificateRequest: certificateRequest,
IsControlPlane: c.role == role.ControlPlane,
2022-06-21 11:59:12 -04:00
}
ticket, err = protoClient.IssueJoinTicket(ctx, req)
2022-06-21 11:59:12 -04:00
if err != nil {
2024-02-08 09:20:01 -05:00
c.log.With(slog.String("endpoint", serviceEndpoint), slog.Any("error", err)).Error("Issuing join ticket failed")
return nil, nil, fmt.Errorf("issuing join ticket: %w", err)
2022-06-21 11:59:12 -04:00
}
return ticket, kubeletKey, err
2022-06-21 11:59:12 -04:00
}
func (c *JoinClient) startNodeAndJoin(ticket *joinproto.IssueJoinTicketResponse, kubeletKey []byte, cleaner cleaner) error {
2022-07-05 08:14:11 -04:00
ctx, cancel := context.WithTimeout(context.Background(), c.joinTimeout)
defer cancel()
clusterID, err := attestation.DeriveClusterID(ticket.MeasurementSecret, ticket.MeasurementSalt)
if err != nil {
return err
}
nodeLockAcquired, err := c.nodeLock.TryLockOnce(clusterID)
if err != nil {
2024-02-08 09:20:01 -05:00
c.log.With(slog.Any("error", err)).Error("Acquiring node lock failed")
return fmt.Errorf("acquiring node lock: %w", err)
}
if !nodeLockAcquired {
2022-06-28 12:33:27 -04:00
// There is already a cluster initialization in progress on
// this node, so there is no need to also join the cluster,
// as the initializing node is automatically part of the cluster.
c.log.Info("Node is already being initialized. Aborting join process.")
return nil
2022-06-21 11:59:12 -04:00
}
cleaner.Clean()
2022-07-05 05:41:31 -04:00
if err := c.updateDiskPassphrase(string(ticket.StateDiskKey)); err != nil {
2022-06-21 11:59:12 -04:00
return fmt.Errorf("updating disk passphrase: %w", err)
}
if c.role == role.ControlPlane {
if err := c.writeControlPlaneFiles(ticket.ControlPlaneFiles); err != nil {
return fmt.Errorf("writing control plane files: %w", err)
}
}
if err := c.fileHandler.Write(certificate.CertificateFilename, ticket.KubeletCert, file.OptMkdirAll); err != nil {
return fmt.Errorf("writing kubelet certificate: %w", err)
}
if err := c.fileHandler.Write(certificate.KeyFilename, kubeletKey, file.OptMkdirAll); err != nil {
return fmt.Errorf("writing kubelet key: %w", err)
}
2022-06-21 11:59:12 -04:00
state := nodestate.NodeState{
Role: c.role,
MeasurementSalt: ticket.MeasurementSalt,
2022-06-21 11:59:12 -04:00
}
if err := state.ToFile(c.fileHandler); err != nil {
return fmt.Errorf("persisting node state: %w", err)
}
btd := &kubeadm.BootstrapTokenDiscovery{
2022-07-05 05:41:31 -04:00
APIServerEndpoint: ticket.ApiServerEndpoint,
2022-07-05 08:14:11 -04:00
Token: ticket.Token,
2022-07-05 05:41:31 -04:00
CACertHashes: []string{ticket.DiscoveryTokenCaCertHash},
2022-06-21 11:59:12 -04:00
}
// We currently cannot recover from any failure in this function. Joining the k8s cluster
// sometimes fails transiently, and we don't want to brick the node because of that.
for i := range 3 {
err = c.joiner.JoinCluster(ctx, btd, c.role, ticket.KubernetesComponents, c.log)
if err == nil {
break
}
c.log.Error("failed to join k8s cluster", "role", c.role, "attempt", i, "error", err)
}
if err != nil {
2022-06-21 11:59:12 -04:00
return fmt.Errorf("joining Kubernetes cluster: %w", err)
}
return nil
}
func (c *JoinClient) getNodeMetadata() error {
ctx, cancel := c.timeoutCtx()
defer cancel()
2024-02-08 09:20:01 -05:00
c.log.Debug("Requesting node metadata from metadata API")
2022-06-21 11:59:12 -04:00
inst, err := c.metadataAPI.Self(ctx)
if err != nil {
return err
}
2024-02-08 09:20:01 -05:00
c.log.With(slog.Any("instance", inst)).Debug("Received node metadata")
2022-06-21 11:59:12 -04:00
if inst.Name == "" {
return errors.New("got instance metadata with empty name")
}
if inst.Role == role.Unknown {
return errors.New("got instance metadata with unknown role")
}
var ips []net.IP
if inst.VPCIP != "" {
ips = append(ips, net.ParseIP(inst.VPCIP))
}
2022-06-21 11:59:12 -04:00
c.nodeName = inst.Name
c.role = inst.Role
c.validIPs = ips
2022-06-21 11:59:12 -04:00
return nil
}
func (c *JoinClient) updateDiskPassphrase(passphrase string) error {
free, err := c.disk.Open()
if err != nil {
2022-06-21 11:59:12 -04:00
return fmt.Errorf("opening disk: %w", err)
}
defer free()
2022-06-21 11:59:12 -04:00
return c.disk.UpdatePassphrase(passphrase)
}
2022-06-28 12:33:27 -04:00
func (c *JoinClient) getDiskUUID() (string, error) {
free, err := c.disk.Open()
if err != nil {
2022-06-21 11:59:12 -04:00
return "", fmt.Errorf("opening disk: %w", err)
}
defer free()
2022-06-21 11:59:12 -04:00
return c.disk.UUID()
}
func (c *JoinClient) getControlPlaneIPs(ctx context.Context) ([]string, error) {
2022-06-21 11:59:12 -04:00
instances, err := c.metadataAPI.List(ctx)
if err != nil {
2024-02-08 09:20:01 -05:00
c.log.With(slog.Any("error", err)).Error("Failed to list instances from metadata API")
2022-06-21 11:59:12 -04:00
return nil, fmt.Errorf("listing instances from metadata API: %w", err)
}
ips := []string{}
for _, instance := range instances {
if instance.Role == role.ControlPlane && instance.VPCIP != "" {
ips = append(ips, instance.VPCIP)
2022-06-21 11:59:12 -04:00
}
}
2024-02-08 09:20:01 -05:00
c.log.With(slog.Any("IPs", ips)).Info("Received control plane endpoints")
2022-06-21 11:59:12 -04:00
return ips, nil
}
func (c *JoinClient) writeControlPlaneFiles(files []*joinproto.ControlPlaneCertOrKey) error {
for _, cert := range files {
if err := c.fileHandler.Write(
filepath.Join(kubeconstants.KubernetesDir, kubeconstants.DefaultCertificateDir, cert.Name),
cert.Data,
file.OptMkdirAll,
); err != nil {
return fmt.Errorf("writing control plane files: %w", err)
}
}
return nil
}
2022-06-21 11:59:12 -04:00
func (c *JoinClient) timeoutCtx() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), c.timeout)
}
type grpcDialer interface {
Dial(ctx context.Context, target string) (*grpc.ClientConn, error)
}
2022-06-28 12:33:27 -04:00
// ClusterJoiner has the ability to join a new node to an existing cluster.
2022-06-21 11:59:12 -04:00
type ClusterJoiner interface {
2022-06-28 12:33:27 -04:00
// JoinCluster joins a new node to an existing cluster.
2022-06-21 11:59:12 -04:00
JoinCluster(
ctx context.Context,
args *kubeadm.BootstrapTokenDiscovery,
peerRole role.Role,
k8sComponents components.Components,
2024-02-08 09:20:01 -05:00
log *slog.Logger,
2022-06-21 11:59:12 -04:00
) error
}
2022-06-28 12:33:27 -04:00
// MetadataAPI provides information about the instances.
type MetadataAPI interface {
2022-06-21 11:59:12 -04:00
// List retrieves all instances belonging to the current constellation.
List(ctx context.Context) ([]metadata.InstanceMetadata, error)
// Self retrieves the current instance.
Self(ctx context.Context) (metadata.InstanceMetadata, error)
// GetLoadBalancerEndpoint retrieves the load balancer endpoint.
GetLoadBalancerEndpoint(ctx context.Context) (host, port string, err error)
2022-06-21 11:59:12 -04:00
}
2022-06-28 12:33:27 -04:00
type encryptedDisk interface {
2022-06-21 11:59:12 -04:00
// Open prepares the underlying device for disk operations.
Open() (func(), error)
2022-06-21 11:59:12 -04:00
// UUID gets the device's UUID.
UUID() (string, error)
// UpdatePassphrase switches the initial random passphrase of the encrypted disk to a permanent passphrase.
UpdatePassphrase(passphrase string) error
}
type cleaner interface {
Clean()
}
type locker interface {
// TryLockOnce tries to lock the node. If the node is already locked, it
// returns false. If the node is unlocked, it locks it and returns true.
TryLockOnce(clusterID []byte) (bool, error)
}