Mirror of https://github.com/edgelesssys/constellation.git
Refactor bootstrapper logging
Signed-off-by: Daniel Weiße <dw@edgeless.systems>
Parent: 50188d1d93
Commit: edf424d415
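This change replaces the bare *zap.Logger values used across the bootstrapper with the repository's internal/logger wrapper. The pattern repeated in every hunk below is the same: structured fields are attached with With(...) and the message is emitted through a printf-style level method (Infof, Warnf, Errorf, Fatalf, Debugf). A minimal before/after sketch of that pattern; both lines are taken verbatim from the diff, and the wrapper's construction is shown in the new main() below:

// Before: *zap.Logger, fields passed alongside the message.
logger.Fatal("failed to restart kubelet", zap.Error(err))

// After: internal/logger wrapper, fields attached via With, printf-style message.
log.With(zap.Error(err)).Fatalf("Failed to restart kubelet")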
@@ -5,7 +5,6 @@ import (
"encoding/json"
"flag"
"io"
"log"
"os"
"strings"

@@ -24,8 +23,8 @@ import (
"github.com/edgelesssys/constellation/internal/attestation/simulator"
"github.com/edgelesssys/constellation/internal/attestation/vtpm"
"github.com/edgelesssys/constellation/internal/file"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/edgelesssys/constellation/internal/oid"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/spf13/afero"
"go.uber.org/zap"
)
@@ -38,6 +37,18 @@ const (
)

func main() {
gRPCDebug := flag.Bool("debug", false, "Enable gRPC debug logging")
verbosity := flag.Int("v", 0, logger.CmdLineVerbosityDescription)
flag.Parse()
log := logger.New(logger.JSONLog, logger.VerbosityFromInt(*verbosity)).Named("bootstrapper")
defer log.Sync()

if *gRPCDebug {
log.Named("gRPC").ReplaceGRPCLogger()
} else {
log.Named("gRPC").WithIncreasedLevel(zap.WarnLevel).ReplaceGRPCLogger()
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -45,23 +56,6 @@ func main() {
var clusterInitJoiner clusterInitJoiner
var metadataAPI joinclient.MetadataAPI
var cloudLogger logging.CloudLogger
cfg := zap.NewDevelopmentConfig()

logLevelUser := flag.Bool("debug", false, "enables gRPC debug output")
flag.Parse()
cfg.Level.SetLevel(zap.DebugLevel)

logger, err := cfg.Build()
if err != nil {
log.Fatal(err)
}
if *logLevelUser {
grpc_zap.ReplaceGrpcLoggerV2(logger.Named("gRPC"))
} else {
grpc_zap.ReplaceGrpcLoggerV2(logger.WithOptions(zap.IncreaseLevel(zap.WarnLevel)).Named("gRPC"))
}
logger = logger.Named("bootstrapper")

var issuer atls.Issuer
var openTPM vtpm.TPMOpenFunc
var fs afero.Fs
@@ -72,29 +66,28 @@ func main() {
case "gcp":
pcrs, err := vtpm.GetSelectedPCRs(vtpm.OpenVTPM, vtpm.GCPPCRSelection)
if err != nil {
// TODO: Is there a reason we use log. instead of zapLogger?
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to get selected PCRs")
}

issuer = gcp.NewIssuer()

gcpClient, err := gcpcloud.NewClient(ctx)
if err != nil {
log.Fatalf("failed to create GCP client: %v\n", err)
log.With(zap.Error(err)).Fatalf("Failed to create GCP metadata client")
}
metadata := gcpcloud.New(gcpClient)
descr, err := metadata.Self(ctx)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to get instance metadata")
}
cloudLogger, err = gcpcloud.NewLogger(ctx, descr.ProviderID, "constellation-boot-log")
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to set up cloud logger")
}
metadataAPI = metadata
pcrsJSON, err := json.Marshal(pcrs)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to marshal PCRs")
}
clusterInitJoiner = kubernetes.New(
"gcp", k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New(), &gcpcloud.CloudControllerManager{},
@@ -107,23 +100,23 @@ func main() {
case "azure":
pcrs, err := vtpm.GetSelectedPCRs(vtpm.OpenVTPM, vtpm.AzurePCRSelection)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to get selected PCRs")
}

issuer = azure.NewIssuer()

metadata, err := azurecloud.NewMetadata(ctx)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to create Azure metadata client")
}
cloudLogger, err = azurecloud.NewLogger(ctx, metadata)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to set up cloud logger")
}
metadataAPI = metadata
pcrsJSON, err := json.Marshal(pcrs)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to marshal PCRs")
}
clusterInitJoiner = kubernetes.New(
"azure", k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New(), azurecloud.NewCloudControllerManager(metadata),
@@ -137,7 +130,7 @@ func main() {
case "qemu":
pcrs, err := vtpm.GetSelectedPCRs(vtpm.OpenVTPM, vtpm.QEMUPCRSelection)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to get selected PCRs")
}

issuer = qemu.NewIssuer()
@@ -146,7 +139,7 @@ func main() {
metadata := &qemucloud.Metadata{}
pcrsJSON, err := json.Marshal(pcrs)
if err != nil {
log.Fatal(err)
log.With(zap.Error(err)).Fatalf("Failed to marshal PCRs")
}
clusterInitJoiner = kubernetes.New(
"qemu", k8sapi.NewKubernetesUtil(), &k8sapi.CoreOSConfiguration{}, kubectl.New(), &qemucloud.CloudControllerManager{},
@@ -173,5 +166,5 @@ func main() {

fileHandler := file.NewHandler(fs)

run(issuer, openTPM, fileHandler, clusterInitJoiner, metadataAPI, bindIP, bindPort, logger, cloudLogger)
run(issuer, openTPM, fileHandler, clusterInitJoiner, metadataAPI, bindIP, bindPort, log, cloudLogger)
}
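The new main() sets logging up front: a -v flag selects verbosity, --debug routes full gRPC output through the logger, and the rest of the process uses the named wrapper. A condensed sketch assembled from the added lines above (all identifiers are the ones used in this diff; the exact signatures of the internal/logger package are assumed from their use here):

package main

import (
	"flag"

	"github.com/edgelesssys/constellation/internal/logger"
	"go.uber.org/zap"
)

func main() {
	gRPCDebug := flag.Bool("debug", false, "Enable gRPC debug logging")
	verbosity := flag.Int("v", 0, logger.CmdLineVerbosityDescription)
	flag.Parse()

	// Root logger for the whole process: JSON output, verbosity taken from the -v flag.
	log := logger.New(logger.JSONLog, logger.VerbosityFromInt(*verbosity)).Named("bootstrapper")
	defer log.Sync()

	// Route gRPC's internal logging through the same logger,
	// raised to warn level unless --debug is set.
	if *gRPCDebug {
		log.Named("gRPC").ReplaceGRPCLogger()
	} else {
		log.Named("gRPC").WithIncreasedLevel(zap.WarnLevel).ReplaceGRPCLogger()
	}

	log.Infof("bootstrapper starting")
}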
@@ -11,6 +11,7 @@ import (
"github.com/edgelesssys/constellation/internal/attestation/vtpm"
"github.com/edgelesssys/constellation/internal/file"
"github.com/edgelesssys/constellation/internal/grpc/dialer"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/edgelesssys/constellation/internal/oid"
"go.uber.org/zap"
)
@@ -19,32 +20,31 @@ var version = "0.0.0"

func run(issuer quoteIssuer, tpm vtpm.TPMOpenFunc, fileHandler file.Handler,
kube clusterInitJoiner, metadata joinclient.MetadataAPI,
bindIP, bindPort string, logger *zap.Logger,
bindIP, bindPort string, log *logger.Logger,
cloudLogger logging.CloudLogger,
) {
defer logger.Sync()
defer cloudLogger.Close()

logger.Info("starting bootstrapper", zap.String("version", version))
log.With(zap.String("version", version)).Infof("Starting bootstrapper")
cloudLogger.Disclose("bootstrapper started running...")

nodeBootstrapped, err := vtpm.IsNodeBootstrapped(tpm)
if err != nil {
logger.Fatal("failed to check for previous bootstrapping using vTPM", zap.Error(err))
log.With(zap.Error(err)).Fatalf("Failed to check if node was previously bootstrapped")
}

if nodeBootstrapped {
if err := kube.StartKubelet(); err != nil {
logger.Fatal("failed to restart kubelet", zap.Error(err))
log.With(zap.Error(err)).Fatalf("Failed to restart kubelet")
}
return
}

nodeLock := nodelock.New(tpm)
initServer := initserver.New(nodeLock, kube, issuer, fileHandler, logger)
initServer := initserver.New(nodeLock, kube, issuer, fileHandler, log)

dialer := dialer.New(issuer, nil, &net.Dialer{})
joinClient := joinclient.New(nodeLock, dialer, kube, metadata, logger)
joinClient := joinclient.New(nodeLock, dialer, kube, metadata, log)

cleaner := clean.New().With(initServer).With(joinClient)
go cleaner.Start()
@@ -53,10 +53,10 @@ func run(issuer quoteIssuer, tpm vtpm.TPMOpenFunc, fileHandler file.Handler,
joinClient.Start(cleaner)

if err := initServer.Serve(bindIP, bindPort, cleaner); err != nil {
logger.Fatal("Failed to serve init server", zap.Error(err))
log.With(zap.Error(err)).Fatalf("Failed to serve init server")
}

logger.Info("bootstrapper done")
log.Infof("bootstrapper done")
cloudLogger.Disclose("bootstrapper done")
}
@@ -7,7 +7,7 @@ import (
"github.com/edgelesssys/constellation/bootstrapper/role"
attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types"
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"go.uber.org/zap"
"github.com/edgelesssys/constellation/internal/logger"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)

@@ -15,13 +15,13 @@ import (
type clusterFake struct{}

// InitCluster fakes bootstrapping a new cluster with the current node being the master, returning the arguments required to join the cluster.
func (c *clusterFake) InitCluster(context.Context, []string, string, string, attestationtypes.ID, kubernetes.KMSConfig, map[string]string, *zap.Logger,
func (c *clusterFake) InitCluster(context.Context, []string, string, string, attestationtypes.ID, kubernetes.KMSConfig, map[string]string, *logger.Logger,
) ([]byte, error) {
return []byte{}, nil
}

// JoinCluster will fake joining the current node to an existing cluster.
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, *zap.Logger) error {
func (c *clusterFake) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, *logger.Logger) error {
return nil
}
@@ -17,9 +17,8 @@ import (
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/edgelesssys/constellation/internal/grpc/grpclog"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -37,30 +36,25 @@ type Server struct {
grpcServer serveStopper
cleaner cleaner

logger *zap.Logger
log *logger.Logger

initproto.UnimplementedAPIServer
}

// New creates a new initialization server.
func New(lock locker, kube ClusterInitializer, issuer atls.Issuer, fh file.Handler, logger *zap.Logger) *Server {
logger = logger.Named("initServer")
func New(lock locker, kube ClusterInitializer, issuer atls.Issuer, fh file.Handler, log *logger.Logger) *Server {
log = log.Named("initServer")
server := &Server{
nodeLock: lock,
disk: diskencryption.New(),
initializer: kube,
fileHandler: fh,
logger: logger,
log: log,
}

creds := atlscredentials.New(issuer, nil)
grpcLogger := logger.Named("gRPC")
grpcServer := grpc.NewServer(
grpc.Creds(creds),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(grpcLogger),
)),
grpc.Creds(atlscredentials.New(issuer, nil)),
log.Named("gRPC").GetServerUnaryInterceptor(),
)
initproto.RegisterAPIServer(grpcServer, server)

@@ -82,7 +76,8 @@ func (s *Server) Serve(ip, port string, cleaner cleaner) error {
// Init initializes the cluster.
func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initproto.InitResponse, error) {
defer s.cleaner.Clean()
s.logger.Info("Init called")
log := s.log.With(zap.String("peer", grpclog.PeerAddrFromContext(ctx)))
log.Infof("Init called")

id, err := s.deriveAttestationID(req.MasterSecret)
if err != nil {
@@ -99,7 +94,7 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
// init does not make sense, so we just stop.
//
// The server stops itself after the current call is done.
s.logger.Info("node is already in a join process")
log.Warnf("Node is already in a join process")
return nil, status.Error(codes.FailedPrecondition, "node is already being activated")
}

@@ -129,13 +124,13 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
UseExistingKEK: req.UseExistingKek,
},
sshProtoKeysToMap(req.SshUserKeys),
s.logger,
s.log,
)
if err != nil {
return nil, status.Errorf(codes.Internal, "initializing cluster: %s", err)
}

s.logger.Info("Init succeeded")
log.Infof("Init succeeded")
return &initproto.InitResponse{
Kubeconfig: kubeconfig,
OwnerId: id.Owner,
@@ -203,7 +198,7 @@ type ClusterInitializer interface {
id attestationtypes.ID,
kmsConfig kubernetes.KMSConfig,
sshUserKeys map[string]string,
logger *zap.Logger,
log *logger.Logger,
) ([]byte, error)
}
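The init server previously assembled its request logging from grpc_middleware, grpc_ctxtags and grpc_zap by hand; with the wrapper this collapses into a single option. A short fragment of the new server construction, copied from the hunk above (GetServerUnaryInterceptor is assumed to return a grpc.ServerOption, as its use here implies):

grpcServer := grpc.NewServer(
	grpc.Creds(atlscredentials.New(issuer, nil)),
	// Logs every unary RPC under the "gRPC" sub-logger, replacing the old interceptor chain.
	log.Named("gRPC").GetServerUnaryInterceptor(),
)
initproto.RegisterAPIServer(grpcServer, server)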
@@ -12,12 +12,11 @@ import (
"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes"
attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types"
"github.com/edgelesssys/constellation/internal/file"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

func TestMain(m *testing.M) {
@@ -28,9 +27,9 @@ func TestNew(t *testing.T) {
assert := assert.New(t)

fh := file.NewHandler(afero.NewMemMapFs())
server := New(newFakeLock(), &stubClusterInitializer{}, nil, fh, zap.NewNop())
server := New(newFakeLock(), &stubClusterInitializer{}, nil, fh, logger.NewTest(t))
assert.NotNil(server)
assert.NotNil(server.logger)
assert.NotNil(server.log)
assert.NotNil(server.nodeLock)
assert.NotNil(server.initializer)
assert.NotNil(server.grpcServer)
@@ -122,7 +121,7 @@ func TestInit(t *testing.T) {
initializer: tc.initializer,
disk: tc.disk,
fileHandler: tc.fileHandler,
logger: zaptest.NewLogger(t),
log: logger.NewTest(t),
grpcServer: serveStopper,
cleaner: &fakeCleaner{serveStopper: serveStopper},
}
@@ -220,7 +219,7 @@ type stubClusterInitializer struct {
initClusterErr error
}

func (i *stubClusterInitializer) InitCluster(context.Context, []string, string, string, attestationtypes.ID, kubernetes.KMSConfig, map[string]string, *zap.Logger,
func (i *stubClusterInitializer) InitCluster(context.Context, []string, string, string, attestationtypes.ID, kubernetes.KMSConfig, map[string]string, *logger.Logger,
) ([]byte, error) {
return i.initClusterKubeconfig, i.initClusterErr
}
@@ -16,6 +16,7 @@ import (
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/edgelesssys/constellation/joinservice/joinproto"
"github.com/spf13/afero"
"go.uber.org/zap"
@@ -50,7 +51,7 @@ type JoinClient struct {
joiner ClusterJoiner
metadataAPI MetadataAPI

log *zap.Logger
log *logger.Logger

mux sync.Mutex
stopC chan struct{}
@@ -58,7 +59,7 @@ }
}

// New creates a new JoinClient.
func New(lock locker, dial grpcDialer, joiner ClusterJoiner, meta MetadataAPI, log *zap.Logger) *JoinClient {
func New(lock locker, dial grpcDialer, joiner ClusterJoiner, meta MetadataAPI, log *logger.Logger) *JoinClient {
return &JoinClient{
nodeLock: lock,
disk: diskencryption.New(),
@@ -87,7 +88,7 @@ func (c *JoinClient) Start(cleaner cleaner) {
return
}

c.log.Info("Starting")
c.log.Infof("Starting")
c.stopC = make(chan struct{}, 1)
c.stopDone = make(chan struct{}, 1)

@@ -95,12 +96,12 @@ func (c *JoinClient) Start(cleaner cleaner) {
go func() {
defer ticker.Stop()
defer func() { c.stopDone <- struct{}{} }()
defer c.log.Info("Client stopped")
defer c.log.Infof("Client stopped")
defer cleaner.Clean()

diskUUID, err := c.getDiskUUID()
if err != nil {
c.log.Error("Failed to get disk UUID", zap.Error(err))
c.log.With(zap.Error(err)).Errorf("Failed to get disk UUID")
return
}
c.diskUUID = diskUUID
@@ -108,12 +109,12 @@ func (c *JoinClient) Start(cleaner cleaner) {
for {
err := c.getNodeMetadata()
if err == nil {
c.log.Info("Received own instance metadata", zap.String("role", c.role.String()), zap.String("name", c.nodeName))
c.log.With(zap.String("role", c.role.String()), zap.String("name", c.nodeName)).Infof("Received own instance metadata")
break
}
c.log.Info("Failed to retrieve instance metadata", zap.Error(err))
c.log.With(zap.Error(err)).Errorf("Failed to retrieve instance metadata")

c.log.Info("Sleeping", zap.Duration("interval", c.interval))
c.log.With(zap.Duration("interval", c.interval)).Infof("Sleeping")
select {
case <-c.stopC:
return
@@ -124,15 +125,15 @@ func (c *JoinClient) Start(cleaner cleaner) {
for {
err := c.tryJoinWithAvailableServices()
if err == nil {
c.log.Info("Joined successfully. Client is shut down.")
c.log.Infof("Joined successfully. Client is shutting down")
return
} else if isUnrecoverable(err) {
c.log.Error("Unrecoverable error occurred", zap.Error(err))
c.log.With(zap.Error(err)).Errorf("Unrecoverable error occurred")
return
}
c.log.Info("Join failed for all available endpoints", zap.Error(err))
c.log.With(zap.Error(err)).Warnf("Join failed for all available endpoints")

c.log.Info("Sleeping", zap.Duration("interval", c.interval))
c.log.With(zap.Duration("interval", c.interval)).Infof("Sleeping")
select {
case <-c.stopC:
return
@@ -151,7 +152,7 @@ func (c *JoinClient) Stop() {
return
}

c.log.Info("Stopping")
c.log.Infof("Stopping")

c.stopC <- struct{}{}
<-c.stopDone
@@ -159,7 +160,7 @@ func (c *JoinClient) Stop() {
c.stopC = nil
c.stopDone = nil

c.log.Info("Stopped")
c.log.Infof("Stopped")
}

func (c *JoinClient) tryJoinWithAvailableServices() error {
@@ -191,7 +192,7 @@ func (c *JoinClient) join(serviceEndpoint string) error {

conn, err := c.dialer.Dial(ctx, serviceEndpoint)
if err != nil {
c.log.Info("Join service unreachable", zap.String("endpoint", serviceEndpoint), zap.Error(err))
c.log.With(zap.String("endpoint", serviceEndpoint), zap.Error(err)).Errorf("Join service unreachable")
return fmt.Errorf("dialing join service endpoint: %w", err)
}
defer conn.Close()
@@ -204,7 +205,7 @@ func (c *JoinClient) join(serviceEndpoint string) error {
}
ticket, err := protoClient.IssueJoinTicket(ctx, req)
if err != nil {
c.log.Info("Issuing join ticket failed", zap.String("endpoint", serviceEndpoint), zap.Error(err))
c.log.With(zap.String("endpoint", serviceEndpoint), zap.Error(err)).Errorf("Issuing join ticket failed")
return fmt.Errorf("issuing join ticket: %w", err)
}

@@ -224,7 +225,7 @@ func (c *JoinClient) startNodeAndJoin(ticket *joinproto.IssueJoinTicketResponse)

nodeLockAcquired, err := c.nodeLock.TryLockOnce(ticket.OwnerId, ticket.ClusterId)
if err != nil {
c.log.Info("Acquiring node lock failed", zap.Error(err))
c.log.With(zap.Error(err)).Errorf("Acquiring node lock failed")
return fmt.Errorf("acquiring node lock: %w", err)
}
if !nodeLockAcquired {
@@ -269,12 +270,12 @@ func (c *JoinClient) getNodeMetadata() error {
ctx, cancel := c.timeoutCtx()
defer cancel()

c.log.Debug("Requesting node metadata from metadata API")
c.log.Debugf("Requesting node metadata from metadata API")
inst, err := c.metadataAPI.Self(ctx)
if err != nil {
return err
}
c.log.Debug("Received node metadata", zap.Any("instance", inst))
c.log.With(zap.Any("instance", inst)).Debugf("Received node metadata")

if inst.Name == "" {
return errors.New("got instance metadata with empty name")
@@ -312,7 +313,7 @@ func (c *JoinClient) getControlPlaneIPs() ([]string, error) {

instances, err := c.metadataAPI.List(ctx)
if err != nil {
c.log.Error("Failed to list instances from metadata API", zap.Error(err))
c.log.With(zap.Error(err)).Errorf("Failed to list instances from metadata API")
return nil, fmt.Errorf("listing instances from metadata API: %w", err)
}

@@ -323,7 +324,7 @@ func (c *JoinClient) getControlPlaneIPs() ([]string, error) {
}
}

c.log.Info("Received control plane endpoints", zap.Strings("IPs", ips))
c.log.With(zap.Strings("IPs", ips)).Infof("Received control plane endpoints")
return ips, nil
}

@@ -363,7 +364,7 @@ type ClusterJoiner interface {
ctx context.Context,
args *kubeadm.BootstrapTokenDiscovery,
peerRole role.Role,
logger *zap.Logger,
log *logger.Logger,
) error
}
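In the join client the refactor also adjusts levels: recoverable problems that the loop retries are now warnings or errors instead of plain info lines, and metadata chatter moves to debug. A condensed view of the retry loop's logging, using only calls that appear in the hunks above:

for {
	err := c.tryJoinWithAvailableServices()
	if err == nil {
		c.log.Infof("Joined successfully. Client is shutting down")
		return
	} else if isUnrecoverable(err) {
		c.log.With(zap.Error(err)).Errorf("Unrecoverable error occurred")
		return
	}
	// Recoverable failure: warn, then sleep for the configured interval before retrying.
	c.log.With(zap.Error(err)).Warnf("Join failed for all available endpoints")
	c.log.With(zap.Duration("interval", c.interval)).Infof("Sleeping")
	// ... wait on the ticker or the stop channel, as in the hunk above ...
}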
@@ -16,13 +16,12 @@ import (
"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
"github.com/edgelesssys/constellation/internal/grpc/dialer"
"github.com/edgelesssys/constellation/internal/grpc/testdialer"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/edgelesssys/constellation/joinservice/joinproto"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
testclock "k8s.io/utils/clock/testing"
@@ -213,7 +212,7 @@ func TestClient(t *testing.T) {
fileHandler: fileHandler,
metadataAPI: metadataAPI,
clock: clock,
log: zaptest.NewLogger(t),
log: logger.NewTest(t),
}

serverCreds := atlscredentials.New(nil, nil)
@@ -268,7 +267,7 @@ func TestClientConcurrentStartStop(t *testing.T) {
fileHandler: file.NewHandler(afero.NewMemMapFs()),
metadataAPI: &stubRepeaterMetadataAPI{},
clock: testclock.NewFakeClock(time.Now()),
log: zap.NewNop(),
log: logger.NewTest(t),
}

wg := sync.WaitGroup{}
@@ -386,7 +385,7 @@ type stubClusterJoiner struct {
joinClusterErr error
}

func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, *zap.Logger) error {
func (j *stubClusterJoiner) JoinCluster(context.Context, *kubeadm.BootstrapTokenDiscovery, role.Role, *logger.Logger) error {
j.joinClusterCalled = true
return j.joinClusterErr
}
@@ -12,6 +12,7 @@ import (
"time"

"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes/k8sapi/resources"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
)

@@ -57,7 +58,7 @@ func (k *KubernetesUtil) InstallComponents(ctx context.Context, version string)
return enableSystemdUnit(ctx, kubeletServiceEtcPath)
}

func (k *KubernetesUtil) InitCluster(ctx context.Context, initConfig []byte, logger *zap.Logger) error {
func (k *KubernetesUtil) InitCluster(ctx context.Context, initConfig []byte, log *logger.Logger) error {
// TODO: audit policy should be user input
auditPolicy, err := resources.NewDefaultAuditPolicy().Marshal()
if err != nil {
@@ -85,7 +86,7 @@ func (k *KubernetesUtil) InitCluster(ctx context.Context, initConfig []byte, log
}
return fmt.Errorf("kubeadm init: %w", err)
}
logger.Info("kubeadm init succeeded", zap.String("output", string(out)))
log.With(zap.String("output", string(out))).Infof("kubeadm init succeeded")
return nil
}

@@ -261,7 +262,7 @@ func (k *KubernetesUtil) SetupVerificationService(kubectl Client, verificationSe
}

// JoinCluster joins existing Kubernetes cluster using kubeadm join.
func (k *KubernetesUtil) JoinCluster(ctx context.Context, joinConfig []byte, logger *zap.Logger) error {
func (k *KubernetesUtil) JoinCluster(ctx context.Context, joinConfig []byte, log *logger.Logger) error {
// TODO: audit policy should be user input
auditPolicy, err := resources.NewDefaultAuditPolicy().Marshal()
if err != nil {
@@ -290,7 +291,7 @@ func (k *KubernetesUtil) JoinCluster(ctx context.Context, joinConfig []byte, log
}
return fmt.Errorf("kubeadm join: %w", err)
}
logger.Info("kubeadm join succeeded", zap.String("output", string(out)))
log.With(zap.String("output", string(out))).Infof("kubeadm join succeeded")

return nil
}
@@ -5,13 +5,13 @@ import (

"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes/k8sapi/resources"
"go.uber.org/zap"
"github.com/edgelesssys/constellation/internal/logger"
)

type clusterUtil interface {
InstallComponents(ctx context.Context, version string) error
InitCluster(ctx context.Context, initConfig []byte, logger *zap.Logger) error
JoinCluster(ctx context.Context, joinConfig []byte, logger *zap.Logger) error
InitCluster(ctx context.Context, initConfig []byte, log *logger.Logger) error
JoinCluster(ctx context.Context, joinConfig []byte, log *logger.Logger) error
SetupPodNetwork(context.Context, k8sapi.SetupPodNetworkInput) error
SetupAccessManager(kubectl k8sapi.Client, sshUsers resources.Marshaler) error
SetupAutoscaling(kubectl k8sapi.Client, clusterAutoscalerConfiguration resources.Marshaler, secrets resources.Marshaler) error
@@ -14,6 +14,7 @@ import (
"github.com/edgelesssys/constellation/bootstrapper/util"
attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types"
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/spf13/afero"
"go.uber.org/zap"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
@@ -75,9 +76,10 @@ type KMSConfig struct {
// InitCluster initializes a new Kubernetes cluster and applies pod network provider.
func (k *KubeWrapper) InitCluster(
ctx context.Context, autoscalingNodeGroups []string, cloudServiceAccountURI, k8sVersion string,
id attestationtypes.ID, kmsConfig KMSConfig, sshUsers map[string]string, logger *zap.Logger,
id attestationtypes.ID, kmsConfig KMSConfig, sshUsers map[string]string, log *logger.Logger,
) ([]byte, error) {
// TODO: k8s version should be user input
log.With(zap.String("version", k8sVersion)).Infof("Installing Kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, k8sVersion); err != nil {
return nil, err
}
@@ -97,6 +99,7 @@ func (k *KubeWrapper) InitCluster(

// Step 1: retrieve cloud metadata for Kubernetes configuration
if k.providerMetadata.Supported() {
log.Infof("Retrieving node metadata")
instance, err = k.providerMetadata.Self(ctx)
if err != nil {
return nil, fmt.Errorf("retrieving own instance metadata failed: %w", err)
@@ -129,6 +132,13 @@ func (k *KubeWrapper) InitCluster(
}
}
}
log.With(
zap.String("nodeName", nodeName),
zap.String("providerID", providerID),
zap.String("nodeIP", nodeIP),
zap.String("controlPlaneEndpointIP", controlPlaneEndpointIP),
zap.String("podCIDR", subnetworkPodCIDR),
).Infof("Setting information for node")

// Step 2: configure kubeadm init config
initConfig := k.configProvider.InitConfiguration(k.cloudControllerManager.Supported())
@@ -141,7 +151,8 @@ func (k *KubeWrapper) InitCluster(
if err != nil {
return nil, fmt.Errorf("encoding kubeadm init configuration as YAML: %w", err)
}
if err := k.clusterUtil.InitCluster(ctx, initConfigYAML, logger); err != nil {
log.Infof("Initializing Kubernetes cluster")
if err := k.clusterUtil.InitCluster(ctx, initConfigYAML, log); err != nil {
return nil, fmt.Errorf("kubeadm init: %w", err)
}
kubeConfig, err := k.GetKubeconfig()
@@ -151,7 +162,7 @@ func (k *KubeWrapper) InitCluster(
k.client.SetKubeconfig(kubeConfig)

// Step 3: configure & start kubernetes controllers

log.Infof("Starting Kubernetes controllers and deployments")
setupPodNetworkInput := k8sapi.SetupPodNetworkInput{
CloudProvider: k.cloudProvider,
NodeName: nodeName,
@@ -206,8 +217,9 @@ func (k *KubeWrapper) InitCluster(
}

// JoinCluster joins existing Kubernetes cluster.
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, logger *zap.Logger) error {
func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTokenDiscovery, peerRole role.Role, log *logger.Logger) error {
// TODO: k8s version should be user input
log.With(zap.String("version", "1.23.6")).Infof("Installing Kubernetes components")
if err := k.clusterUtil.InstallComponents(ctx, "1.23.6"); err != nil {
return err
}
@@ -220,6 +232,7 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
nodeName := nodeInternalIP
var providerID string
if k.providerMetadata.Supported() {
log.Infof("Retrieving node metadata")
instance, err := k.providerMetadata.Self(ctx)
if err != nil {
return fmt.Errorf("retrieving own instance metadata failed: %w", err)
@@ -232,6 +245,12 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
}
nodeName = k8sCompliantHostname(nodeName)

log.With(
zap.String("nodeName", nodeName),
zap.String("providerID", providerID),
zap.String("nodeIP", nodeInternalIP),
).Infof("Setting information for node")

// Step 2: configure kubeadm join config

joinConfig := k.configProvider.JoinConfiguration(k.cloudControllerManager.Supported())
@@ -248,7 +267,8 @@ func (k *KubeWrapper) JoinCluster(ctx context.Context, args *kubeadm.BootstrapTo
if err != nil {
return fmt.Errorf("encoding kubeadm join configuration as YAML: %w", err)
}
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, logger); err != nil {
log.With(zap.String("apiServerEndpoint", args.APIServerEndpoint)).Infof("Joining Kubernetes cluster")
if err := k.clusterUtil.JoinCluster(ctx, joinConfigYAML, log); err != nil {
return fmt.Errorf("joining cluster: %v; %w ", string(joinConfigYAML), err)
}
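KubeWrapper.InitCluster and JoinCluster now announce each phase (installing components, retrieving metadata, initializing or joining the cluster) and bundle the node's identity into one structured entry. The multi-field form, verbatim from the added lines above:

log.With(
	zap.String("nodeName", nodeName),
	zap.String("providerID", providerID),
	zap.String("nodeIP", nodeInternalIP),
).Infof("Setting information for node")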
@@ -11,11 +11,10 @@ import (
"github.com/edgelesssys/constellation/bootstrapper/role"
attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types"
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)

@@ -269,7 +268,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: tc.kubeconfigReader,
getIPAddr: func() (string, error) { return privateIP, nil },
}
_, err := kube.InitCluster(context.Background(), autoscalingNodeGroups, serviceAccountURI, k8sVersion, attestationtypes.ID{}, KMSConfig{MasterSecret: masterSecret}, nil, zaptest.NewLogger(t))
_, err := kube.InitCluster(context.Background(), autoscalingNodeGroups, serviceAccountURI, k8sVersion, attestationtypes.ID{}, KMSConfig{MasterSecret: masterSecret}, nil, logger.NewTest(t))

if tc.wantErr {
assert.Error(err)
@@ -425,7 +424,7 @@ func TestJoinCluster(t *testing.T) {
getIPAddr: func() (string, error) { return privateIP, nil },
}

err := kube.JoinCluster(context.Background(), joinCommand, tc.role, zaptest.NewLogger(t))
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, logger.NewTest(t))
if tc.wantErr {
assert.Error(err)
return
@@ -497,7 +496,7 @@ func (s *stubClusterUtil) InstallComponents(ctx context.Context, version string)
return s.installComponentsErr
}

func (s *stubClusterUtil) InitCluster(ctx context.Context, initConfig []byte, logger *zap.Logger) error {
func (s *stubClusterUtil) InitCluster(ctx context.Context, initConfig []byte, log *logger.Logger) error {
s.initConfigs = append(s.initConfigs, initConfig)
return s.initClusterErr
}
@@ -538,7 +537,7 @@ func (s *stubClusterUtil) SetupVerificationService(kubectl k8sapi.Client, verifi
return s.setupVerificationServiceErr
}

func (s *stubClusterUtil) JoinCluster(ctx context.Context, joinConfig []byte, logger *zap.Logger) error {
func (s *stubClusterUtil) JoinCluster(ctx context.Context, joinConfig []byte, log *logger.Logger) error {
s.joinConfigs = append(s.joinConfigs, joinConfig)
return s.joinClusterErr
}
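In the unit tests the only change is the logger constructor: zaptest.NewLogger(t) and zap.NewNop() are replaced by logger.NewTest(t) everywhere a logger is injected. For example, taken verbatim from the hunks above:

// before
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, zaptest.NewLogger(t))
// after
err := kube.JoinCluster(context.Background(), joinCommand, tc.role, logger.NewTest(t))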