constellation/bootstrapper/internal/initserver/initserver.go

217 lines
6.4 KiB
Go
Raw Normal View History

2022-06-21 11:59:12 -04:00
package initserver
import (
"context"
"fmt"
"net"
"strings"
"github.com/edgelesssys/constellation/bootstrapper/initproto"
"github.com/edgelesssys/constellation/bootstrapper/internal/diskencryption"
"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes"
"github.com/edgelesssys/constellation/bootstrapper/internal/nodelock"
"github.com/edgelesssys/constellation/bootstrapper/nodestate"
"github.com/edgelesssys/constellation/bootstrapper/role"
"github.com/edgelesssys/constellation/bootstrapper/util"
2022-07-05 08:14:11 -04:00
"github.com/edgelesssys/constellation/internal/atls"
2022-06-21 11:59:12 -04:00
attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types"
2022-06-28 12:33:27 -04:00
"github.com/edgelesssys/constellation/internal/constants"
2022-06-21 11:59:12 -04:00
"github.com/edgelesssys/constellation/internal/file"
2022-07-05 08:14:11 -04:00
"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
2022-06-21 11:59:12 -04:00
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
2022-06-28 12:33:27 -04:00
// Server is the initialization server, which is started on each node.
// The server handles initialization calls from the CLI and initializes the
// Kubernetes cluster.
2022-06-21 11:59:12 -04:00
type Server struct {
2022-06-28 12:33:27 -04:00
nodeLock *nodelock.Lock
initializer ClusterInitializer
disk encryptedDisk
2022-06-21 11:59:12 -04:00
fileHandler file.Handler
2022-06-28 12:33:27 -04:00
grpcServer serveStopper
2022-06-21 11:59:12 -04:00
logger *zap.Logger
initproto.UnimplementedAPIServer
}
2022-06-28 12:33:27 -04:00
// New creates a new initialization server.
2022-07-05 08:14:11 -04:00
func New(lock *nodelock.Lock, kube ClusterInitializer, issuer atls.Issuer, fh file.Handler, logger *zap.Logger) *Server {
2022-06-21 11:59:12 -04:00
logger = logger.Named("initServer")
server := &Server{
2022-06-28 12:33:27 -04:00
nodeLock: lock,
disk: diskencryption.New(),
initializer: kube,
2022-07-05 08:14:11 -04:00
fileHandler: fh,
2022-06-28 12:33:27 -04:00
logger: logger,
2022-06-21 11:59:12 -04:00
}
2022-07-05 08:14:11 -04:00
creds := atlscredentials.New(issuer, nil)
2022-06-21 11:59:12 -04:00
grpcLogger := logger.Named("gRPC")
grpcServer := grpc.NewServer(
2022-07-05 08:14:11 -04:00
grpc.Creds(creds),
2022-06-21 11:59:12 -04:00
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(grpcLogger),
)),
)
initproto.RegisterAPIServer(grpcServer, server)
server.grpcServer = grpcServer
return server
}
func (s *Server) Serve(ip, port string) error {
lis, err := net.Listen("tcp", net.JoinHostPort(ip, port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
return s.grpcServer.Serve(lis)
}
2022-06-28 12:33:27 -04:00
// Init initializes the cluster.
2022-06-21 11:59:12 -04:00
func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initproto.InitResponse, error) {
2022-07-05 08:14:11 -04:00
s.logger.Info("Init called")
2022-06-28 12:33:27 -04:00
if ok := s.nodeLock.TryLockOnce(); !ok {
// The join client seems to already have a connection to an
// existing join service. At this point, any further call to
// init does not make sense, so we just stop.
//
// The server stops itself after the current call is done.
2022-06-21 11:59:12 -04:00
go s.grpcServer.GracefulStop()
2022-07-05 08:14:11 -04:00
s.logger.Info("node is already in a join process")
2022-06-21 11:59:12 -04:00
return nil, status.Error(codes.FailedPrecondition, "node is already being activated")
}
id, err := s.deriveAttestationID(req.MasterSecret)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s", err)
}
if err := s.setupDisk(req.MasterSecret); err != nil {
return nil, status.Errorf(codes.Internal, "setting up disk: %s", err)
}
state := nodestate.NodeState{
Role: role.ControlPlane,
2022-06-21 11:59:12 -04:00
OwnerID: id.Owner,
ClusterID: id.Cluster,
}
if err := state.ToFile(s.fileHandler); err != nil {
return nil, status.Errorf(codes.Internal, "persisting node state: %s", err)
}
2022-06-28 12:33:27 -04:00
kubeconfig, err := s.initializer.InitCluster(ctx,
2022-06-21 11:59:12 -04:00
req.AutoscalingNodeGroups,
req.CloudServiceAccountUri,
req.KubernetesVersion,
id,
kubernetes.KMSConfig{
MasterSecret: req.MasterSecret,
KMSURI: req.KmsUri,
StorageURI: req.StorageUri,
KeyEncryptionKeyID: req.KeyEncryptionKeyId,
UseExistingKEK: req.UseExistingKek,
},
sshProtoKeysToMap(req.SshUserKeys),
s.logger,
2022-06-21 11:59:12 -04:00
)
if err != nil {
return nil, status.Errorf(codes.Internal, "initializing cluster: %s", err)
}
2022-07-05 08:14:11 -04:00
s.logger.Info("Init succeeded")
2022-06-21 11:59:12 -04:00
return &initproto.InitResponse{
Kubeconfig: kubeconfig,
OwnerId: id.Owner,
ClusterId: id.Cluster,
}, nil
}
func (s *Server) setupDisk(masterSecret []byte) error {
if err := s.disk.Open(); err != nil {
return fmt.Errorf("opening encrypted disk: %w", err)
}
defer s.disk.Close()
uuid, err := s.disk.UUID()
if err != nil {
return fmt.Errorf("retrieving uuid of disk: %w", err)
}
uuid = strings.ToLower(uuid)
// TODO: Choose a way to salt the key derivation
diskKey, err := util.DeriveKey(masterSecret, []byte("Constellation"), []byte("key"+uuid), 32)
if err != nil {
return err
}
return s.disk.UpdatePassphrase(string(diskKey))
}
func (s *Server) deriveAttestationID(masterSecret []byte) (attestationtypes.ID, error) {
2022-06-28 12:33:27 -04:00
clusterID, err := util.GenerateRandomBytes(constants.RNGLengthDefault)
2022-06-21 11:59:12 -04:00
if err != nil {
return attestationtypes.ID{}, err
}
// TODO: Choose a way to salt the key derivation
2022-06-28 12:33:27 -04:00
ownerID, err := util.DeriveKey(masterSecret, []byte("Constellation"), []byte("id"), constants.RNGLengthDefault)
2022-06-21 11:59:12 -04:00
if err != nil {
return attestationtypes.ID{}, err
}
return attestationtypes.ID{Owner: ownerID, Cluster: clusterID}, nil
}
func sshProtoKeysToMap(keys []*initproto.SSHUserKey) map[string]string {
keyMap := make(map[string]string)
for _, key := range keys {
keyMap[key.Username] = key.PublicKey
}
return keyMap
}
2022-06-28 12:33:27 -04:00
// ClusterInitializer has the ability to initialize a cluster.
2022-06-21 11:59:12 -04:00
type ClusterInitializer interface {
2022-06-28 12:33:27 -04:00
// InitCluster initializes a new Kubernetes cluster.
2022-06-21 11:59:12 -04:00
InitCluster(
ctx context.Context,
autoscalingNodeGroups []string,
cloudServiceAccountURI string,
2022-06-28 12:33:27 -04:00
k8sVersion string,
2022-06-21 11:59:12 -04:00
id attestationtypes.ID,
2022-06-28 12:33:27 -04:00
kmsConfig kubernetes.KMSConfig,
2022-06-21 11:59:12 -04:00
sshUserKeys map[string]string,
logger *zap.Logger,
2022-06-21 11:59:12 -04:00
) ([]byte, error)
}
2022-06-28 12:33:27 -04:00
type encryptedDisk interface {
2022-06-21 11:59:12 -04:00
// Open prepares the underlying device for disk operations.
Open() error
// Close closes the underlying device.
Close() error
// UUID gets the device's UUID.
UUID() (string, error)
// UpdatePassphrase switches the initial random passphrase of the encrypted disk to a permanent passphrase.
UpdatePassphrase(passphrase string) error
}
2022-06-28 12:33:27 -04:00
type serveStopper interface {
// Serve starts the server.
Serve(lis net.Listener) error
// GracefulStop stops the server and blocks until all requests are done.
GracefulStop()
}