270 lines
7.6 KiB
Go
Raw Normal View History

/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
2022-06-21 17:59:12 +02:00
package initserver
import (
"context"
"fmt"
"net"
"strings"
2022-08-01 16:51:34 +02:00
"time"
2022-06-21 17:59:12 +02:00
2022-09-21 13:47:57 +02:00
"github.com/edgelesssys/constellation/v2/bootstrapper/initproto"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/diskencryption"
"github.com/edgelesssys/constellation/v2/internal/atls"
"github.com/edgelesssys/constellation/v2/internal/attestation"
"github.com/edgelesssys/constellation/v2/internal/cloud/vmtype"
"github.com/edgelesssys/constellation/v2/internal/crypto"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/grpc/atlscredentials"
"github.com/edgelesssys/constellation/v2/internal/grpc/grpclog"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/nodestate"
"github.com/edgelesssys/constellation/v2/internal/role"
"github.com/edgelesssys/constellation/v2/internal/versions"
2022-06-21 17:59:12 +02:00
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
2022-08-01 16:51:34 +02:00
"google.golang.org/grpc/keepalive"
2022-06-21 17:59:12 +02:00
"google.golang.org/grpc/status"
)
2022-06-28 18:33:27 +02:00
// Server is the initialization server, which is started on each node.
// The server handles initialization calls from the CLI and initializes the
// Kubernetes cluster.
2022-06-21 17:59:12 +02:00
type Server struct {
nodeLock locker
initializer ClusterInitializer
disk encryptedDisk
fileHandler file.Handler
grpcServer serveStopper
cleaner cleaner
issuerWrapper IssuerWrapper
2022-06-21 17:59:12 +02:00
log *logger.Logger
2022-06-21 17:59:12 +02:00
initproto.UnimplementedAPIServer
}
2022-06-28 18:33:27 +02:00
// New creates a new initialization server.
func New(lock locker, kube ClusterInitializer, issuerWrapper IssuerWrapper, fh file.Handler, log *logger.Logger) *Server {
log = log.Named("initServer")
2022-06-21 17:59:12 +02:00
server := &Server{
nodeLock: lock,
disk: diskencryption.New(),
initializer: kube,
fileHandler: fh,
issuerWrapper: issuerWrapper,
log: log,
2022-06-21 17:59:12 +02:00
}
grpcServer := grpc.NewServer(
grpc.Creds(atlscredentials.New(issuerWrapper, nil)),
2022-08-01 16:51:34 +02:00
grpc.KeepaliveParams(keepalive.ServerParameters{Time: 15 * time.Second}),
log.Named("gRPC").GetServerUnaryInterceptor(),
2022-06-21 17:59:12 +02:00
)
initproto.RegisterAPIServer(grpcServer, server)
server.grpcServer = grpcServer
return server
}
// Serve starts the initialization server.
func (s *Server) Serve(ip, port string, cleaner cleaner) error {
s.cleaner = cleaner
2022-06-21 17:59:12 +02:00
lis, err := net.Listen("tcp", net.JoinHostPort(ip, port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
2022-08-01 16:51:34 +02:00
s.log.Infof("Starting")
return s.grpcServer.Serve(lis)
2022-06-21 17:59:12 +02:00
}
2022-06-28 18:33:27 +02:00
// Init initializes the cluster.
2022-06-21 17:59:12 +02:00
func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initproto.InitResponse, error) {
defer s.cleaner.Clean()
log := s.log.With(zap.String("peer", grpclog.PeerAddrFromContext(ctx)))
log.Infof("Init called")
2022-07-05 14:14:11 +02:00
// generate values for cluster attestation
measurementSalt, clusterID, err := deriveMeasurementValues(req.MasterSecret, req.Salt)
if err != nil {
return nil, status.Errorf(codes.Internal, "deriving measurement values: %s", err)
}
nodeLockAcquired, err := s.nodeLock.TryLockOnce(clusterID)
if err != nil {
return nil, status.Errorf(codes.Internal, "locking node: %s", err)
}
if !nodeLockAcquired {
2022-06-28 18:33:27 +02:00
// The join client seems to already have a connection to an
// existing join service. At this point, any further call to
// init does not make sense, so we just stop.
//
// The server stops itself after the current call is done.
log.Warnf("Node is already in a join process")
2022-06-21 17:59:12 +02:00
return nil, status.Error(codes.FailedPrecondition, "node is already being activated")
}
if err := s.setupDisk(req.MasterSecret, req.Salt); err != nil {
2022-06-21 17:59:12 +02:00
return nil, status.Errorf(codes.Internal, "setting up disk: %s", err)
}
state := nodestate.NodeState{
Role: role.ControlPlane,
MeasurementSalt: measurementSalt,
2022-06-21 17:59:12 +02:00
}
if err := state.ToFile(s.fileHandler); err != nil {
return nil, status.Errorf(codes.Internal, "persisting node state: %s", err)
}
2022-06-28 18:33:27 +02:00
kubeconfig, err := s.initializer.InitCluster(ctx,
2022-06-21 17:59:12 +02:00
req.CloudServiceAccountUri,
req.KubernetesVersion,
measurementSalt,
req.EnforcedPcrs,
req.EnforceIdkeydigest,
s.issuerWrapper.IDKeyDigest(),
s.issuerWrapper.VMType() == vmtype.AzureCVM,
2022-08-12 10:20:19 +02:00
req.HelmDeployments,
req.ConformanceMode,
versions.NewComponentVersionsFromProto(req.KubernetesComponents),
s.log,
2022-06-21 17:59:12 +02:00
)
if err != nil {
return nil, status.Errorf(codes.Internal, "initializing cluster: %s", err)
}
log.Infof("Init succeeded")
2022-06-21 17:59:12 +02:00
return &initproto.InitResponse{
Kubeconfig: kubeconfig,
ClusterId: clusterID,
2022-06-21 17:59:12 +02:00
}, nil
}
// Stop stops the initialization server gracefully.
func (s *Server) Stop() {
s.log.Infof("Stopping")
s.grpcServer.GracefulStop()
s.log.Infof("Stopped")
}
func (s *Server) setupDisk(masterSecret, salt []byte) error {
2022-06-21 17:59:12 +02:00
if err := s.disk.Open(); err != nil {
return fmt.Errorf("opening encrypted disk: %w", err)
}
defer s.disk.Close()
uuid, err := s.disk.UUID()
if err != nil {
return fmt.Errorf("retrieving uuid of disk: %w", err)
}
uuid = strings.ToLower(uuid)
diskKey, err := crypto.DeriveKey(masterSecret, salt, []byte(crypto.HKDFInfoPrefix+uuid), crypto.DerivedKeyLengthDefault)
2022-06-21 17:59:12 +02:00
if err != nil {
return err
}
return s.disk.UpdatePassphrase(string(diskKey))
}
// IssuerWrapper adds VM type context to an issuer to distinguish between
// confidential and trusted launch VMs.
type IssuerWrapper struct {
atls.Issuer
vmType vmtype.VMType
idkeydigest []byte
}
// NewIssuerWrapper creates a new issuer with VM type context.
func NewIssuerWrapper(issuer atls.Issuer, vmType vmtype.VMType, idkeydigest []byte) IssuerWrapper {
return IssuerWrapper{
Issuer: issuer,
vmType: vmType,
idkeydigest: idkeydigest,
}
}
// VMType returns the VM type.
func (i *IssuerWrapper) VMType() vmtype.VMType {
return i.vmType
}
// IDKeyDigest returns the ID key digest.
func (i *IssuerWrapper) IDKeyDigest() []byte {
return i.idkeydigest
}
func deriveMeasurementValues(masterSecret, hkdfSalt []byte) (salt, clusterID []byte, err error) {
salt, err = crypto.GenerateRandomBytes(crypto.RNGLengthDefault)
if err != nil {
return nil, nil, err
}
secret, err := attestation.DeriveMeasurementSecret(masterSecret, hkdfSalt)
if err != nil {
return nil, nil, err
}
clusterID, err = attestation.DeriveClusterID(secret, salt)
if err != nil {
return nil, nil, err
}
return salt, clusterID, nil
}
2022-06-28 18:33:27 +02:00
// ClusterInitializer has the ability to initialize a cluster.
2022-06-21 17:59:12 +02:00
type ClusterInitializer interface {
2022-06-28 18:33:27 +02:00
// InitCluster initializes a new Kubernetes cluster.
2022-06-21 17:59:12 +02:00
InitCluster(
ctx context.Context,
cloudServiceAccountURI string,
2022-06-28 18:33:27 +02:00
k8sVersion string,
measurementSalt []byte,
enforcedPcrs []uint32,
enforceIDKeyDigest bool,
idKeyDigest []byte,
azureCVM bool,
2022-08-12 10:20:19 +02:00
helmDeployments []byte,
conformanceMode bool,
kubernetesComponents versions.ComponentVersions,
log *logger.Logger,
2022-06-21 17:59:12 +02:00
) ([]byte, error)
}
2022-06-28 18:33:27 +02:00
type encryptedDisk interface {
2022-06-21 17:59:12 +02:00
// Open prepares the underlying device for disk operations.
Open() error
// Close closes the underlying device.
Close() error
// UUID gets the device's UUID.
UUID() (string, error)
// UpdatePassphrase switches the initial random passphrase of the encrypted disk to a permanent passphrase.
UpdatePassphrase(passphrase string) error
}
2022-06-28 18:33:27 +02:00
type serveStopper interface {
// Serve starts the server.
Serve(lis net.Listener) error
// GracefulStop stops the server and blocks until all requests are done.
GracefulStop()
}
type locker interface {
// TryLockOnce tries to lock the node. If the node is already locked, it
// returns false. If the node is unlocked, it locks it and returns true.
TryLockOnce(clusterID []byte) (bool, error)
}
type cleaner interface {
Clean()
}