constellation/coordinator/internal/joinclient/client.go

377 lines
9.3 KiB
Go
Raw Normal View History

2022-06-21 11:59:12 -04:00
package joinclient
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/edgelesssys/constellation/activation/activationproto"
"github.com/edgelesssys/constellation/coordinator/diskencryption"
"github.com/edgelesssys/constellation/coordinator/nodestate"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
"github.com/spf13/afero"
"go.uber.org/zap"
"google.golang.org/grpc"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/utils/clock"
)
// Default timing values that New installs on a JoinClient.
const (
// interval is the default tick period of the retry ticker created in Start.
interval = 30 * time.Second
// timeout is the default deadline for contexts created by timeoutCtx.
timeout = 30 * time.Second
)
// JoinClient is a client for self-activation of node.
//
// After Start is called, a background routine reads the state disk UUID,
// fetches the node's own metadata, and then repeatedly tries to activate the
// node at the coordinator endpoints until it succeeds or Stop is called.
type JoinClient struct {
// nodeLock is try-locked in startNodeAndJoin before the node is initialized.
nodeLock *sync.Mutex
// diskUUID is set by the Start routine from GetDiskUUID and sent in activation requests.
diskUUID string
// nodeName and role are filled in by getNodeMetadata from the metadata API.
nodeName string
role role.Role
// disk is used to read the disk UUID and to update the disk passphrase.
disk EncryptedDisk
// fileHandler is used to persist the node state to a file.
fileHandler file.Handler
// timeout is the deadline applied to contexts created by timeoutCtx.
timeout time.Duration
// interval is the tick period between retries in the Start routine.
interval time.Duration
// clock creates the retry ticker.
clock clock.WithTicker
// dialer opens the gRPC connection to a join service endpoint.
dialer grpcDialer
// joiner performs the Kubernetes cluster join.
joiner ClusterJoiner
// metadataAPI provides the node's own metadata and the list of instances.
metadataAPI metadataAPI
log *zap.Logger
// mux is held by Start and Stop while they access stopC and stopDone.
mux sync.Mutex
// stopC carries the stop request from Stop to the routine; nil while no routine is registered.
stopC chan struct{}
// stopDone is signalled by the routine when it exits.
stopDone chan struct{}
}
// New creates a new JoinClient.
//
// nodeLock is the lock the client tries to acquire before initializing the
// node; dial opens connections to the join service; joiner performs the
// Kubernetes join; meta provides instance metadata; log is the parent logger.
// The disk, file handler, clock, timeout and interval are set to their
// production defaults.
func New(nodeLock *sync.Mutex, dial grpcDialer, joiner ClusterJoiner, meta metadataAPI, log *zap.Logger) *JoinClient {
	return &JoinClient{
		// Store the lock: startNodeAndJoin calls c.nodeLock.TryLock() and
		// would dereference a nil pointer if this field were left unset.
		nodeLock:    nodeLock,
		disk:        diskencryption.New(),
		fileHandler: file.NewHandler(afero.NewOsFs()),
		timeout:     timeout,
		interval:    interval,
		clock:       clock.RealClock{},
		dialer:      dial,
		joiner:      joiner,
		metadataAPI: meta,
		log:         log.Named("selfactivation-client"),
	}
}
// Start launches the client's background routine. The routine reads the disk
// UUID, polls the metadata API until it learns the node's own name and role,
// and then keeps trying to activate the node until it succeeds or Stop is
// called.
// Calling Start while a routine is already registered is a no-op.
func (c *JoinClient) Start() {
	c.mux.Lock()
	defer c.mux.Unlock()

	if c.stopC != nil { // daemon already running
		return
	}

	c.log.Info("Starting")
	// Both channels are buffered so that neither Stop nor the exiting
	// routine blocks when the other side is not currently waiting.
	c.stopC = make(chan struct{}, 1)
	c.stopDone = make(chan struct{}, 1)

	ticker := c.clock.NewTicker(c.interval)
	go func() {
		defer ticker.Stop()
		defer func() { c.stopDone <- struct{}{} }()

		// pause announces the wait and blocks until the next tick or a stop
		// request, reporting whether the routine was asked to stop.
		pause := func() (stopped bool) {
			c.log.Info("Sleeping", zap.Duration("interval", c.interval))
			select {
			case <-c.stopC:
				return true
			case <-ticker.C():
				return false
			}
		}

		uuid, uuidErr := c.GetDiskUUID()
		if uuidErr != nil {
			c.log.Error("Failed to get disk UUID", zap.Error(uuidErr))
			c.log.Error("Stopping self-activation client")
			return
		}
		c.diskUUID = uuid

		// Phase 1: learn our own name and role.
		for err := c.getNodeMetadata(); err != nil; err = c.getNodeMetadata() {
			c.log.Info("Failed to retrieve instance metadata", zap.Error(err))
			if pause() {
				return
			}
		}
		c.log.Info("Received own instance metadata", zap.String("role", c.role.String()), zap.String("name", c.nodeName))

		// Phase 2: activate at any reachable coordinator.
		for err := c.tryJoinAtAvailableServices(); err != nil; err = c.tryJoinAtAvailableServices() {
			c.log.Info("Activation failed for all available endpoints", zap.Error(err))
			if pause() {
				return
			}
		}
		c.log.Info("Activated successfully. SelfActivationClient shut down.")
	}()
}
// Stop asks the background routine to terminate and waits until it has
// signalled completion. It returns immediately if no routine is registered.
func (c *JoinClient) Stop() {
	c.mux.Lock()
	defer c.mux.Unlock()

	if running := c.stopC != nil; !running { // daemon not running
		return
	}

	c.log.Info("Stopping")

	// Request the stop, then wait for the routine's exit signal.
	c.stopC <- struct{}{}
	<-c.stopDone

	// Clear the channels so that a later Start can launch a fresh routine.
	c.stopC, c.stopDone = nil, nil

	c.log.Info("Stopped")
}
// tryJoinAtAvailableServices looks up the coordinator IPs and attempts to join
// at each of them in order, stopping at the first success. If every attempt
// fails, the error of the last attempt is returned.
func (c *JoinClient) tryJoinAtAvailableServices() error {
	coordinatorIPs, err := c.getCoordinatorIPs()
	switch {
	case err != nil:
		return err
	case len(coordinatorIPs) == 0:
		return errors.New("no coordinator IPs found")
	}

	port := strconv.Itoa(constants.ActivationServiceNodePort)

	var lastErr error
	for _, ip := range coordinatorIPs {
		if lastErr = c.join(net.JoinHostPort(ip, port)); lastErr == nil {
			return nil
		}
	}
	return lastErr
}
// join dials the join service at serviceEndpoint and activates the node
// according to the role stored on the client. The dial, the activation call
// and the subsequent cluster join all share one context bounded by c.timeout.
//
// It returns an error if the endpoint cannot be dialed, if the activation
// fails, or if the client's role is neither Node nor Coordinator.
func (c *JoinClient) join(serviceEndpoint string) error {
	ctx, cancel := c.timeoutCtx()
	defer cancel()

	conn, err := c.dialer.Dial(ctx, serviceEndpoint)
	if err != nil {
		c.log.Info("join service unreachable", zap.String("endpoint", serviceEndpoint), zap.Error(err))
		// Wrap with %w (as everywhere else in this file) so callers can
		// inspect the cause with errors.Is / errors.As.
		return fmt.Errorf("dialing join service endpoint: %w", err)
	}
	defer conn.Close()

	protoClient := activationproto.NewAPIClient(conn)

	switch c.role {
	case role.Node:
		return c.joinAsWorkerNode(ctx, protoClient)
	case role.Coordinator:
		return c.joinAsControlPlaneNode(ctx, protoClient)
	default:
		// Report the role we actually hold, not a hard-coded Unknown.
		return fmt.Errorf("cannot activate as %s", c.role)
	}
}
// joinAsWorkerNode requests worker-node activation from the join service and,
// on success, hands the returned secrets and join arguments to
// startNodeAndJoin.
func (c *JoinClient) joinAsWorkerNode(ctx context.Context, client activationproto.APIClient) error {
	activation, activateErr := client.ActivateWorkerNode(ctx, &activationproto.ActivateWorkerNodeRequest{
		DiskUuid: c.diskUUID,
		NodeName: c.nodeName,
	})
	if activateErr != nil {
		c.log.Info("Failed to activate as worker node", zap.Error(activateErr))
		return fmt.Errorf("activating worker node: %w", activateErr)
	}
	c.log.Info("Activation at AaaS succeeded")

	// Worker nodes pass an empty certificate key.
	const noCertKey = ""

	return c.startNodeAndJoin(
		ctx,
		activation.StateDiskKey,
		activation.OwnerId,
		activation.ClusterId,
		activation.KubeletKey,
		activation.KubeletCert,
		activation.ApiServerEndpoint,
		activation.Token,
		activation.DiscoveryTokenCaCertHash,
		noCertKey,
	)
}
// joinAsControlPlaneNode requests control-plane activation from the join
// service and, on success, hands the returned secrets and join arguments
// (including the certificate key) to startNodeAndJoin.
func (c *JoinClient) joinAsControlPlaneNode(ctx context.Context, client activationproto.APIClient) error {
	activation, activateErr := client.ActivateControlPlaneNode(ctx, &activationproto.ActivateControlPlaneNodeRequest{
		DiskUuid: c.diskUUID,
		NodeName: c.nodeName,
	})
	if activateErr != nil {
		c.log.Info("Failed to activate as control plane node", zap.Error(activateErr))
		return fmt.Errorf("activating control plane node: %w", activateErr)
	}
	c.log.Info("Activation at AaaS succeeded")

	return c.startNodeAndJoin(
		ctx,
		activation.StateDiskKey,
		activation.OwnerId,
		activation.ClusterId,
		activation.KubeletKey,
		activation.KubeletCert,
		activation.ApiServerEndpoint,
		activation.Token,
		activation.DiscoveryTokenCaCertHash,
		activation.CertificateKey,
	)
}
// startNodeAndJoin initializes the node with the material received from the
// join service and joins the Kubernetes cluster. In order, it:
//  1. tries to acquire the node lock and fails if it is already held,
//  2. updates the state disk passphrase to diskKey,
//  3. persists the node state (role, owner ID, cluster ID) to a file,
//  4. calls the cluster joiner with the bootstrap token discovery data.
//
// NOTE(review): the node lock is not released on any path, including errors,
// so a failed attempt makes every later attempt fail at step 1. This looks
// deliberate (the node is committed once initialization has begun) — confirm.
func (c *JoinClient) startNodeAndJoin(ctx context.Context, diskKey, ownerID, clusterID, kubeletKey, kubeletCert []byte, endpoint, token,
	discoveryCACertHash, certKey string,
) error {
	if !c.nodeLock.TryLock() {
		return errors.New("node is already being initialized")
	}

	if err := c.updateDiskPassphrase(string(diskKey)); err != nil {
		return fmt.Errorf("updating disk passphrase: %w", err)
	}

	nodeState := nodestate.NodeState{
		Role:      c.role,
		OwnerID:   ownerID,
		ClusterID: clusterID,
	}
	if err := nodeState.ToFile(c.fileHandler); err != nil {
		return fmt.Errorf("persisting node state: %w", err)
	}

	discovery := &kubeadm.BootstrapTokenDiscovery{
		APIServerEndpoint: endpoint,
		Token:             token,
		CACertHashes:      []string{discoveryCACertHash},
	}
	joinErr := c.joiner.JoinCluster(ctx, discovery, certKey, c.role)
	if joinErr != nil {
		return fmt.Errorf("joining Kubernetes cluster: %w", joinErr)
	}
	return nil
}
// getNodeMetadata asks the metadata API for this instance's metadata and
// stores its name and role on the client. Metadata with an empty name or an
// unknown role is rejected and leaves the client unchanged.
func (c *JoinClient) getNodeMetadata() error {
	ctx, cancel := c.timeoutCtx()
	defer cancel()

	c.log.Info("Requesting node metadata from metadata API")
	self, selfErr := c.metadataAPI.Self(ctx)
	if selfErr != nil {
		return selfErr
	}
	c.log.Info("Received node metadata", zap.Any("instance", self))

	// Validate before storing anything.
	switch {
	case self.Name == "":
		return errors.New("got instance metadata with empty name")
	case self.Role == role.Unknown:
		return errors.New("got instance metadata with unknown role")
	}

	c.nodeName, c.role = self.Name, self.Role
	return nil
}
// updateDiskPassphrase opens the encrypted disk, sets its passphrase to the
// given value and closes the disk again. The result of Close is not checked.
func (c *JoinClient) updateDiskPassphrase(passphrase string) error {
	openErr := c.disk.Open()
	if openErr != nil {
		return fmt.Errorf("opening disk: %w", openErr)
	}
	defer c.disk.Close()

	updateErr := c.disk.UpdatePassphrase(passphrase)
	return updateErr
}
// GetDiskUUID opens the encrypted disk, reads its UUID and closes the disk
// again. The result of Close is not checked.
func (c *JoinClient) GetDiskUUID() (string, error) {
	openErr := c.disk.Open()
	if openErr != nil {
		return "", fmt.Errorf("opening disk: %w", openErr)
	}
	defer c.disk.Close()

	uuid, uuidErr := c.disk.UUID()
	return uuid, uuidErr
}
// getCoordinatorIPs lists all instances via the metadata API and collects the
// private IPs of those whose role is Coordinator. The returned slice is
// non-nil but may be empty.
func (c *JoinClient) getCoordinatorIPs() ([]string, error) {
	ctx, cancel := c.timeoutCtx()
	defer cancel()

	all, listErr := c.metadataAPI.List(ctx)
	if listErr != nil {
		c.log.Error("Failed to list instances from metadata API", zap.Error(listErr))
		return nil, fmt.Errorf("listing instances from metadata API: %w", listErr)
	}

	coordinatorIPs := make([]string, 0)
	for i := range all {
		if all[i].Role != role.Coordinator {
			continue
		}
		coordinatorIPs = append(coordinatorIPs, all[i].PrivateIPs...)
	}

	c.log.Info("Received Coordinator endpoints", zap.Strings("IPs", coordinatorIPs))
	return coordinatorIPs, nil
}
// timeoutCtx returns a fresh background-derived context that expires after
// c.timeout, together with its cancel function.
func (c *JoinClient) timeoutCtx() (context.Context, context.CancelFunc) {
	parent := context.Background()
	return context.WithTimeout(parent, c.timeout)
}
// grpcDialer opens gRPC client connections; JoinClient uses it to reach the
// join service endpoints.
type grpcDialer interface {
// Dial connects to target and returns the resulting client connection.
Dial(ctx context.Context, target string) (*grpc.ClientConn, error)
}
// ClusterJoiner has the ability to join a node to an existing Kubernetes cluster.
type ClusterJoiner interface {
// JoinCluster is called by JoinClient after activation, with the bootstrap
// token discovery data, the certificate key (empty for worker nodes) and
// the role of the joining node.
JoinCluster(
ctx context.Context,
args *kubeadm.BootstrapTokenDiscovery,
certKey string,
peerRole role.Role,
) error
}
// metadataAPI is the instance metadata source JoinClient uses to discover its
// own name and role and to find the coordinator instances.
type metadataAPI interface {
// List retrieves all instances belonging to the current constellation.
List(ctx context.Context) ([]metadata.InstanceMetadata, error)
// Self retrieves the current instance.
Self(ctx context.Context) (metadata.InstanceMetadata, error)
}
// EncryptedDisk manages the encrypted state disk.
// JoinClient calls Open before UUID or UpdatePassphrase and defers Close
// afterwards.
type EncryptedDisk interface {
// Open prepares the underlying device for disk operations.
Open() error
// Close closes the underlying device.
Close() error
// UUID gets the device's UUID.
UUID() (string, error)
// UpdatePassphrase switches the initial random passphrase of the encrypted disk to a permanent passphrase.
UpdatePassphrase(passphrase string) error
}