Refactor coordinator run function

This commit is contained in:
katexochen 2022-06-08 13:36:29 +02:00 committed by Paul Meyer
parent 691ab84326
commit 4d50e4c657
2 changed files with 23 additions and 18 deletions

View File

@ -30,42 +30,38 @@ import (
var version = "0.0.0" var version = "0.0.0"
func run(issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPublicIPAddr func() (string, error), dialer *grpcutil.Dialer, fileHandler file.Handler, func run(issuer core.QuoteIssuer, vpn core.VPN, tpm vtpm.TPMOpenFunc, getPublicIPAddr func() (string, error), dialer *grpcutil.Dialer, fileHandler file.Handler,
kube core.Cluster, metadata core.ProviderMetadata, encryptedDisk core.EncryptedDisk, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, zapLoggerCore *zap.Logger, kube core.Cluster, metadata core.ProviderMetadata, disk core.EncryptedDisk, etcdEndpoint string, etcdTLS bool, bindIP, bindPort string, logger *zap.Logger,
cloudLogger logging.CloudLogger, fs afero.Fs, cloudLogger logging.CloudLogger, fs afero.Fs,
) { ) {
defer zapLoggerCore.Sync() defer logger.Sync()
zapLoggerCore.Info("starting coordinator", zap.String("version", version)) logger.Info("starting coordinator", zap.String("version", version))
defer cloudLogger.Close() defer cloudLogger.Close()
cloudLogger.Disclose("Coordinator started running...") cloudLogger.Disclose("Coordinator started running...")
tlsConfig, err := atls.CreateAttestationServerTLSConfig(issuer, nil) tlsConfig, err := atls.CreateAttestationServerTLSConfig(issuer, nil)
if err != nil { if err != nil {
zapLoggerCore.Fatal("failed to create server TLS config", zap.Error(err)) logger.Fatal("failed to create server TLS config", zap.Error(err))
} }
etcdStoreFactory := &store.EtcdStoreFactory{ etcdStoreFactory := store.NewEtcdStoreFactory(etcdEndpoint, etcdTLS, logger)
Endpoint: etcdEndpoint,
ForceTLS: etcdTLS,
Logger: zapLoggerCore.WithOptions(zap.IncreaseLevel(zap.WarnLevel)).Named("etcd"),
}
linuxUserManager := user.NewLinuxUserManager(fs) linuxUserManager := user.NewLinuxUserManager(fs)
core, err := core.NewCore(vpn, kube, metadata, encryptedDisk, zapLoggerCore, openTPM, etcdStoreFactory, fileHandler, linuxUserManager) core, err := core.NewCore(vpn, kube, metadata, disk, logger, tpm, etcdStoreFactory, fileHandler, linuxUserManager)
if err != nil { if err != nil {
zapLoggerCore.Fatal("failed to create core", zap.Error(err)) logger.Fatal("failed to create core", zap.Error(err))
} }
vapiServer := &vpnAPIServer{logger: zapLoggerCore.Named("vpnapi"), core: core} vapiServer := &vpnAPIServer{logger: logger.Named("vpnapi"), core: core}
zapLoggerPubapi := zapLoggerCore.Named("pubapi") loggerPubAPI := logger.Named("pubapi")
papi := pubapi.New(zapLoggerPubapi, cloudLogger, core, dialer, vapiServer, getPublicIPAddr, pubapi.GetRecoveryPeerFromContext) papi := pubapi.New(loggerPubAPI, cloudLogger, core, dialer, vapiServer, getPublicIPAddr, pubapi.GetRecoveryPeerFromContext)
// initialize state machine and wait for re-joining of the VPN (if applicable) // initialize state machine and wait for re-joining of the VPN (if applicable)
nodeActivated, err := core.Initialize(context.TODO(), dialer, papi) nodeActivated, err := core.Initialize(context.TODO(), dialer, papi)
if err != nil { if err != nil {
zapLoggerCore.Fatal("failed to initialize core", zap.Error(err)) logger.Fatal("failed to initialize core", zap.Error(err))
} }
zapLoggergRPC := zapLoggerPubapi.Named("gRPC") zapLoggergRPC := loggerPubAPI.Named("gRPC")
grpcServer := grpc.NewServer( grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.Creds(credentials.NewTLS(tlsConfig)),
@ -97,7 +93,7 @@ func run(issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPub
}() }()
if !nodeActivated { if !nodeActivated {
zapLoggerStartupJoin := zapLoggerCore.Named("startup-join") zapLoggerStartupJoin := logger.Named("startup-join")
if err := tryJoinClusterOnStartup(getPublicIPAddr, metadata, zapLoggerStartupJoin); err != nil { if err := tryJoinClusterOnStartup(getPublicIPAddr, metadata, zapLoggerStartupJoin); err != nil {
zapLoggerStartupJoin.Info("joining existing cluster on startup failed. Waiting for connection.", zap.Error(err)) zapLoggerStartupJoin.Info("joining existing cluster on startup failed. Waiting for connection.", zap.Error(err))
} }

View File

@ -265,6 +265,15 @@ type EtcdStoreFactory struct {
Logger *zap.Logger Logger *zap.Logger
} }
// NewEtcdStoreFactory creates a new EtcdStoreFactory.
func NewEtcdStoreFactory(endpoint string, forceTLS bool, logger *zap.Logger) *EtcdStoreFactory {
return &EtcdStoreFactory{
Endpoint: endpoint,
ForceTLS: forceTLS,
Logger: logger.WithOptions(zap.IncreaseLevel(zap.WarnLevel)).Named("etcd"),
}
}
// New creates a new EtcdStore. // New creates a new EtcdStore.
func (f *EtcdStoreFactory) New() (Store, error) { func (f *EtcdStoreFactory) New() (Store, error) {
return NewEtcdStore(f.Endpoint, f.ForceTLS, f.Logger) return NewEtcdStore(f.Endpoint, f.ForceTLS, f.Logger)