Implement reinitialization of the coordinator after reboot

Signed-off-by: Malte Poll <mp@edgeless.systems>
This commit is contained in:
Malte Poll 2022-04-28 10:14:40 +02:00 committed by Malte Poll
parent ffb471d023
commit f5aafd8178
6 changed files with 504 additions and 60 deletions

View File

@ -52,7 +52,7 @@ func run(issuer core.QuoteIssuer, vpn core.VPN, openTPM vtpm.TPMOpenFunc, getPub
zapLoggerPubapi := zapLoggerCore.Named("pubapi") zapLoggerPubapi := zapLoggerCore.Named("pubapi")
papi := pubapi.New(zapLoggerPubapi, core, dialer, vapiServer, getPublicIPAddr, pubapi.GetRecoveryPeerFromContext) papi := pubapi.New(zapLoggerPubapi, core, dialer, vapiServer, getPublicIPAddr, pubapi.GetRecoveryPeerFromContext)
// initialize state machine and wait for re-joining of the VPN (if applicable) // initialize state machine and wait for re-joining of the VPN (if applicable)
nodeActivated, err := core.Initialize() nodeActivated, err := core.Initialize(context.TODO(), dialer, papi)
if err != nil { if err != nil {
zapLoggerCore.Fatal("failed to initialize core", zap.Error(err)) zapLoggerCore.Fatal("failed to initialize core", zap.Error(err))
} }

View File

@ -21,27 +21,29 @@ import (
"github.com/edgelesssys/constellation/coordinator/util" "github.com/edgelesssys/constellation/coordinator/util"
"github.com/edgelesssys/constellation/kms/kms" "github.com/edgelesssys/constellation/kms/kms"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
var coordinatorVPNIP = netip.AddrFrom4([4]byte{10, 118, 0, 1}) var coordinatorVPNIP = netip.AddrFrom4([4]byte{10, 118, 0, 1})
type Core struct { type Core struct {
state state.State state state.State
openTPM vtpm.TPMOpenFunc openTPM vtpm.TPMOpenFunc
mut sync.Mutex mut sync.Mutex
store store.Store store store.Store
vpn VPN vpn VPN
kube Cluster kube Cluster
metadata ProviderMetadata metadata ProviderMetadata
cloudControllerManager CloudControllerManager cloudControllerManager CloudControllerManager
cloudNodeManager CloudNodeManager cloudNodeManager CloudNodeManager
clusterAutoscaler ClusterAutoscaler clusterAutoscaler ClusterAutoscaler
encryptedDisk EncryptedDisk encryptedDisk EncryptedDisk
kms kms.CloudKMS kms kms.CloudKMS
zaplogger *zap.Logger zaplogger *zap.Logger
persistentStoreFactory PersistentStoreFactory persistentStoreFactory PersistentStoreFactory
lastHeartbeats map[string]time.Time initialVPNPeersRetriever initialVPNPeersRetriever
fileHandler file.Handler lastHeartbeats map[string]time.Time
fileHandler file.Handler
} }
// NewCore creates and initializes a new Core object. // NewCore creates and initializes a new Core object.
@ -51,29 +53,26 @@ func NewCore(vpn VPN, kube Cluster,
) (*Core, error) { ) (*Core, error) {
stor := store.NewStdStore() stor := store.NewStdStore()
c := &Core{ c := &Core{
openTPM: openTPM, openTPM: openTPM,
store: stor, store: stor,
vpn: vpn, vpn: vpn,
kube: kube, kube: kube,
metadata: metadata, metadata: metadata,
cloudNodeManager: cloudNodeManager, cloudNodeManager: cloudNodeManager,
cloudControllerManager: cloudControllerManager, cloudControllerManager: cloudControllerManager,
clusterAutoscaler: clusterAutoscaler, clusterAutoscaler: clusterAutoscaler,
encryptedDisk: encryptedDisk, encryptedDisk: encryptedDisk,
zaplogger: zapLogger, zaplogger: zapLogger,
kms: nil, // KMS is set up during init phase kms: nil, // KMS is set up during init phase
persistentStoreFactory: persistentStoreFactory, persistentStoreFactory: persistentStoreFactory,
lastHeartbeats: make(map[string]time.Time), initialVPNPeersRetriever: getInitialVPNPeers,
fileHandler: fileHandler, lastHeartbeats: make(map[string]time.Time),
fileHandler: fileHandler,
} }
if err := c.data().IncrementPeersResourceVersion(); err != nil { if err := c.data().IncrementPeersResourceVersion(); err != nil {
return nil, err return nil, err
} }
if err := vpn.Setup(nil); err != nil {
return nil, err
}
return c, nil return c, nil
} }
@ -183,13 +182,16 @@ func (c *Core) NotifyNodeHeartbeat(addr net.Addr) {
// Initialize initializes the state machine of the core and handles re-joining the VPN. // Initialize initializes the state machine of the core and handles re-joining the VPN.
// Blocks until the core is ready to be used. // Blocks until the core is ready to be used.
func (c *Core) Initialize() (nodeActivated bool, err error) { func (c *Core) Initialize(ctx context.Context, dialer Dialer, api PubAPI) (nodeActivated bool, err error) {
nodeActivated, err = vtpm.IsNodeInitialized(c.openTPM) nodeActivated, err = vtpm.IsNodeInitialized(c.openTPM)
if err != nil { if err != nil {
return false, fmt.Errorf("failed to check for previous activation using vTPM: %w", err) return false, fmt.Errorf("failed to check for previous activation using vTPM: %w", err)
} }
if !nodeActivated { if !nodeActivated {
c.zaplogger.Info("Node was never activated. Allowing node to be activated.") c.zaplogger.Info("Node was never activated. Allowing node to be activated.")
if err := c.vpn.Setup(nil); err != nil {
return false, fmt.Errorf("failed to setup VPN: %w", err)
}
c.state.Advance(state.AcceptingInit) c.state.Advance(state.AcceptingInit)
return false, nil return false, nil
} }
@ -198,28 +200,25 @@ func (c *Core) Initialize() (nodeActivated bool, err error) {
if err != nil { if err != nil {
return false, fmt.Errorf("failed to read node state: %w", err) return false, fmt.Errorf("failed to read node state: %w", err)
} }
if err := c.vpn.Setup(nodeState.VPNPrivKey); err != nil {
return false, fmt.Errorf("failed to setup VPN: %w", err)
}
var initialState state.State var initialState state.State
switch nodeState.Role { switch nodeState.Role {
case role.Coordinator: case role.Coordinator:
initialState = state.ActivatingNodes initialState = state.ActivatingNodes
err = c.ReinitializeAsCoordinator(ctx, dialer, nodeState.VPNIP, api, retrieveInitialVPNPeersRetryBackoff)
case role.Node: case role.Node:
initialState = state.IsNode initialState = state.IsNode
err = c.ReinitializeAsNode(ctx, dialer, nodeState.VPNIP, api, retrieveInitialVPNPeersRetryBackoff)
default: default:
return false, fmt.Errorf("invalid node role for initialized node: %v", nodeState.Role) return false, fmt.Errorf("invalid node role for initialized node: %v", nodeState.Role)
} }
// TODO: if node was previously initialized, attempt to re-join wireguard here. if err != nil {
// Steps to rejoining should include: return false, fmt.Errorf("reinit failed: %w", err)
// - retrieve list of coordinators from cloud provider API }
// - attempt to retrieve list of wireguard public keys from any other coordinator while checking for correct PCRs in ATLS c.zaplogger.Info("Re-join successful.")
// - re-establish wireguard connections
// - call update function successfully at least once
// - advance state to IsNode or ActivatingNodes respectively
// - restart update loop
// This procedure can be retried until it succeeds.
// The node must be put into the correct state before the update loop is started.
panic("not implemented")
//nolint:govet // this code is unreachable as long as the above is unimplemented
c.state.Advance(initialState) c.state.Advance(initialState)
return nodeActivated, nil return nodeActivated, nil
} }
@ -303,3 +302,8 @@ func deriveOwnerID(masterSecret []byte) ([]byte, error) {
// TODO: Choose a way to salt the key derivation // TODO: Choose a way to salt the key derivation
return util.DeriveKey(masterSecret, []byte("Constellation"), []byte("id"), config.RNGLengthDefault) return util.DeriveKey(masterSecret, []byte("Constellation"), []byte("id"), config.RNGLengthDefault)
} }
// Dialer can open grpc client connections with different levels of ATLS encryption / verification.
type Dialer interface {
Dial(ctx context.Context, target string) (*grpc.ClientConn, error)
}

View File

@ -9,14 +9,19 @@ import (
"github.com/edgelesssys/constellation/cli/file" "github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/attestation/simulator" "github.com/edgelesssys/constellation/coordinator/attestation/simulator"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm" "github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/nodestate" "github.com/edgelesssys/constellation/coordinator/nodestate"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/role" "github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state" "github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/store" "github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/util/grpcutil"
"github.com/edgelesssys/constellation/coordinator/util/testdialer"
"github.com/spf13/afero" "github.com/spf13/afero"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/goleak" "go.uber.org/goleak"
"go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -161,7 +166,6 @@ func TestInitialize(t *testing.T) {
role role.Role role role.Role
wantActivated bool wantActivated bool
wantState state.State wantState state.State
wantPanic bool
wantErr bool wantErr bool
}{ }{
"fresh node": { "fresh node": {
@ -171,7 +175,6 @@ func TestInitialize(t *testing.T) {
initializePCRs: true, initializePCRs: true,
writeNodeState: true, writeNodeState: true,
role: role.Coordinator, role: role.Coordinator,
wantPanic: true, // TODO: adapt test case once restart is implemented
wantActivated: true, wantActivated: true,
wantState: state.ActivatingNodes, wantState: state.ActivatingNodes,
}, },
@ -179,7 +182,6 @@ func TestInitialize(t *testing.T) {
initializePCRs: true, initializePCRs: true,
writeNodeState: true, writeNodeState: true,
role: role.Node, role: role.Node,
wantPanic: true, // TODO: adapt test case once restart is implemented
wantActivated: true, wantActivated: true,
wantState: state.IsNode, wantState: state.IsNode,
}, },
@ -207,16 +209,15 @@ func TestInitialize(t *testing.T) {
VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
}).ToFile(fileHandler)) }).ToFile(fileHandler))
} }
core, err := NewCore(&stubVPN{}, nil, &ProviderMetadataFake{}, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, &fakeStoreFactory{}, fileHandler)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil, fileHandler)
require.NoError(err) require.NoError(err)
core.initialVPNPeersRetriever = fakeInitializeVPNPeersRetriever
// prepare store to emulate initialized KMS
require.NoError(core.data().PutKMSData(kms.KMSInformation{StorageUri: kms.NoStoreURI, KmsUri: kms.ClusterKMSURI}))
require.NoError(core.data().PutMasterSecret([]byte("master-secret")))
dialer := grpcutil.NewDialer(&MockValidator{}, testdialer.NewBufconnDialer())
if tc.wantPanic { nodeActivated, err := core.Initialize(context.Background(), dialer, &stubPubAPI{})
assert.Panics(func() { _, _ = core.Initialize() })
return
}
nodeActivated, err := core.Initialize()
if tc.wantErr { if tc.wantErr {
assert.Error(err) assert.Error(err)
return return
@ -316,3 +317,17 @@ func (k *fakeKMS) GetDEK(ctx context.Context, kekID, keyID string, length int) (
} }
return k.dek, nil return k.dek, nil
} }
type stubPubAPI struct {
startVPNAPIErr error
}
func (p *stubPubAPI) StartVPNAPIServer(vpnIP string) error {
return p.startVPNAPIErr
}
func (p *stubPubAPI) StartUpdateLoop() {}
func fakeInitializeVPNPeersRetriever(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error) {
return []peer.Peer{}, nil
}

View File

@ -0,0 +1,142 @@
package core
import (
"context"
"fmt"
"math/rand"
"net"
"time"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"go.uber.org/zap"
)
const (
callTimeout = 20 * time.Second
retrieveInitialVPNPeersRetryBackoff = 60 * time.Second
)
// ReinitializeAsCoordinator re-initializes a coordinator.
func (c *Core) ReinitializeAsCoordinator(ctx context.Context, dialer Dialer, vpnIP string, api PubAPI, retryBackoff time.Duration) error {
if err := c.SetVPNIP(vpnIP); err != nil {
return fmt.Errorf("set vpn IP address: %v", err)
}
// TODO: implement (manual) recovery endpoint in cases where no other coordinators are available
// or when etcd quorum is lost (when leader election fails)
ownCoordinatorEndpoint := net.JoinHostPort(vpnIP, coordinatorPort)
// try to find active coordinator to add as initial VPN peer
// retry until coordinator is found
var (
initialVPNPeers []peer.Peer
err error
)
for {
initialVPNPeers, err = c.initialVPNPeersRetriever(ctx, dialer, c.zaplogger, c.metadata, &ownCoordinatorEndpoint)
if err == nil {
break
}
time.Sleep(retryBackoff)
}
// add initial peers to the VPN
if err := c.UpdatePeers(initialVPNPeers); err != nil {
return fmt.Errorf("adding initial peers to vpn: %v", err)
}
// run the VPN-API server
if err := api.StartVPNAPIServer(vpnIP); err != nil {
return fmt.Errorf("start vpnAPIServer: %v", err)
}
// ATTENTION: STORE HAS TO BE EMPTY (NO OVERLAPPING KEYS) WHEN THIS FUNCTION IS CALLED
if err := c.SwitchToPersistentStore(); err != nil {
return fmt.Errorf("switch to persistent store: %v", err)
}
c.zaplogger.Info("Transition to persistent store successful")
kmsData, err := c.GetKMSInfo()
if err != nil {
return fmt.Errorf("get kms info: %v", err)
}
if err := c.SetUpKMS(ctx, kmsData.StorageUri, kmsData.KmsUri, kmsData.KeyEncryptionKeyID, false); err != nil {
return fmt.Errorf("setup kms: %v", err)
}
return nil
}
// ReinitializeAsNode re-initializes a node.
func (c *Core) ReinitializeAsNode(ctx context.Context, dialer Dialer, vpnIP string, api PubAPI, retryBackoff time.Duration) error {
if err := c.SetVPNIP(vpnIP); err != nil {
return fmt.Errorf("set vpn IP address: %v", err)
}
// try to find active coordinator to add as initial VPN peer
// retry until coordinator is found
var (
initialVPNPeers []peer.Peer
err error
)
for {
initialVPNPeers, err = c.initialVPNPeersRetriever(ctx, dialer, c.zaplogger, c.metadata, nil)
if err == nil {
break
}
time.Sleep(retryBackoff)
}
// add initial peers to the VPN
if err := c.UpdatePeers(initialVPNPeers); err != nil {
return fmt.Errorf("adding initial peers to vpn: %v", err)
}
api.StartUpdateLoop()
return nil
}
func getInitialVPNPeers(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error) {
coordinatorEndpoints, err := CoordinatorEndpoints(ctx, metadata)
if err != nil {
return nil, fmt.Errorf("get coordinator endpoints: %v", err)
}
// shuffle endpoints using PRNG. While this is not a cryptographically secure random seed,
// it is good enough for loadbalancing.
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(coordinatorEndpoints), func(i, j int) {
coordinatorEndpoints[i], coordinatorEndpoints[j] = coordinatorEndpoints[j], coordinatorEndpoints[i]
})
// try to find active coordinator to retrieve peers
for _, coordinatorEndpoint := range coordinatorEndpoints {
if ownCoordinatorEndpoint != nil && coordinatorEndpoint == *ownCoordinatorEndpoint {
continue
}
callCTX, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
conn, err := dialer.Dial(callCTX, coordinatorEndpoint)
if err != nil {
logger.Warn("getting VPN peer information from coordinator failed: dialing failed: ", zap.String("endpoint", coordinatorEndpoint), zap.Error(err))
continue
}
defer conn.Close()
client := pubproto.NewAPIClient(conn)
resp, err := client.GetVPNPeers(ctx, &pubproto.GetVPNPeersRequest{})
if err != nil {
logger.Warn("getting VPN peer information from coordinator failed: request failed: ", zap.String("endpoint", coordinatorEndpoint), zap.Error(err))
continue
}
return peer.FromPubProto(resp.Peers), nil
}
return nil, fmt.Errorf("no active coordinator found. tried %v", coordinatorEndpoints)
}
// PubAPI is the interface for the public API of the coordinator.
type PubAPI interface {
StartVPNAPIServer(vpnIP string) error
StartUpdateLoop()
}
type initialVPNPeersRetriever func(ctx context.Context, dialer Dialer, logger *zap.Logger, metadata ProviderMetadata, ownCoordinatorEndpoint *string) ([]peer.Peer, error)

View File

@ -0,0 +1,283 @@
package core
import (
"context"
"errors"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/atls"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/util/grpcutil"
"github.com/edgelesssys/constellation/coordinator/util/testdialer"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
)
func TestReinitializeAsNode(t *testing.T) {
testPeers := []peer.Peer{
{
PublicIP: "192.0.2.1",
VPNIP: "198.51.100.1",
VPNPubKey: []byte{0x1, 0x2, 0x3},
Role: role.Coordinator,
},
}
wantedVPNPeers := []stubVPNPeer{
{
publicIP: "192.0.2.1",
vpnIP: "198.51.100.1",
pubKey: []byte{0x1, 0x2, 0x3},
},
}
vpnIP := "198.51.100.2"
testCases := map[string]struct {
getInitialVPNPeersResponses []struct {
peers []peer.Peer
err error
}
wantErr bool
}{
"reinitialize as node works": {
getInitialVPNPeersResponses: []struct {
peers []peer.Peer
err error
}{{peers: testPeers}},
},
"reinitialize as node will retry until vpn peers are retrieved": {
getInitialVPNPeersResponses: []struct {
peers []peer.Peer
err error
}{
{err: errors.New("retrieving vpn peers failed")},
{peers: testPeers},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
coordinators := []Instance{{IPs: []string{"192.0.2.1"}, Role: role.Coordinator}}
netDialer := testdialer.NewBufconnDialer()
dialer := grpcutil.NewDialer(&MockValidator{}, netDialer)
server := newPubAPIServer()
api := &pubAPIServerStub{responses: tc.getInitialVPNPeersResponses}
pubproto.RegisterAPIServer(server, api)
go server.Serve(netDialer.GetListener("192.0.2.1:9000"))
defer server.Stop()
vpn := &stubVPN{}
core, err := NewCore(vpn, nil, &stubMetadata{listRes: coordinators, supportedRes: true}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
err = core.ReinitializeAsNode(context.Background(), dialer, vpnIP, &stubPubAPI{}, 0)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(vpnIP, vpn.interfaceIP)
assert.Equal(wantedVPNPeers, vpn.peers)
})
}
}
func TestReinitializeAsCoordinator(t *testing.T) {
testPeers := []peer.Peer{
{
PublicIP: "192.0.2.1",
VPNIP: "198.51.100.1",
VPNPubKey: []byte{0x1, 0x2, 0x3},
Role: role.Coordinator,
},
}
wantedVPNPeers := []stubVPNPeer{
{
publicIP: "192.0.2.1",
vpnIP: "198.51.100.1",
pubKey: []byte{0x1, 0x2, 0x3},
},
}
vpnIP := "198.51.100.2"
testCases := map[string]struct {
getInitialVPNPeersResponses []struct {
peers []peer.Peer
err error
}
wantErr bool
}{
"reinitialize as coordinator works": {
getInitialVPNPeersResponses: []struct {
peers []peer.Peer
err error
}{{peers: testPeers}},
},
"reinitialize as coordinator will retry until vpn peers are retrieved": {
getInitialVPNPeersResponses: []struct {
peers []peer.Peer
err error
}{
{err: errors.New("retrieving vpn peers failed")},
{peers: testPeers},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
coordinators := []Instance{{IPs: []string{"192.0.2.1"}, Role: role.Coordinator}}
netDialer := testdialer.NewBufconnDialer()
dialer := grpcutil.NewDialer(&MockValidator{}, netDialer)
server := newPubAPIServer()
api := &pubAPIServerStub{responses: tc.getInitialVPNPeersResponses}
pubproto.RegisterAPIServer(server, api)
go server.Serve(netDialer.GetListener("192.0.2.1:9000"))
defer server.Stop()
vpn := &stubVPN{}
core, err := NewCore(vpn, nil, &stubMetadata{listRes: coordinators, supportedRes: true}, nil, nil, nil, nil, zaptest.NewLogger(t), nil, &fakeStoreFactory{}, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
// prepare store to emulate initialized KMS
require.NoError(core.data().PutKMSData(kms.KMSInformation{StorageUri: kms.NoStoreURI, KmsUri: kms.ClusterKMSURI}))
require.NoError(core.data().PutMasterSecret([]byte("master-secret")))
err = core.ReinitializeAsCoordinator(context.Background(), dialer, vpnIP, &stubPubAPI{}, 0)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(vpnIP, vpn.interfaceIP)
assert.Equal(wantedVPNPeers, vpn.peers)
})
}
}
func TestGetInitialVPNPeers(t *testing.T) {
testPeers := []peer.Peer{
{
PublicIP: "192.0.2.1",
VPNIP: "198.51.100.1",
VPNPubKey: []byte{0x1, 0x2, 0x3},
Role: role.Coordinator,
},
}
testCases := map[string]struct {
ownCoordinatorEndpoint *string
coordinatorIPs []string
metadataErr error
peers []peer.Peer
getVPNPeersErr error
wantErr bool
}{
"getInitialVPNPeers works from worker node": {
coordinatorIPs: []string{"192.0.2.1"},
peers: testPeers,
},
"getInitialVPNPeers works from coordinator": {
ownCoordinatorEndpoint: proto.String("192.0.2.2:9000"),
coordinatorIPs: []string{"192.0.2.1", "192.0.2.2"},
peers: testPeers,
},
"getInitialVPNPeers filters itself": {
ownCoordinatorEndpoint: proto.String("192.0.2.2:9000"),
coordinatorIPs: []string{"192.0.2.2"},
wantErr: true,
},
"getInitialVPNPeers fails if no coordinators are found": {
wantErr: true,
},
"getInitialVPNPeers fails if metadata API fails to retrieve coordinators": {
metadataErr: errors.New("metadata error"),
wantErr: true,
},
"getInitialVPNPeers fails if rpc call fails": {
coordinatorIPs: []string{"192.0.2.1"},
getVPNPeersErr: errors.New("rpc error"),
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
coordinators := func(ips []string) []Instance {
instances := []Instance{}
for _, ip := range ips {
instances = append(instances, Instance{IPs: []string{ip}, Role: role.Coordinator})
}
return instances
}(tc.coordinatorIPs)
zapLogger, err := zap.NewDevelopment()
require.NoError(err)
netDialer := testdialer.NewBufconnDialer()
dialer := grpcutil.NewDialer(&MockValidator{}, netDialer)
server := newPubAPIServer()
api := &pubAPIServerStub{
responses: []struct {
peers []peer.Peer
err error
}{{peers: tc.peers, err: tc.getVPNPeersErr}},
}
pubproto.RegisterAPIServer(server, api)
go server.Serve(netDialer.GetListener("192.0.2.1:9000"))
defer server.Stop()
peers, err := getInitialVPNPeers(context.Background(), dialer, zapLogger, &stubMetadata{listRes: coordinators, listErr: tc.metadataErr, supportedRes: true}, tc.ownCoordinatorEndpoint)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(tc.peers, peers)
})
}
}
func newPubAPIServer() *grpc.Server {
tlsConfig, err := atls.CreateAttestationServerTLSConfig(&MockIssuer{})
if err != nil {
panic(err)
}
return grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
}
type pubAPIServerStub struct {
responses []struct {
peers []peer.Peer
err error
}
i int
pubproto.UnimplementedAPIServer
}
func (s *pubAPIServerStub) GetVPNPeers(ctx context.Context, in *pubproto.GetVPNPeersRequest) (*pubproto.GetVPNPeersResponse, error) {
if len(s.responses) == 0 {
return nil, nil
}
resp := s.responses[s.i]
s.i = (s.i + 1) % len(s.responses)
return &pubproto.GetVPNPeersResponse{
Peers: peer.ToPubProto(resp.peers),
}, resp.err
}

View File

@ -22,7 +22,7 @@ const (
keyFilepath = "/etc/kubernetes/pki/etcd/peer.key" keyFilepath = "/etc/kubernetes/pki/etcd/peer.key"
caCertFilepath = "/etc/kubernetes/pki/etcd/server.crt" caCertFilepath = "/etc/kubernetes/pki/etcd/server.crt"
etcdPrefix = "constellationRegion" etcdPrefix = "constellationRegion"
dialTimeout = 10 * time.Second dialTimeout = 60 * time.Second
) )
type EtcdStore struct { type EtcdStore struct {