From e534c6a338ed6b545210f8805f41578c29a8837a Mon Sep 17 00:00:00 2001
From: katexochen <49727155+katexochen@users.noreply.github.com>
Date: Tue, 21 Jun 2022 11:56:16 +0200
Subject: [PATCH] Self activation of nodes

---
 coordinator/core/activate.go                  |  67 ++++
 .../internal/selfactivation/selfactivation.go | 257 ++++++++++++++++
 .../selfactivation/selfactivation_test.go     | 285 ++++++++++++++++++
 3 files changed, 609 insertions(+)
 create mode 100644 coordinator/core/activate.go
 create mode 100644 coordinator/internal/selfactivation/selfactivation.go
 create mode 100644 coordinator/internal/selfactivation/selfactivation_test.go

diff --git a/coordinator/core/activate.go b/coordinator/core/activate.go
new file mode 100644
index 000000000..b4c42e9b8
--- /dev/null
+++ b/coordinator/core/activate.go
@@ -0,0 +1,67 @@
+package core
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/edgelesssys/constellation/coordinator/role"
+	"github.com/edgelesssys/constellation/coordinator/state"
+	kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
+)
+
+// SetNodeActive activates the node and joins the cluster.
+func (c *Core) SetNodeActive(diskKey, ownerID, clusterID []byte, kubeAPIendpoint, token, discoveryCACertHash string) (reterr error) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if err := c.RequireState(state.AcceptingInit); err != nil {
+		return fmt.Errorf("node is not in required state for activation: %w", err)
+	}
+
+	if len(ownerID) == 0 || len(clusterID) == 0 {
+		c.zaplogger.Error("Missing data to taint worker node as initialized")
+		return errors.New("missing data to taint worker node as initialized")
+	}
+
+	// If any of the following actions fail, we cannot revert.
+	// Thus, mark this peer as failed.
+	defer func() {
+		if reterr != nil {
+			_ = c.AdvanceState(state.Failed, nil, nil)
+		}
+	}()
+
+	// AdvanceState MUST be called before any other functions that are not sanity checks or otherwise required.
+	// This ensures the node is marked as initialized before it is in a state that allows code execution.
+	// Any new additions to SetNodeActive MUST come after.
+	if err := c.AdvanceState(state.IsNode, ownerID, clusterID); err != nil {
+		return fmt.Errorf("advancing node state: %w", err)
+	}
+
+	// TODO: SSH keys are currently not available from the AaaS, so we can't create user SSH keys here.
+
+	if err := c.PersistNodeState(role.Node, "", ownerID, clusterID); err != nil {
+		return fmt.Errorf("persisting node state: %w", err)
+	}
+
+	if err := c.UpdateDiskPassphrase(string(diskKey)); err != nil {
+		return fmt.Errorf("updating disk passphrase: %w", err)
+	}
+
+	btd := &kubeadm.BootstrapTokenDiscovery{
+		APIServerEndpoint: kubeAPIendpoint,
+		Token:             token,
+		CACertHashes:      []string{discoveryCACertHash},
+	}
+	if err := c.JoinCluster(context.TODO(), btd, "", role.Node); err != nil {
+		return fmt.Errorf("joining Kubernetes cluster: %w", err)
+	}
+
+	return nil
+}
+
+// SetCoordinatorActive activates the node as coordinator.
+func (c *Core) SetCoordinatorActive() error {
+	panic("not implemented")
+}
diff --git a/coordinator/internal/selfactivation/selfactivation.go b/coordinator/internal/selfactivation/selfactivation.go
new file mode 100644
index 000000000..53444401b
--- /dev/null
+++ b/coordinator/internal/selfactivation/selfactivation.go
@@ -0,0 +1,257 @@
+package selfactivation
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/edgelesssys/constellation/activation/activationproto"
+	"github.com/edgelesssys/constellation/coordinator/cloudprovider/cloudtypes"
+	"github.com/edgelesssys/constellation/coordinator/role"
+	"github.com/edgelesssys/constellation/internal/constants"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+	"k8s.io/utils/clock"
+)
+
+const (
+	interval = 30 * time.Second
+	timeout  = 30 * time.Second
+)
+
+// SelfActivationClient is a client for the self-activation of a node.
+type SelfActivationClient struct {
+	diskUUID string
+	role     role.Role
+
+	timeout  time.Duration
+	interval time.Duration
+	clock    clock.WithTicker
+
+	dialer      grpcDialer
+	setterAPI   activeSetter
+	metadataAPI metadataAPI
+
+	log *zap.Logger
+
+	mux      sync.Mutex
+	stopC    chan struct{}
+	stopDone chan struct{}
+}
+
+// NewClient creates a new SelfActivationClient.
+func NewClient(diskUUID string, dial grpcDialer, setter activeSetter, meta metadataAPI, log *zap.Logger) *SelfActivationClient {
+	return &SelfActivationClient{
+		diskUUID:    diskUUID,
+		timeout:     timeout,
+		interval:    interval,
+		clock:       clock.RealClock{},
+		dialer:      dial,
+		setterAPI:   setter,
+		metadataAPI: meta,
+		log:         log.Named("selfactivation-client"),
+	}
+}
+
+// Start starts the client routine. The client will make the needed API calls to activate
+// the node as the role it receives from the metadata API.
+// Multiple calls of Start on the same client won't start a second routine if there is
+// already a routine running.
+func (c *SelfActivationClient) Start() {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	if c.stopC != nil { // daemon already running
+		return
+	}
+
+	c.log.Info("Starting")
+	c.stopC = make(chan struct{}, 1)
+	c.stopDone = make(chan struct{}, 1)
+
+	ticker := c.clock.NewTicker(c.interval)
+	go func() {
+		defer ticker.Stop()
+		defer func() { c.stopDone <- struct{}{} }()
+
+		for {
+			c.role = c.getRole()
+			if c.role != role.Unknown {
+				break
+			}
+
+			c.log.Info("Sleeping", zap.Duration("interval", c.interval))
+			select {
+			case <-c.stopC:
+				return
+			case <-ticker.C():
+			}
+		}
+
+		// TODO(katexochen): Delete when Coordinator self-activation is implemented.
+		if c.role == role.Coordinator {
+			c.log.Info("Role is Coordinator, terminating")
+			return
+		}
+
+		for {
+			err := c.tryActivationAtAvailableServices()
+			if err == nil {
+				c.log.Info("Activated successfully. SelfActivationClient shut down.")
+				return
+			}
+			c.log.Info("Activation failed for all available endpoints", zap.Error(err))
+
+			c.log.Info("Sleeping", zap.Duration("interval", c.interval))
+			select {
+			case <-c.stopC:
+				return
+			case <-ticker.C():
+			}
+		}
+	}()
+}
+
+// Stop stops the client and blocks until the client's routine is stopped.
+func (c *SelfActivationClient) Stop() {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	if c.stopC == nil { // daemon not running
+		return
+	}
+
+	c.log.Info("Stopping")
+
+	c.stopC <- struct{}{}
+	<-c.stopDone
+
+	c.stopC = nil
+	c.stopDone = nil
+
+	c.log.Info("Stopped")
+}
+
+func (c *SelfActivationClient) tryActivationAtAvailableServices() error {
+	ips, err := c.getCoordinatorIPs()
+	if err != nil {
+		return err
+	}
+
+	if len(ips) == 0 {
+		return errors.New("no coordinator IPs found")
+	}
+
+	for _, ip := range ips {
+		err = c.activate(net.JoinHostPort(ip, strconv.Itoa(constants.ActivationServicePort)))
+		if err == nil {
+			return nil
+		}
+	}
+
+	return err
+}
+
+func (c *SelfActivationClient) activate(aaasEndpoint string) error {
+	ctx, cancel := c.timeoutCtx()
+	defer cancel()
+
+	conn, err := c.dialer.Dial(ctx, aaasEndpoint)
+	if err != nil {
+		c.log.Info("AaaS unreachable", zap.String("endpoint", aaasEndpoint), zap.Error(err))
+		return fmt.Errorf("dialing AaaS endpoint: %v", err)
+	}
+	defer conn.Close()
+
+	protoClient := activationproto.NewAPIClient(conn)
+
+	switch c.role {
+	case role.Node:
+		return c.activateAsWorkerNode(ctx, protoClient)
+	case role.Coordinator:
+		return c.activateAsControlPlaneNode(ctx, protoClient)
+	default:
+		return fmt.Errorf("cannot activate as %s", role.Unknown)
+	}
+}
+
+func (c *SelfActivationClient) activateAsWorkerNode(ctx context.Context, client activationproto.APIClient) error {
+	req := &activationproto.ActivateWorkerNodeRequest{DiskUuid: c.diskUUID}
+	resp, err := client.ActivateWorkerNode(ctx, req)
+	if err != nil {
+		c.log.Info("Failed to activate as node", zap.Error(err))
+		return fmt.Errorf("activating node: %w", err)
+	}
+	c.log.Info("Activation at AaaS succeeded")
+
+	return c.setterAPI.SetNodeActive(
+		resp.StateDiskKey,
+		resp.OwnerId,
+		resp.ClusterId,
+		resp.ApiServerEndpoint,
+		resp.Token,
+		resp.DiscoveryTokenCaCertHash,
+	)
+}
+
+func (c *SelfActivationClient) activateAsControlPlaneNode(ctx context.Context, client activationproto.APIClient) error {
+	panic("not implemented")
+}
+
+func (c *SelfActivationClient) getRole() role.Role {
+	ctx, cancel := c.timeoutCtx()
+	defer cancel()
+
+	c.log.Info("Requesting role from metadata API")
+	inst, err := c.metadataAPI.Self(ctx)
+	if err != nil {
+		c.log.Error("Failed to get self instance from metadata API", zap.Error(err))
+		return role.Unknown
+	}
+
+	c.log.Info("Received new role", zap.String("role", inst.Role.String()))
+	return inst.Role
+}
+
+func (c *SelfActivationClient) getCoordinatorIPs() ([]string, error) {
+	ctx, cancel := c.timeoutCtx()
+	defer cancel()
+
+	instances, err := c.metadataAPI.List(ctx)
+	if err != nil {
+		c.log.Error("Failed to list instances from metadata API", zap.Error(err))
+		return nil, fmt.Errorf("listing instances from metadata API: %w", err)
+	}
+
+	ips := []string{}
+	for _, instance := range instances {
+		if instance.Role == role.Coordinator {
+			ips = append(ips, instance.PrivateIPs...)
+		}
+	}
+
+	c.log.Info("Received Coordinator endpoints", zap.Strings("IPs", ips))
+	return ips, nil
+}
+
+func (c *SelfActivationClient) timeoutCtx() (context.Context, context.CancelFunc) {
+	return context.WithTimeout(context.Background(), c.timeout)
+}
+
+type grpcDialer interface {
+	Dial(ctx context.Context, target string) (*grpc.ClientConn, error)
+}
+
+type activeSetter interface {
+	SetNodeActive(diskKey, ownerID, clusterID []byte, endpoint, token, discoveryCACertHash string) error
+	SetCoordinatorActive() error
+}
+
+type metadataAPI interface {
+	Self(ctx context.Context) (cloudtypes.Instance, error)
+	List(ctx context.Context) ([]cloudtypes.Instance, error)
+}
diff --git a/coordinator/internal/selfactivation/selfactivation_test.go b/coordinator/internal/selfactivation/selfactivation_test.go
new file mode 100644
index 000000000..3caf497bd
--- /dev/null
+++ b/coordinator/internal/selfactivation/selfactivation_test.go
@@ -0,0 +1,285 @@
+package selfactivation
+
+import (
+	"context"
+	"errors"
+	"net"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/edgelesssys/constellation/activation/activationproto"
+	"github.com/edgelesssys/constellation/coordinator/cloudprovider/cloudtypes"
+	"github.com/edgelesssys/constellation/coordinator/role"
+	"github.com/edgelesssys/constellation/internal/constants"
+	"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
+	"github.com/edgelesssys/constellation/internal/grpc/dialer"
+	"github.com/edgelesssys/constellation/internal/grpc/testdialer"
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/goleak"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
+	"google.golang.org/grpc"
+	testclock "k8s.io/utils/clock/testing"
+)
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
+
+func TestClient(t *testing.T) {
+	someErr := errors.New("failed")
+	peers := []cloudtypes.Instance{
+		{Role: role.Node, PrivateIPs: []string{"192.0.2.8"}},
+		{Role: role.Coordinator, PrivateIPs: []string{"192.0.2.1"}},
+		{Role: role.Coordinator, PrivateIPs: []string{"192.0.2.2", "192.0.2.3"}},
+	}
+
+	testCases := map[string]struct {
+		role       role.Role
+		apiAnswers []any
+		setterAPI  *stubActiveSetter
+	}{
+		"on node: metadata self: errors occur": {
+			role: role.Node,
+			apiAnswers: []any{
+				selfAnswer{err: someErr},
+				selfAnswer{err: someErr},
+				selfAnswer{err: someErr},
+				selfAnswer{instance: cloudtypes.Instance{Role: role.Node}},
+				listAnswer{instances: peers},
+				activateNodeAnswer{},
+			},
+			setterAPI: &stubActiveSetter{},
+		},
+		"on node: metadata self: no role in answer": {
+			role: role.Node,
+			apiAnswers: []any{
+				selfAnswer{},
+				selfAnswer{},
+				selfAnswer{},
+				selfAnswer{instance: cloudtypes.Instance{Role: role.Node}},
+				listAnswer{instances: peers},
+				activateNodeAnswer{},
+			},
+			setterAPI: &stubActiveSetter{},
+		},
+		"on node: metadata list: errors occur": {
+			role: role.Node,
+			apiAnswers: []any{
+				selfAnswer{instance: cloudtypes.Instance{Role: role.Node}},
+				listAnswer{err: someErr},
+				listAnswer{err: someErr},
+				listAnswer{err: someErr},
+				listAnswer{instances: peers},
+				activateNodeAnswer{},
+			},
+			setterAPI: &stubActiveSetter{},
+		},
+		"on node: metadata list: no coordinators in answer": {
+			role: role.Node,
+			apiAnswers: []any{
+				selfAnswer{instance: cloudtypes.Instance{Role: role.Node}},
+				listAnswer{},
+				listAnswer{},
+				listAnswer{},
+				listAnswer{instances: peers},
+				activateNodeAnswer{},
+			},
+			setterAPI: &stubActiveSetter{},
+		},
+		"on node: aaas ActivateNode: errors": {
+			role: role.Node,
+			apiAnswers: []any{
+				selfAnswer{instance: cloudtypes.Instance{Role: role.Node}},
+				listAnswer{instances: peers},
+				activateNodeAnswer{err: someErr},
+				listAnswer{instances: peers},
+				activateNodeAnswer{err: someErr},
+				listAnswer{instances: peers},
+				activateNodeAnswer{},
+			},
+			setterAPI: &stubActiveSetter{},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			assert := assert.New(t)
+
+			clock := testclock.NewFakeClock(time.Now())
+			metadataAPI := newStubMetadataAPI()
+
+			netDialer := testdialer.NewBufconnDialer()
+			dialer := dialer.New(nil, nil, netDialer)
+
+			client := &SelfActivationClient{
+				timeout:     30 * time.Second,
+				interval:    time.Millisecond,
+				dialer:      dialer,
+				setterAPI:   tc.setterAPI,
+				metadataAPI: metadataAPI,
+				clock:       clock,
+				log:         zaptest.NewLogger(t),
+			}
+
+			serverCreds := atlscredentials.New(nil, nil)
+			activationServer := grpc.NewServer(grpc.Creds(serverCreds))
+			activationAPI := newStubActivationServiceAPI()
+			activationproto.RegisterAPIServer(activationServer, activationAPI)
+			port := strconv.Itoa(constants.ActivationServicePort)
+			listener := netDialer.GetListener(net.JoinHostPort("192.0.2.3", port))
+			go activationServer.Serve(listener)
+			defer activationServer.GracefulStop()
+
+			client.Start()
+
+			for _, a := range tc.apiAnswers {
+				switch a := a.(type) {
+				case selfAnswer:
+					metadataAPI.selfAnswerC <- a
+				case listAnswer:
+					metadataAPI.listAnswerC <- a
+				case activateNodeAnswer:
+					activationAPI.activateNodeAnswerC <- a
+				}
+				clock.Step(time.Second)
+			}
+
+			client.Stop()
+
+			if tc.role == role.Node {
+				assert.Equal(1, tc.setterAPI.setNodeActiveCalled)
+			} else {
+				assert.Equal(1, tc.setterAPI.setCoordinatorActiveCalled)
+			}
+		})
+	}
+}
+
+func TestClientConcurrentStartStop(t *testing.T) {
+	client := &SelfActivationClient{
+		setterAPI:   &stubActiveSetter{},
+		metadataAPI: &stubRepeaterMetadataAPI{},
+		clock:       testclock.NewFakeClock(time.Now()),
+		log:         zap.NewNop(),
+	}
+
+	wg := sync.WaitGroup{}
+
+	start := func() {
+		defer wg.Done()
+		client.Start()
+	}
+
+	stop := func() {
+		defer wg.Done()
+		client.Stop()
+	}
+
+	wg.Add(10)
+	go stop()
+	go start()
+	go start()
+	go stop()
+	go stop()
+	go start()
+	go start()
+	go stop()
+	go stop()
+	go start()
+	wg.Wait()
+
+	client.Stop()
+}
+
+type stubActiveSetter struct {
+	setNodeActiveErr           error
+	setNodeActiveCalled        int
+	setCoordinatorActiveErr    error
+	setCoordinatorActiveCalled int
+}
+
+func (s *stubActiveSetter) SetNodeActive(_, _, _ []byte, _, _, _ string) error {
+	s.setNodeActiveCalled++
+	return s.setNodeActiveErr
+}
+
+func (s *stubActiveSetter) SetCoordinatorActive() error {
+	s.setCoordinatorActiveCalled++
+	return s.setCoordinatorActiveErr
+}
+
+type stubRepeaterMetadataAPI struct {
+	selfInstance  cloudtypes.Instance
+	selfErr       error
+	listInstances []cloudtypes.Instance
+	listErr       error
+}
+
+func (s *stubRepeaterMetadataAPI) Self(_ context.Context) (cloudtypes.Instance, error) {
+	return s.selfInstance, s.selfErr
+}
+
+func (s *stubRepeaterMetadataAPI) List(_ context.Context) ([]cloudtypes.Instance, error) {
+	return s.listInstances, s.listErr
+}
+
+type stubMetadataAPI struct {
+	selfAnswerC chan selfAnswer
+	listAnswerC chan listAnswer
+}
+
+func newStubMetadataAPI() *stubMetadataAPI {
+	return &stubMetadataAPI{
+		selfAnswerC: make(chan selfAnswer),
+		listAnswerC: make(chan listAnswer),
+	}
+}
+
+func (s *stubMetadataAPI) Self(_ context.Context) (cloudtypes.Instance, error) {
+	answer := <-s.selfAnswerC
+	return answer.instance, answer.err
+}
+
+func (s *stubMetadataAPI) List(_ context.Context) ([]cloudtypes.Instance, error) {
+	answer := <-s.listAnswerC
+	return answer.instances, answer.err
+}
+
+type selfAnswer struct {
+	instance cloudtypes.Instance
+	err      error
+}
+
+type listAnswer struct {
+	instances []cloudtypes.Instance
+	err       error
+}
+
+type stubActivationServiceAPI struct {
+	activateNodeAnswerC chan activateNodeAnswer
+
+	activationproto.UnimplementedAPIServer
+}
+
+func newStubActivationServiceAPI() *stubActivationServiceAPI {
+	return &stubActivationServiceAPI{
+		activateNodeAnswerC: make(chan activateNodeAnswer),
+	}
+}
+
+func (s *stubActivationServiceAPI) ActivateWorkerNode(_ context.Context, _ *activationproto.ActivateWorkerNodeRequest,
+) (*activationproto.ActivateWorkerNodeResponse, error) {
+	answer := <-s.activateNodeAnswerC
+	if answer.resp == nil {
+		answer.resp = &activationproto.ActivateWorkerNodeResponse{}
+	}
+	return answer.resp, answer.err
+}
+
+type activateNodeAnswer struct {
+	resp *activationproto.ActivateWorkerNodeResponse
+	err  error
+}
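
Usage note (illustrative only, not part of the patch): the sketch below shows one way the SelfActivationClient added here could be wired up during node startup. plainDialer, stubMetadata, the hard-coded disk UUID, and the nil *core.Core are placeholders for the production aTLS dialer, the cloud-provider metadata client, the state-disk UUID, and the Core instance; only NewClient, Start, and Stop are taken from this patch.

// Sketch under the assumptions stated above; not part of the patch.
package main

import (
	"context"

	"github.com/edgelesssys/constellation/coordinator/cloudprovider/cloudtypes"
	"github.com/edgelesssys/constellation/coordinator/core"
	"github.com/edgelesssys/constellation/coordinator/internal/selfactivation"
	"github.com/edgelesssys/constellation/coordinator/role"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// plainDialer satisfies the client's grpcDialer interface. A real deployment
// would dial with aTLS credentials instead of an insecure transport.
type plainDialer struct{}

func (plainDialer) Dial(ctx context.Context, target string) (*grpc.ClientConn, error) {
	return grpc.DialContext(ctx, target, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// stubMetadata stands in for a cloud-provider metadata client implementing
// the Self and List calls the client expects; the values returned here are placeholders.
type stubMetadata struct{}

func (stubMetadata) Self(_ context.Context) (cloudtypes.Instance, error) {
	return cloudtypes.Instance{Role: role.Node}, nil
}

func (stubMetadata) List(_ context.Context) ([]cloudtypes.Instance, error) {
	return []cloudtypes.Instance{{Role: role.Coordinator, PrivateIPs: []string{"192.0.2.1"}}}, nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer func() { _ = logger.Sync() }()

	var c *core.Core                                    // placeholder: the Core instance built during coordinator startup
	diskUUID := "00000000-0000-0000-0000-000000000000" // placeholder: read from the state disk in practice

	client := selfactivation.NewClient(diskUUID, plainDialer{}, c, stubMetadata{}, logger)
	client.Start()      // activation routine runs until the node is activated or Stop is called
	defer client.Stop() // blocks until the activation goroutine has returned

	// The rest of the node's startup would run here.
}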