diff --git a/coordinator/pubapi/multicoord.go b/coordinator/pubapi/multicoord.go new file mode 100644 index 000000000..932f2dcec --- /dev/null +++ b/coordinator/pubapi/multicoord.go @@ -0,0 +1,200 @@ +package pubapi + +import ( + "context" + "net" + + "github.com/edgelesssys/constellation/coordinator/peer" + "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" + "github.com/edgelesssys/constellation/coordinator/role" + "github.com/edgelesssys/constellation/coordinator/state" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ActivateAsAdditionalCoordinator is the RPC call to activate subsequent coordinators. +func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.ActivateAsAdditionalCoordinatorRequest) (out *pubproto.ActivateAsAdditionalCoordinatorResponse, reterr error) { + _, cancel := context.WithTimeout(ctx, deadlineDuration) + defer cancel() + a.mut.Lock() + defer a.mut.Unlock() + + if err := a.core.RequireState(state.AcceptingInit); err != nil { + return nil, status.Errorf(codes.FailedPrecondition, "%v", err) + } + // Some of the following actions can't be reverted (yet). If there's an + // error, we may be in a weird state. Thus, mark this peer as failed. + defer func() { + if reterr != nil { + _ = a.core.AdvanceState(state.Failed, nil, nil) + } + }() + + // AdvanceState MUST be called before any other functions that are not sanity checks or otherwise required + // This ensures the node is marked as initialzed before the node is in a state that allows code execution + // Any new additions to ActivateAsAdditionalCoordinator MUST come after + if err := a.core.AdvanceState(state.ActivatingNodes, in.OwnerId, in.ClusterId); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + + // TODO: add KMS functions + + // add one coordinator to the VPN + if err := a.core.SetVPNIP(in.AssignedVpnIp); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + + if err := a.core.AddPeerToVPN(peer.FromPubProto([]*pubproto.Peer{in.ActivatingCoordinatorData})[0]); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + + // run the VPN-API server + if err := a.vpnAPIServer.Listen(net.JoinHostPort(in.AssignedVpnIp, vpnAPIPort)); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + a.wgClose.Add(1) + go func() { + defer a.wgClose.Done() + if err := a.vpnAPIServer.Serve(); err != nil { + panic(err) + } + }() + + // TODO: kubernetes information and join + + // ATTENTION: STORE HAS TO BE EMPTY (NO OVERLAPPING KEYS) WHEN THIS FUNCTION IS CALLED + if err := a.core.SwitchToPersistentStore(); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + a.logger.Info("Transition to persistent store successful") + + // regularly get (peer) updates from etcd + // start update before manual peer add to omit race conditions when multiple coordinator are activating nodes + + thisPeer, err := a.assemblePeerStruct(in.AssignedVpnIp, role.Coordinator) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + if err := a.core.AddPeerToStore(thisPeer); err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + + resourceVersion, peers, err := a.core.GetPeers(0) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + a.resourceVersion = resourceVersion + + err = a.core.UpdatePeers(peers) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + // Manually trigger an update operation on all peers. + // This may be expendable in the future, depending on whether it's acceptable that it takes + // some seconds until the nodes get all peer data via their regular update requests. + _, peers, err = a.core.GetPeers(0) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + a.logger.Info("", zap.Any("peers", peers)) + for _, p := range peers { + if p.Role == role.Node { + if err := a.triggerNodeUpdate(p.PublicIP); err != nil { + a.logger.Error("triggerNodeUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP)) + } + } + if p.Role == role.Coordinator && p.VPNIP != thisPeer.VPNIP { + a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP)) + if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil { + a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP)) + } + } + } + + return &pubproto.ActivateAsAdditionalCoordinatorResponse{}, nil +} + +func (a *API) ActivateAdditionalCoordinator(ctx context.Context, in *pubproto.ActivateAdditionalCoordinatorRequest) (*pubproto.ActivateAdditionalCoordinatorResponse, error) { + ctx, cancel := context.WithTimeout(ctx, deadlineDuration) + defer cancel() + + if err := a.core.RequireState(state.ActivatingNodes); err != nil { + return nil, status.Errorf(codes.FailedPrecondition, "%v", err) + } + assignedVPNIP, err := a.core.GetNextCoordinatorIP() + if err != nil { + return nil, err + } + vpnIP, err := a.core.GetVPNIP() + if err != nil { + return nil, err + } + thisPeer, err := a.assemblePeerStruct(vpnIP, role.Coordinator) + if err != nil { + return nil, err + } + ownerID, clusterID, err := a.core.GetIDs(nil) + if err != nil { + return nil, err + } + + conn, err := a.dial(ctx, net.JoinHostPort(in.CoordinatorPublicIp, endpointAVPNPort)) + if err != nil { + return nil, err + } + defer conn.Close() + + client := pubproto.NewAPIClient(conn) + + _, err = client.ActivateAsAdditionalCoordinator(ctx, &pubproto.ActivateAsAdditionalCoordinatorRequest{ + AssignedVpnIp: assignedVPNIP, + ActivatingCoordinatorData: peer.ToPubProto([]peer.Peer{thisPeer})[0], + OwnerId: ownerID, + ClusterId: clusterID, + }) + if err != nil { + a.logger.Error("coordinator activation failed", zap.Error(err)) + return nil, err + } + + return &pubproto.ActivateAdditionalCoordinatorResponse{}, nil +} + +func (a *API) TriggerCoordinatorUpdate(ctx context.Context, in *pubproto.TriggerCoordinatorUpdateRequest) (*pubproto.TriggerCoordinatorUpdateResponse, error) { + if err := a.core.RequireState(state.ActivatingNodes); err != nil { + return nil, status.Errorf(codes.FailedPrecondition, "%v", err) + } + resourceVersion, peers, err := a.core.GetPeers(a.resourceVersion) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + if resourceVersion == a.resourceVersion { + a.logger.Info("coordinator: ressource version identical, no need to update") + return &pubproto.TriggerCoordinatorUpdateResponse{}, nil + } + err = a.core.UpdatePeers(peers) + if err != nil { + return nil, status.Errorf(codes.Internal, "%v", err) + } + a.resourceVersion = resourceVersion + return &pubproto.TriggerCoordinatorUpdateResponse{}, nil +} + +func (a *API) triggerCoordinatorUpdate(ctx context.Context, publicIP string) error { + ctx, cancel := context.WithTimeout(ctx, deadlineDuration) + defer cancel() + + // We don't verify the peer certificate here, since TriggerNodeUpdate triggers a connection over VPN + // The target of the rpc needs to already be part of the VPN to process the request, meaning it is trusted + conn, err := a.dialNoVerify(ctx, net.JoinHostPort(publicIP, endpointAVPNPort)) + if err != nil { + return err + } + defer conn.Close() + + client := pubproto.NewAPIClient(conn) + _, err = client.TriggerCoordinatorUpdate(ctx, &pubproto.TriggerCoordinatorUpdateRequest{}) + + return err +} diff --git a/coordinator/pubapi/multicoord_test.go b/coordinator/pubapi/multicoord_test.go new file mode 100644 index 000000000..f8d832228 --- /dev/null +++ b/coordinator/pubapi/multicoord_test.go @@ -0,0 +1,225 @@ +package pubapi + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/edgelesssys/constellation/coordinator/peer" + "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" + "github.com/edgelesssys/constellation/coordinator/role" + "github.com/edgelesssys/constellation/coordinator/state" + "github.com/edgelesssys/constellation/coordinator/util/testdialer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestActivateAsCoordinators(t *testing.T) { + coordinatorPubKey := []byte{6, 7, 8} + testCoord1 := stubPeer{peer: peer.Peer{PublicIP: "192.0.2.11", VPNPubKey: []byte{1, 2, 3}, VPNIP: "10.118.0.1", Role: role.Coordinator}} + + someErr := errors.New("some error") + testCases := map[string]struct { + coordinators stubPeer + state state.State + switchToPersistentStoreErr error + expectErr bool + expectedState state.State + }{ + "basic": { + coordinators: testCoord1, + state: state.AcceptingInit, + expectedState: state.ActivatingNodes, + }, + "already activated": { + state: state.ActivatingNodes, + expectErr: true, + expectedState: state.ActivatingNodes, + }, + "SwitchToPersistentStore error": { + coordinators: testCoord1, + state: state.AcceptingInit, + switchToPersistentStoreErr: someErr, + expectErr: true, + expectedState: state.Failed, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + core := &fakeCore{ + state: tc.state, + vpnPubKey: coordinatorPubKey, + switchToPersistentStoreErr: tc.switchToPersistentStoreErr, + kubeconfig: []byte("kubeconfig"), + ownerID: []byte("ownerID"), + clusterID: []byte("clusterID"), + } + dialer := testdialer.NewBufconnDialer() + + getPublicIPAddr := func() (string, error) { + return "192.0.2.1", nil + } + + api := New(zaptest.NewLogger(t), core, dialer, stubVPNAPIServer{}, fakeValidator{}, getPublicIPAddr) + defer api.Close() + + // spawn coordinator + server := tc.coordinators.newServer() + go server.Serve(dialer.GetListener(tc.coordinators.peer.PublicIP)) + defer server.GracefulStop() + + _, err := api.ActivateAsAdditionalCoordinator(context.Background(), &pubproto.ActivateAsAdditionalCoordinatorRequest{ + AssignedVpnIp: "10.118.0.2", + ActivatingCoordinatorData: peer.ToPubProto([]peer.Peer{tc.coordinators.peer})[0], + OwnerId: core.ownerID, + ClusterId: core.clusterID, + }) + + assert.Equal(tc.expectedState, core.state) + + if tc.expectErr { + assert.Error(err) + return + } + require.NoError(err) + }) + } +} + +func TestTriggerCoordinatorUpdate(t *testing.T) { + // someErr := errors.New("failed") + peers := []peer.Peer{ + {PublicIP: "192.0.2.11:2000", VPNIP: "192.0.2.21", VPNPubKey: []byte{1, 2, 3}}, + {PublicIP: "192.0.2.12:2000", VPNIP: "192.0.2.22", VPNPubKey: []byte{2, 3, 4}}, + } + + testCases := map[string]struct { + peers []peer.Peer + state state.State + getUpdateErr error + expectErr bool + }{ + "basic": { + peers: peers, + state: state.ActivatingNodes, + }, + "not activated": { + peers: peers, + state: state.AcceptingInit, + expectErr: true, + }, + "wrong peer kind": { + peers: peers, + state: state.IsNode, + expectErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + logger := zaptest.NewLogger(t) + core := &fakeCore{ + state: tc.state, + peers: tc.peers, + } + dialer := testdialer.NewBufconnDialer() + + api := New(logger, core, dialer, nil, nil, nil) + + _, err := api.TriggerCoordinatorUpdate(context.Background(), &pubproto.TriggerCoordinatorUpdateRequest{}) + if tc.expectErr { + assert.Error(err) + return + } + require.NoError(err) + + // second update should be a noop + _, err = api.TriggerCoordinatorUpdate(context.Background(), &pubproto.TriggerCoordinatorUpdateRequest{}) + require.NoError(err) + + require.Len(core.updatedPeers, 1) + assert.Equal(tc.peers, core.updatedPeers[0]) + }) + } +} + +func TestActivateAdditionalCoordinators(t *testing.T) { + someErr := errors.New("failed") + coordinatorPubKey := []byte{6, 7, 8} + testCoord1 := stubPeer{peer: peer.Peer{PublicIP: "192.0.2.11", VPNPubKey: []byte{1, 2, 3}, VPNIP: "10.118.0.1", Role: role.Coordinator}} + + testCases := map[string]struct { + coordinators stubPeer + state state.State + activateErr error + expectErr bool + expectedState state.State + }{ + "basic": { + coordinators: testCoord1, + state: state.ActivatingNodes, + expectedState: state.ActivatingNodes, + }, + "Activation Err": { + coordinators: testCoord1, + state: state.ActivatingNodes, + expectedState: state.ActivatingNodes, + activateErr: someErr, + expectErr: true, + }, + "Not in exprected state": { + coordinators: testCoord1, + state: state.AcceptingInit, + expectedState: state.AcceptingInit, + expectErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + core := &fakeCore{ + state: tc.state, + vpnPubKey: coordinatorPubKey, + kubeconfig: []byte("kubeconfig"), + ownerID: []byte("ownerID"), + clusterID: []byte("clusterID"), + } + dialer := testdialer.NewBufconnDialer() + + getPublicIPAddr := func() (string, error) { + return "192.0.2.1", nil + } + + api := New(zaptest.NewLogger(t), core, dialer, stubVPNAPIServer{}, fakeValidator{}, getPublicIPAddr) + defer api.Close() + + // spawn coordinator + tc.coordinators.activateErr = tc.activateErr + server := tc.coordinators.newServer() + go server.Serve(dialer.GetListener(net.JoinHostPort(tc.coordinators.peer.PublicIP, endpointAVPNPort))) + defer server.GracefulStop() + + _, err := api.ActivateAdditionalCoordinator(context.Background(), &pubproto.ActivateAdditionalCoordinatorRequest{CoordinatorPublicIp: tc.coordinators.peer.PublicIP}) + + assert.Equal(tc.expectedState, core.state) + + if tc.expectErr { + assert.Error(err) + return + } + require.NoError(err) + }) + } +}