AB#2261 Add loadbalancer for control-plane recovery (#151)

Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
Daniel Weiße 2022-09-14 13:25:42 +02:00 committed by GitHub
parent 273d89e002
commit e367e1a68b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 418 additions and 243 deletions

View file

@ -33,6 +33,7 @@ import (
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/grpc/dialer"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/edgelesssys/constellation/internal/role"
tpmClient "github.com/google/go-tpm-tools/client"
"github.com/google/go-tpm/tpm2"
"github.com/spf13/afero"
@ -128,8 +129,13 @@ func main() {
log.Named("rejoinClient"),
)
// set up recovery server
recoveryServer := recoveryserver.New(issuer, log.Named("recoveryServer"))
// set up recovery server if control-plane node
var recoveryServer setup.RecoveryServer
if self.Role == role.ControlPlane {
recoveryServer = recoveryserver.New(issuer, log.Named("recoveryServer"))
} else {
recoveryServer = recoveryserver.NewStub(log.Named("recoveryServer"))
}
err = setupManger.PrepareExistingDisk(setup.NewNodeRecoverer(recoveryServer, rejoinClient))
} else {

View file

@ -14,6 +14,7 @@ import (
"github.com/edgelesssys/constellation/disk-mapper/recoverproto"
"github.com/edgelesssys/constellation/internal/atls"
"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
"github.com/edgelesssys/constellation/internal/grpc/grpclog"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -89,8 +90,9 @@ func (s *RecoveryServer) Serve(ctx context.Context, listener net.Listener, diskU
func (s *RecoveryServer) Recover(stream recoverproto.API_RecoverServer) error {
s.mux.Lock()
defer s.mux.Unlock()
log := s.log.With(zap.String("peer", grpclog.PeerAddrFromContext(stream.Context())))
s.log.Infof("Received recover call")
log.Infof("Received recover call")
msg, err := stream.Recv()
if err != nil {
@ -99,35 +101,53 @@ func (s *RecoveryServer) Recover(stream recoverproto.API_RecoverServer) error {
measurementSecret, ok := msg.GetRequest().(*recoverproto.RecoverMessage_MeasurementSecret)
if !ok {
s.log.Errorf("Received invalid first message: not a measurement secret")
log.Errorf("Received invalid first message: not a measurement secret")
return status.Error(codes.InvalidArgument, "first message is not a measurement secret")
}
if err := stream.Send(&recoverproto.RecoverResponse{DiskUuid: s.diskUUID}); err != nil {
s.log.With(zap.Error(err)).Errorf("Failed to send disk UUID")
log.With(zap.Error(err)).Errorf("Failed to send disk UUID")
return status.Error(codes.Internal, "failed to send response")
}
msg, err = stream.Recv()
if err != nil {
s.log.With(zap.Error(err)).Errorf("Failed to receive disk key")
log.With(zap.Error(err)).Errorf("Failed to receive disk key")
return status.Error(codes.Internal, "failed to receive message")
}
stateDiskKey, ok := msg.GetRequest().(*recoverproto.RecoverMessage_StateDiskKey)
if !ok {
s.log.Errorf("Received invalid second message: not a state disk key")
log.Errorf("Received invalid second message: not a state disk key")
return status.Error(codes.InvalidArgument, "second message is not a state disk key")
}
s.stateDiskKey = stateDiskKey.StateDiskKey
s.measurementSecret = measurementSecret.MeasurementSecret
s.log.Infof("Received state disk key and measurement secret, shutting down server")
log.Infof("Received state disk key and measurement secret, shutting down server")
go s.grpcServer.GracefulStop()
return nil
}
// stubServer implements the RecoveryServer interface but does not actually start a server.
// Its Serve method only logs a message and waits for the context to end.
type stubServer struct {
// log is the logger Serve writes its "skipping recovery server" notice to.
log *logger.Logger
}
// NewStub creates a RecoveryServer stand-in that never starts a real server.
// Worker nodes do not require manual recovery, so they are handed this stub
// instead of a listening recovery server.
func NewStub(log *logger.Logger) *stubServer {
	stub := stubServer{log: log}
	return &stub
}
// Serve logs that the recovery server is skipped and then blocks until ctx is done.
// It never yields key material: both returned byte slices are nil, and the
// returned error is ctx.Err() (so it is non-nil once ctx is canceled or expires).
// The listener and the string argument are ignored.
func (s *stubServer) Serve(ctx context.Context, _ net.Listener, _ string) ([]byte, []byte, error) {
s.log.Infof("Running as worker node, skipping recovery server")
// Nothing to serve; just wait for the caller to end the context.
<-ctx.Done()
return nil, nil, ctx.Err()
}
type server interface {
Serve(net.Listener) error
GracefulStop()

View file

@ -160,21 +160,21 @@ func (s *SetupManager) saveConfiguration(passphrase []byte) error {
return s.config.Generate(stateDiskMappedName, s.diskPath, filepath.Join(keyPath, keyFile), cryptsetupOptions)
}
type recoveryServer interface {
// RecoveryServer is the server side of node recovery as consumed by the node recoverer.
// Serve is given a context, a listener and a string and returns a key and a
// secret, or an error.
// NOTE(review): the string is presumably the state disk UUID, and key/secret
// presumably the state disk key and measurement secret — confirm against the implementations.
type RecoveryServer interface {
Serve(context.Context, net.Listener, string) (key, secret []byte, err error)
}
type rejoinClient interface {
// RejoinClient is the client side of node recovery as consumed by the node recoverer.
// Start is given a context and a string and returns a key and a secret; it has
// no error return.
// NOTE(review): the string is presumably the state disk UUID — confirm against the implementation.
type RejoinClient interface {
Start(context.Context, string) (key, secret []byte)
}
type nodeRecoverer struct {
recoveryServer recoveryServer
rejoinClient rejoinClient
recoveryServer RecoveryServer
rejoinClient RejoinClient
}
// NewNodeRecoverer initializes a new nodeRecoverer.
func NewNodeRecoverer(recoveryServer recoveryServer, rejoinClient rejoinClient) *nodeRecoverer {
func NewNodeRecoverer(recoveryServer RecoveryServer, rejoinClient RejoinClient) *nodeRecoverer {
return &nodeRecoverer{
recoveryServer: recoveryServer,
rejoinClient: rejoinClient,

View file

@ -26,6 +26,7 @@ type RecoverMessage struct {
unknownFields protoimpl.UnknownFields
// Types that are assignable to Request:
//
// *RecoverMessage_StateDiskKey
// *RecoverMessage_MeasurementSecret
Request isRecoverMessage_Request `protobuf_oneof:"request"`