From f5aafd81787f0b131f7d83b66ada6798b2f11bbb Mon Sep 17 00:00:00 2001 From: Malte Poll Date: Thu, 28 Apr 2022 10:14:40 +0200 Subject: [PATCH] Implement reinitialization of the coordinator after reboot Signed-off-by: Malte Poll --- coordinator/cmd/coordinator/run.go | 2 +- coordinator/core/core.go | 98 ++++----- coordinator/core/core_test.go | 37 +++- coordinator/core/reinitialize.go | 142 +++++++++++++ coordinator/core/reinitialize_test.go | 283 ++++++++++++++++++++++++++ coordinator/store/etcdstore.go | 2 +- 6 files changed, 504 insertions(+), 60 deletions(-) create mode 100644 coordinator/core/reinitialize.go create mode 100644 coordinator/core/reinitialize_test.go diff --git a/coordinator/cmd/coordinator/run.go b/coordinator/cmd/coordinator/run.go index 6d22b6d3a..ff131d03c 100644 --- a/coordinator/cmd/coordinator/run.go +++ b/coordinator/cmd/coordinator/run.go @@ -52,7 +52,7 @@ func run(issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPub zapLoggerPubapi := zapLoggerCore.Named("pubapi") papi := pubapi.New(zapLoggerPubapi, core, dialer, vapiServer, getPublicIPAddr, pubapi.GetRecoveryPeerFromContext) // initialize state machine and wait for re-joining of the VPN (if applicable) - nodeActivated, err := core.Initialize() + nodeActivated, err := core.Initialize(context.TODO(), dialer, papi) if err != nil { zapLoggerCore.Fatal("failed to initialize core", zap.Error(err)) } diff --git a/coordinator/core/core.go b/coordinator/core/core.go index 299bb18f2..5d1302e2f 100644 --- a/coordinator/core/core.go +++ b/coordinator/core/core.go @@ -21,27 +21,29 @@ import ( "github.com/edgelesssys/constellation/coordinator/util" "github.com/edgelesssys/constellation/kms/kms" "go.uber.org/zap" + "google.golang.org/grpc" ) var coordinatorVPNIP = netip.AddrFrom4([4]byte{10, 118, 0, 1}) type Core struct { - state state.State - openTPM vtpm.TPMOpenFunc - mut sync.Mutex - store store.Store - vpn VPN - kube Cluster - metadata ProviderMetadata - cloudControllerManager CloudControllerManager - cloudNodeManager CloudNodeManager - clusterAutoscaler ClusterAutoscaler - encryptedDisk EncryptedDisk - kms kms.CloudKMS - zaplogger *zap.Logger - persistentStoreFactory PersistentStoreFactory - lastHeartbeats map[string]time.Time - fileHandler file.Handler + state state.State + openTPM vtpm.TPMOpenFunc + mut sync.Mutex + store store.Store + vpn VPN + kube Cluster + metadata ProviderMetadata + cloudControllerManager CloudControllerManager + cloudNodeManager CloudNodeManager + clusterAutoscaler ClusterAutoscaler + encryptedDisk EncryptedDisk + kms kms.CloudKMS + zaplogger *zap.Logger + persistentStoreFactory PersistentStoreFactory + initialVPNPeersRetriever initialVPNPeersRetriever + lastHeartbeats map[string]time.Time + fileHandler file.Handler } // NewCore creates and initializes a new Core object. 
@@ -51,29 +53,26 @@ func NewCore(vpn VPN, kube Cluster, ) (*Core, error) { stor := store.NewStdStore() c := &Core{ - openTPM: openTPM, - store: stor, - vpn: vpn, - kube: kube, - metadata: metadata, - cloudNodeManager: cloudNodeManager, - cloudControllerManager: cloudControllerManager, - clusterAutoscaler: clusterAutoscaler, - encryptedDisk: encryptedDisk, - zaplogger: zapLogger, - kms: nil, // KMS is set up during init phase - persistentStoreFactory: persistentStoreFactory, - lastHeartbeats: make(map[string]time.Time), - fileHandler: fileHandler, + openTPM: openTPM, + store: stor, + vpn: vpn, + kube: kube, + metadata: metadata, + cloudNodeManager: cloudNodeManager, + cloudControllerManager: cloudControllerManager, + clusterAutoscaler: clusterAutoscaler, + encryptedDisk: encryptedDisk, + zaplogger: zapLogger, + kms: nil, // KMS is set up during init phase + persistentStoreFactory: persistentStoreFactory, + initialVPNPeersRetriever: getInitialVPNPeers, + lastHeartbeats: make(map[string]time.Time), + fileHandler: fileHandler, } if err := c.data().IncrementPeersResourceVersion(); err != nil { return nil, err } - if err := vpn.Setup(nil); err != nil { - return nil, err - } - return c, nil } @@ -183,13 +182,16 @@ func (c *Core) NotifyNodeHeartbeat(addr net.Addr) { // Initialize initializes the state machine of the core and handles re-joining the VPN. // Blocks until the core is ready to be used. -func (c *Core) Initialize() (nodeActivated bool, err error) { +func (c *Core) Initialize(ctx context.Context, dialer Dialer, api PubAPI) (nodeActivated bool, err error) { nodeActivated, err = vtpm.IsNodeInitialized(c.openTPM) if err != nil { return false, fmt.Errorf("failed to check for previous activation using vTPM: %w", err) } if !nodeActivated { c.zaplogger.Info("Node was never activated. Allowing node to be activated.") + if err := c.vpn.Setup(nil); err != nil { + return false, fmt.Errorf("failed to setup VPN: %w", err) + } c.state.Advance(state.AcceptingInit) return false, nil } @@ -198,28 +200,25 @@ func (c *Core) Initialize() (nodeActivated bool, err error) { if err != nil { return false, fmt.Errorf("failed to read node state: %w", err) } + if err := c.vpn.Setup(nodeState.VPNPrivKey); err != nil { + return false, fmt.Errorf("failed to setup VPN: %w", err) + } var initialState state.State switch nodeState.Role { case role.Coordinator: initialState = state.ActivatingNodes + err = c.ReinitializeAsCoordinator(ctx, dialer, nodeState.VPNIP, api, retrieveInitialVPNPeersRetryBackoff) case role.Node: initialState = state.IsNode + err = c.ReinitializeAsNode(ctx, dialer, nodeState.VPNIP, api, retrieveInitialVPNPeersRetryBackoff) default: return false, fmt.Errorf("invalid node role for initialized node: %v", nodeState.Role) } - // TODO: if node was previously initialized, attempt to re-join wireguard here. - // Steps to rejoining should include: - // - retrieve list of coordinators from cloud provider API - // - attempt to retrieve list of wireguard public keys from any other coordinator while checking for correct PCRs in ATLS - // - re-establish wireguard connections - // - call update function successfully at least once - // - advance state to IsNode or ActivatingNodes respectively - // - restart update loop - // This procedure can be retried until it succeeds. - // The node must be put into the correct state before the update loop is started. 
- panic("not implemented") + if err != nil { + return false, fmt.Errorf("reinit failed: %w", err) + } + c.zaplogger.Info("Re-join successful.") - //nolint:govet // this code is unreachable as long as the above is unimplemented c.state.Advance(initialState) return nodeActivated, nil } @@ -303,3 +302,8 @@ func deriveOwnerID(masterSecret []byte) ([]byte, error) { // TODO: Choose a way to salt the key derivation return util.DeriveKey(masterSecret, []byte("Constellation"), []byte("id"), config.RNGLengthDefault) } + +// Dialer can open grpc client connections with different levels of ATLS encryption / verification. +type Dialer interface { + Dial(ctx context.Context, target string) (*grpc.ClientConn, error) +} diff --git a/coordinator/core/core_test.go b/coordinator/core/core_test.go index de900776a..5d48b68bb 100644 --- a/coordinator/core/core_test.go +++ b/coordinator/core/core_test.go @@ -9,14 +9,19 @@ import ( "github.com/edgelesssys/constellation/cli/file" "github.com/edgelesssys/constellation/coordinator/attestation/simulator" "github.com/edgelesssys/constellation/coordinator/attestation/vtpm" + "github.com/edgelesssys/constellation/coordinator/kms" "github.com/edgelesssys/constellation/coordinator/nodestate" + "github.com/edgelesssys/constellation/coordinator/peer" "github.com/edgelesssys/constellation/coordinator/role" "github.com/edgelesssys/constellation/coordinator/state" "github.com/edgelesssys/constellation/coordinator/store" + "github.com/edgelesssys/constellation/coordinator/util/grpcutil" + "github.com/edgelesssys/constellation/coordinator/util/testdialer" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -161,7 +166,6 @@ func TestInitialize(t *testing.T) { role role.Role wantActivated bool wantState state.State - wantPanic bool wantErr bool }{ "fresh node": { @@ -171,7 +175,6 @@ func TestInitialize(t *testing.T) { initializePCRs: true, writeNodeState: true, role: role.Coordinator, - wantPanic: true, // TODO: adapt test case once restart is implemented wantActivated: true, wantState: state.ActivatingNodes, }, @@ -179,7 +182,6 @@ func TestInitialize(t *testing.T) { initializePCRs: true, writeNodeState: true, role: role.Node, - wantPanic: true, // TODO: adapt test case once restart is implemented wantActivated: true, wantState: state.IsNode, }, @@ -207,16 +209,15 @@ func TestInitialize(t *testing.T) { VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, }).ToFile(fileHandler)) } - - core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil, fileHandler) + core, err := NewCore(&stubVPN{}, nil, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, fileHandler) require.NoError(err) + core.initialVPNPeersRetriever = fakeInitializeVPNPeersRetriever + // prepare store to emulate initialized KMS + require.NoError(core.data().PutKMSData(kms.KMSInformation{StorageUri: kms.NoStoreURI, KmsUri: kms.ClusterKMSURI})) + require.NoError(core.data().PutMasterSecret([]byte("master-secret"))) + dialer := grpcutil.NewDialer(&MockValidator{}, testdialer.NewBufconnDialer()) - if tc.wantPanic { - assert.Panics(func() { _, _ = core.Initialize() }) - return - } - - nodeActivated, err := core.Initialize() + nodeActivated, err := core.Initialize(context.Background(), dialer, &stubPubAPI{}) if tc.wantErr { assert.Error(err) return @@ -316,3 +317,17 @@ func (k *fakeKMS) GetDEK(ctx context.Context, 
kekID, keyID string, length int) ( } return k.dek, nil } + +type stubPubAPI struct { + startVPNAPIErr error +} + +func (p *stubPubAPI) StartVPNAPIServer(vpnIP string) error { + return p.startVPNAPIErr +} + +func (p *stubPubAPI) StartUpdateLoop() {} + +func fakeInitializeVPNPeersRetriever(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error) { + return []peer.Peer{}, nil +} diff --git a/coordinator/core/reinitialize.go b/coordinator/core/reinitialize.go new file mode 100644 index 000000000..3b663c47d --- /dev/null +++ b/coordinator/core/reinitialize.go @@ -0,0 +1,142 @@ +package core + +import ( + "context" + "fmt" + "math/rand" + "net" + "time" + + "github.com/edgelesssys/constellation/coordinator/peer" + "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" + "go.uber.org/zap" +) + +const ( + callTimeout = 20 * time.Second + retrieveInitialVPNPeersRetryBackoff = 60 * time.Second +) + +// ReinitializeAsCoordinator re-initializes a coordinator. +func (c *Core) ReinitializeAsCoordinator(ctx context.Context, dialer Dialer, vpnIP string, api PubAPI, retryBackoff time.Duration) error { + if err := c.SetVPNIP(vpnIP); err != nil { + return fmt.Errorf("set vpn IP address: %v", err) + } + + // TODO: implement (manual) recovery endpoint in cases where no other coordinators are available + // or when etcd quorum is lost (when leader election fails) + + ownCoordinatorEndpoint := net.JoinHostPort(vpnIP, coordinatorPort) + // try to find active coordinator to add as initial VPN peer + // retry until coordinator is found + var ( + initialVPNPeers []peer.Peer + err error + ) + for { + initialVPNPeers, err = c.initialVPNPeersRetriever(ctx, dialer, c.zaplogger, c.metadata, &ownCoordinatorEndpoint) + if err == nil { + break + } + time.Sleep(retryBackoff) + } + + // add initial peers to the VPN + if err := c.UpdatePeers(initialVPNPeers); err != nil { + return fmt.Errorf("adding initial peers to vpn: %v", err) + } + + // run the VPN-API server + if err := api.StartVPNAPIServer(vpnIP); err != nil { + return fmt.Errorf("start vpnAPIServer: %v", err) + } + + // ATTENTION: STORE HAS TO BE EMPTY (NO OVERLAPPING KEYS) WHEN THIS FUNCTION IS CALLED + if err := c.SwitchToPersistentStore(); err != nil { + return fmt.Errorf("switch to persistent store: %v", err) + } + c.zaplogger.Info("Transition to persistent store successful") + + kmsData, err := c.GetKMSInfo() + if err != nil { + return fmt.Errorf("get kms info: %v", err) + } + if err := c.SetUpKMS(ctx, kmsData.StorageUri, kmsData.KmsUri, kmsData.KeyEncryptionKeyID, false); err != nil { + return fmt.Errorf("setup kms: %v", err) + } + return nil +} + +// ReinitializeAsNode re-initializes a node. 
+func (c *Core) ReinitializeAsNode(ctx context.Context, dialer Dialer, vpnIP string, api PubAPI, retryBackoff time.Duration) error { + if err := c.SetVPNIP(vpnIP); err != nil { + return fmt.Errorf("set vpn IP address: %v", err) + } + + // try to find active coordinator to add as initial VPN peer + // retry until coordinator is found + var ( + initialVPNPeers []peer.Peer + err error + ) + for { + initialVPNPeers, err = c.initialVPNPeersRetriever(ctx, dialer, c.zaplogger, c.metadata, nil) + if err == nil { + break + } + time.Sleep(retryBackoff) + } + + // add initial peers to the VPN + if err := c.UpdatePeers(initialVPNPeers); err != nil { + return fmt.Errorf("adding initial peers to vpn: %v", err) + } + + api.StartUpdateLoop() + return nil +} + +func getInitialVPNPeers(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error) { + coordinatorEndpoints, err := CoordinatorEndpoints(ctx, metadata) + if err != nil { + return nil, fmt.Errorf("get coordinator endpoints: %v", err) + } + // shuffle endpoints using PRNG. While this is not a cryptographically secure random seed, + // it is good enough for loadbalancing. + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(coordinatorEndpoints), func(i, j int) { + coordinatorEndpoints[i], coordinatorEndpoints[j] = coordinatorEndpoints[j], coordinatorEndpoints[i] + }) + + // try to find active coordinator to retrieve peers + for _, coordinatorEndpoint := range coordinatorEndpoints { + if ownCoordinatorEndpoint != nil && coordinatorEndpoint == *ownCoordinatorEndpoint { + continue + } + callCTX, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + conn, err := dialer.Dial(callCTX, coordinatorEndpoint) + if err != nil { + logger.Warn("getting VPN peer information from coordinator failed: dialing failed: ", zap.String("endpoint", coordinatorEndpoint), zap.Error(err)) + continue + } + defer conn.Close() + client := pubproto.NewAPIClient(conn) + resp, err := client.GetVPNPeers(ctx, &pubproto.GetVPNPeersRequest{}) + if err != nil { + logger.Warn("getting VPN peer information from coordinator failed: request failed: ", zap.String("endpoint", coordinatorEndpoint), zap.Error(err)) + continue + } + return peer.FromPubProto(resp.Peers), nil + } + + return nil, fmt.Errorf("no active coordinator found. tried %v", coordinatorEndpoints) +} + +// PubAPI is the interface for the public API of the coordinator. 
+type PubAPI interface { + StartVPNAPIServer(vpnIP string) error + StartUpdateLoop() +} + +type initialVPNPeersRetriever func(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error) diff --git a/coordinator/core/reinitialize_test.go b/coordinator/core/reinitialize_test.go new file mode 100644 index 000000000..557578d34 --- /dev/null +++ b/coordinator/core/reinitialize_test.go @@ -0,0 +1,283 @@ +package core + +import ( + "context" + "errors" + "testing" + + "github.com/edgelesssys/constellation/cli/file" + "github.com/edgelesssys/constellation/coordinator/atls" + "github.com/edgelesssys/constellation/coordinator/kms" + "github.com/edgelesssys/constellation/coordinator/peer" + "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" + "github.com/edgelesssys/constellation/coordinator/role" + "github.com/edgelesssys/constellation/coordinator/util/grpcutil" + "github.com/edgelesssys/constellation/coordinator/util/testdialer" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/protobuf/proto" +) + +func TestReinitializeAsNode(t *testing.T) { + testPeers := []peer.Peer{ + { + PublicIP: "192.0.2.1", + VPNIP: "198.51.100.1", + VPNPubKey: []byte{0x1, 0x2, 0x3}, + Role: role.Coordinator, + }, + } + wantedVPNPeers := []stubVPNPeer{ + { + publicIP: "192.0.2.1", + vpnIP: "198.51.100.1", + pubKey: []byte{0x1, 0x2, 0x3}, + }, + } + vpnIP := "198.51.100.2" + + testCases := map[string]struct { + getInitialVPNPeersResponses []struct { + peers []peer.Peer + err error + } + wantErr bool + }{ + "reinitialize as node works": { + getInitialVPNPeersResponses: []struct { + peers []peer.Peer + err error + }{{peers: testPeers}}, + }, + "reinitialize as node will retry until vpn peers are retrieved": { + getInitialVPNPeersResponses: []struct { + peers []peer.Peer + err error + }{ + {err: errors.New("retrieving vpn peers failed")}, + {peers: testPeers}, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + coordinators := []Instance{{IPs: []string{"192.0.2.1"}, Role: role.Coordinator}} + netDialer := testdialer.NewBufconnDialer() + dialer := grpcutil.NewDialer(&MockValidator{}, netDialer) + server := newPubAPIServer() + api := &pubAPIServerStub{responses: tc.getInitialVPNPeersResponses} + pubproto.RegisterAPIServer(server, api) + go server.Serve(netDialer.GetListener("192.0.2.1:9000")) + defer server.Stop() + vpn := &stubVPN{} + core, err := NewCore(vpn, nil, &stubMetadata{listRes: coordinators, supportedRes: true}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs())) + require.NoError(err) + err = core.ReinitializeAsNode(context.Background(), dialer, vpnIP, &stubPubAPI{}, 0) + + if tc.wantErr { + assert.Error(err) + return + } + require.NoError(err) + + assert.Equal(vpnIP, vpn.interfaceIP) + assert.Equal(wantedVPNPeers, vpn.peers) + }) + } +} + +func TestReinitializeAsCoordinator(t *testing.T) { + testPeers := []peer.Peer{ + { + PublicIP: "192.0.2.1", + VPNIP: "198.51.100.1", + VPNPubKey: []byte{0x1, 0x2, 0x3}, + Role: role.Coordinator, + }, + } + wantedVPNPeers := []stubVPNPeer{ + { + publicIP: "192.0.2.1", + vpnIP: "198.51.100.1", + pubKey: []byte{0x1, 0x2, 0x3}, + }, + } + vpnIP := "198.51.100.2" + + testCases := 
map[string]struct { + getInitialVPNPeersResponses []struct { + peers []peer.Peer + err error + } + wantErr bool + }{ + "reinitialize as coordinator works": { + getInitialVPNPeersResponses: []struct { + peers []peer.Peer + err error + }{{peers: testPeers}}, + }, + "reinitialize as coordinator will retry until vpn peers are retrieved": { + getInitialVPNPeersResponses: []struct { + peers []peer.Peer + err error + }{ + {err: errors.New("retrieving vpn peers failed")}, + {peers: testPeers}, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + coordinators := []Instance{{IPs: []string{"192.0.2.1"}, Role: role.Coordinator}} + netDialer := testdialer.NewBufconnDialer() + dialer := grpcutil.NewDialer(&MockValidator{}, netDialer) + server := newPubAPIServer() + api := &pubAPIServerStub{responses: tc.getInitialVPNPeersResponses} + pubproto.RegisterAPIServer(server, api) + go server.Serve(netDialer.GetListener("192.0.2.1:9000")) + defer server.Stop() + vpn := &stubVPN{} + core, err := NewCore(vpn, nil, &stubMetadata{listRes: coordinators, supportedRes: true}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, &fakeStoreFactory{}, file.NewHandler(afero.NewMemMapFs())) + require.NoError(err) + // prepare store to emulate initialized KMS + require.NoError(core.data().PutKMSData(kms.KMSInformation{StorageUri: kms.NoStoreURI, KmsUri: kms.ClusterKMSURI})) + require.NoError(core.data().PutMasterSecret([]byte("master-secret"))) + err = core.ReinitializeAsCoordinator(context.Background(), dialer, vpnIP, &stubPubAPI{}, 0) + + if tc.wantErr { + assert.Error(err) + return + } + require.NoError(err) + + assert.Equal(vpnIP, vpn.interfaceIP) + assert.Equal(wantedVPNPeers, vpn.peers) + }) + } +} + +func TestGetInitialVPNPeers(t *testing.T) { + testPeers := []peer.Peer{ + { + PublicIP: "192.0.2.1", + VPNIP: "198.51.100.1", + VPNPubKey: []byte{0x1, 0x2, 0x3}, + Role: role.Coordinator, + }, + } + + testCases := map[string]struct { + ownCoordinatorEndpoint *string + coordinatorIPs []string + metadataErr error + peers []peer.Peer + getVPNPeersErr error + wantErr bool + }{ + "getInitialVPNPeers works from worker node": { + coordinatorIPs: []string{"192.0.2.1"}, + peers: testPeers, + }, + "getInitialVPNPeers works from coordinator": { + ownCoordinatorEndpoint: proto.String("192.0.2.2:9000"), + coordinatorIPs: []string{"192.0.2.1", "192.0.2.2"}, + peers: testPeers, + }, + "getInitialVPNPeers filters itself": { + ownCoordinatorEndpoint: proto.String("192.0.2.2:9000"), + coordinatorIPs: []string{"192.0.2.2"}, + wantErr: true, + }, + "getInitialVPNPeers fails if no coordinators are found": { + wantErr: true, + }, + "getInitialVPNPeers fails if metadata API fails to retrieve coordinators": { + metadataErr: errors.New("metadata error"), + wantErr: true, + }, + "getInitialVPNPeers fails if rpc call fails": { + coordinatorIPs: []string{"192.0.2.1"}, + getVPNPeersErr: errors.New("rpc error"), + wantErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + coordinators := func(ips []string) []Instance { + instances := []Instance{} + for _, ip := range ips { + instances = append(instances, Instance{IPs: []string{ip}, Role: role.Coordinator}) + } + return instances + }(tc.coordinatorIPs) + zapLogger, err := zap.NewDevelopment() + require.NoError(err) + netDialer := testdialer.NewBufconnDialer() + dialer := grpcutil.NewDialer(&MockValidator{}, netDialer) + 
server := newPubAPIServer() + api := &pubAPIServerStub{ + responses: []struct { + peers []peer.Peer + err error + }{{peers: tc.peers, err: tc.getVPNPeersErr}}, + } + pubproto.RegisterAPIServer(server, api) + go server.Serve(netDialer.GetListener("192.0.2.1:9000")) + defer server.Stop() + peers, err := getInitialVPNPeers(context.Background(), dialer, zapLogger, &stubMetadata{listRes: coordinators, listErr: tc.metadataErr, supportedRes: true}, tc.ownCoordinatorEndpoint) + + if tc.wantErr { + assert.Error(err) + return + } + require.NoError(err) + assert.Equal(tc.peers, peers) + }) + } +} + +func newPubAPIServer() *grpc.Server { + tlsConfig, err := atls.CreateAttestationServerTLSConfig(&MockIssuer{}) + if err != nil { + panic(err) + } + return grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) +} + +type pubAPIServerStub struct { + responses []struct { + peers []peer.Peer + err error + } + i int + pubproto.UnimplementedAPIServer +} + +func (s *pubAPIServerStub) GetVPNPeers(ctx context.Context, in *pubproto.GetVPNPeersRequest) (*pubproto.GetVPNPeersResponse, error) { + if len(s.responses) == 0 { + return nil, nil + } + resp := s.responses[s.i] + s.i = (s.i + 1) % len(s.responses) + return &pubproto.GetVPNPeersResponse{ + Peers: peer.ToPubProto(resp.peers), + }, resp.err +} diff --git a/coordinator/store/etcdstore.go b/coordinator/store/etcdstore.go index 4e1c4d36e..5eeeadb37 100644 --- a/coordinator/store/etcdstore.go +++ b/coordinator/store/etcdstore.go @@ -22,7 +22,7 @@ const ( keyFilepath = "/etc/kubernetes/pki/etcd/peer.key" caCertFilepath = "/etc/kubernetes/pki/etcd/server.crt" etcdPrefix = "constellationRegion" - dialTimeout = 10 * time.Second + dialTimeout = 60 * time.Second ) type EtcdStore struct {
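
The retry loops in ReinitializeAsCoordinator and ReinitializeAsNode call the peer retriever until it succeeds, sleeping retryBackoff between attempts, but they never consult ctx, so a cancelled or expired context does not stop the retries (and run.go currently passes context.TODO(), so nothing cancels it in practice). If cancellation is supposed to abort reinitialization, the loop could be factored into a small cancellation-aware helper. The following is only a sketch under that assumption; retryGetPeers is an illustrative name, not part of the patch, and it assumes the surrounding core package types (Dialer, ProviderMetadata, initialVPNPeersRetriever, peer.Peer).

// retryGetPeers is a hypothetical helper (not part of this patch) showing how the
// peer-retrieval retry loop could honor context cancellation between attempts.
func retryGetPeers(ctx context.Context, retriever initialVPNPeersRetriever, dialer Dialer,
	logger *zap.Logger, metadata ProviderMetadata, ownEndpoint *string, backoff time.Duration,
) ([]peer.Peer, error) {
	for {
		peers, err := retriever(ctx, dialer, logger, metadata, ownEndpoint)
		if err == nil {
			return peers, nil
		}
		logger.Warn("retrieving initial VPN peers failed, retrying", zap.Error(err))
		select {
		case <-ctx.Done():
			return nil, ctx.Err() // stop retrying once the caller gives up
		case <-time.After(backoff):
			// back off, then try again
		}
	}
}

Both reinitialization paths could then replace their for/time.Sleep blocks with a single call to such a helper; the tests already pass a retryBackoff of 0, which keeps working with time.After.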
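
In getInitialVPNPeers, the callCTX created with callTimeout is passed only to dialer.Dial; the GetVPNPeers RPC itself runs on the outer ctx, and both cancel() and conn.Close() are deferred inside the for loop, so they do not run until the whole function returns. If the intent is for callTimeout to bound each attempt end to end, moving the per-endpoint work into its own function makes the scoping explicit. This is a sketch under that assumption; tryGetPeersFrom is an illustrative name, not part of the patch.

// tryGetPeersFrom is a hypothetical per-endpoint helper (not part of this patch):
// it scopes callTimeout to both dialing and the RPC and releases the connection
// and timer as soon as the attempt finishes.
func tryGetPeersFrom(ctx context.Context, dialer Dialer, endpoint string) ([]peer.Peer, error) {
	callCtx, cancel := context.WithTimeout(ctx, callTimeout)
	defer cancel()
	conn, err := dialer.Dial(callCtx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("dialing coordinator %s: %w", endpoint, err)
	}
	defer conn.Close()
	resp, err := pubproto.NewAPIClient(conn).GetVPNPeers(callCtx, &pubproto.GetVPNPeersRequest{})
	if err != nil {
		return nil, fmt.Errorf("requesting VPN peers from %s: %w", endpoint, err)
	}
	return peer.FromPubProto(resp.Peers), nil
}

getInitialVPNPeers would keep its shuffle and self-filtering logic, log and continue on error, and return the first successful result.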
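
Finally, the run.go call site passes context.TODO() to core.Initialize. Combined with a cancellation-aware retry loop as sketched above, the caller could bound how long a rebooted node keeps trying to rejoin before failing hard. A hypothetical variant of the call site, with an arbitrary illustrative limit:

// Hypothetical run.go call site (not part of this patch): bound reinitialization
// with a real deadline instead of context.TODO().
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
nodeActivated, err := core.Initialize(ctx, dialer, papi)
if err != nil {
	zapLoggerCore.Fatal("failed to initialize core", zap.Error(err))
}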