coordinator-integrationtest: cover multi-coordinator

This commit is contained in:
Benedict 2022-04-13 12:40:57 +02:00 committed by Benedict Schlüter
parent 8d5c50014d
commit c1c12fd7d0

View File

@ -25,6 +25,7 @@ import (
"github.com/edgelesssys/constellation/coordinator/core" "github.com/edgelesssys/constellation/coordinator/core"
"github.com/edgelesssys/constellation/coordinator/kms" "github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/store" "github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/storewrapper" "github.com/edgelesssys/constellation/coordinator/storewrapper"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -62,30 +63,18 @@ const (
) )
var ( var (
hostconfigMaster = &container.HostConfig{ hostconfigConstellationPeer = &container.HostConfig{
Binds: []string{"/dev/net/tun:/dev/net/tun"}, // necessary for wireguard interface creation Binds: []string{"/dev/net/tun:/dev/net/tun"}, // necessary for wireguard interface creation
CapAdd: strslice.StrSlice{"NET_ADMIN"}, // necessary for wireguard interface creation CapAdd: strslice.StrSlice{"NET_ADMIN"}, // necessary for wireguard interface creation
AutoRemove: true, AutoRemove: true,
} }
configMaster = &container.Config{ configConstellationPeer = &container.Config{
Image: constellationImageName, Image: constellationImageName,
AttachStdout: true, // necessary to attach to the container log AttachStdout: true, // necessary to attach to the container log
AttachStderr: true, // necessary to attach to the container log AttachStderr: true, // necessary to attach to the container log
Tty: true, // necessary to attach to the container log Tty: true, // necessary to attach to the container log
} }
hostconfigNode = &container.HostConfig{
Binds: []string{"/dev/net/tun:/dev/net/tun"},
CapAdd: strslice.StrSlice{"NET_ADMIN"},
AutoRemove: true,
}
configNode = &container.Config{
Image: constellationImageName,
AttachStdout: true,
AttachStderr: true,
Tty: true,
}
hostconfigEtcd = &container.HostConfig{ hostconfigEtcd = &container.HostConfig{
AutoRemove: true, AutoRemove: true,
} }
@ -125,6 +114,7 @@ var (
AttachStdout: true, AttachStdout: true,
AttachStderr: true, AttachStderr: true,
} }
activeCoordinators []string
) )
type peerInfo struct { type peerInfo struct {
@ -138,7 +128,6 @@ func TestMain(t *testing.T) {
require := require.New(t) require := require.New(t)
activePeers := make(map[string]peerInfo) activePeers := make(map[string]peerInfo)
var activeCoordinators []string
defer goleak.VerifyNone(t, defer goleak.VerifyNone(t,
// https://github.com/census-instrumentation/opencensus-go/issues/1262 // https://github.com/census-instrumentation/opencensus-go/issues/1262
@ -194,30 +183,23 @@ func TestMain(t *testing.T) {
defer etcdstore.Close() defer etcdstore.Close()
defer killDockerContainers(ctx, cli, activePeers) defer killDockerContainers(ctx, cli, activePeers)
// setup coordinator container // setup coordinator containers
t.Log("create coordinator container...") t.Log("create 1st coordinator container...")
resp, err := cli.ContainerCreate(ctx, configMaster, hostconfigMaster, nil, nil, "master") require.NoError(createCoordinatorContainer(ctx, cli, debugMode, "master-1", dockerNetwork.ID, activePeers))
require.NoError(err) t.Log("create 2nd coordinator container...")
require.NoError(cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{})) require.NoError(createCoordinatorContainer(ctx, cli, debugMode, "master-2", dockerNetwork.ID, activePeers))
if debugMode == "true" {
attachDockerContainerStdout(ctx, cli, resp.ID)
}
coordinatorData, err := cli.ContainerInspect(ctx, resp.ID)
require.NoError(err)
activePeers[coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp, isCoordinator: true}
activeCoordinators = append(activeCoordinators, coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress)
require.NoError(cli.NetworkConnect(ctx, dockerNetwork.ID, resp.ID, nil))
// 1st activation phase // 1st activation phase
endpoints, err := spawnContainers(ctx, cli, numberFirstActivation, activePeers) ips, err := spawnContainers(ctx, cli, numberFirstActivation, activePeers)
require.NoError(err) require.NoError(err)
t.Logf("node endpoints: %v", endpoints) t.Logf("node ips: %v", ips)
t.Log("activate coordinator...") t.Log("activate coordinator...")
start := time.Now() start := time.Now()
assert.NoError(startCoordinator(ctx, activeCoordinators[0], endpoints)) assert.NoError(startCoordinator(ctx, activeCoordinators[0], ips))
elapsed := time.Since(start) elapsed := time.Since(start)
t.Logf("activation took %v", elapsed) t.Logf("activation took %v", elapsed)
// activate additional coordinator
require.NoError(addNewCoordinatorToCoordinator(ctx, activeCoordinators[1], activeCoordinators[0]))
require.NoError(updateVPNIPs(activePeers, etcdstore)) require.NoError(updateVPNIPs(activePeers, etcdstore))
t.Log("count peers in instances") t.Log("count peers in instances")
@ -226,12 +208,12 @@ func TestMain(t *testing.T) {
pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore) pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
// 2nd activation phase // 2nd activation phase
endpoints, err = spawnContainers(ctx, cli, numberSecondaryActivation, activePeers) ips, err = spawnContainers(ctx, cli, numberSecondaryActivation, activePeers)
require.NoError(err) require.NoError(err)
t.Logf("node endpoints: %v", endpoints) t.Logf("node ips: %v", ips)
t.Log("add additional nodes") t.Log("add additional nodes")
start = time.Now() start = time.Now()
assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints)) assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[1], ips))
elapsed = time.Since(start) elapsed = time.Since(start)
t.Logf("adding took %v", elapsed) t.Logf("adding took %v", elapsed)
require.NoError(updateVPNIPs(activePeers, etcdstore)) require.NoError(updateVPNIPs(activePeers, etcdstore))
@ -241,13 +223,20 @@ func TestMain(t *testing.T) {
t.Log("start ping test") t.Log("start ping test")
pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore) pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
// ----------------------------------------------------------------
t.Log("create 3nd coordinator container...")
require.NoError(createCoordinatorContainer(ctx, cli, debugMode, "master-3", dockerNetwork.ID, activePeers))
// activate additional coordinator
require.NoError(addNewCoordinatorToCoordinator(ctx, activeCoordinators[2], activeCoordinators[1]))
require.NoError(updateVPNIPs(activePeers, etcdstore))
// 3rd activation phase // 3rd activation phase
endpoints, err = spawnContainers(ctx, cli, numberThirdActivation, activePeers) ips, err = spawnContainers(ctx, cli, numberThirdActivation, activePeers)
require.NoError(err) require.NoError(err)
t.Logf("node endpoints: %v", endpoints) t.Logf("node ips: %v", ips)
t.Log("add additional nodes") t.Log("add additional nodes")
start = time.Now() start = time.Now()
assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints)) assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[2], ips))
elapsed = time.Since(start) elapsed = time.Since(start)
t.Logf("adding took %v", elapsed) t.Logf("adding took %v", elapsed)
require.NoError(updateVPNIPs(activePeers, etcdstore)) require.NoError(updateVPNIPs(activePeers, etcdstore))
@ -259,7 +248,7 @@ func TestMain(t *testing.T) {
} }
// helper methods // helper methods
func startCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error { func startCoordinator(ctx context.Context, coordinatorAddr string, ips []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}}) tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil { if err != nil {
return err return err
@ -279,13 +268,13 @@ func startCoordinator(ctx context.Context, coordinatorAddr string, endpoints []s
adminKey = adminKey.PublicKey() adminKey = adminKey.PublicKey()
stream, err := client.ActivateAsCoordinator(ctx, &pubproto.ActivateAsCoordinatorRequest{ stream, err := client.ActivateAsCoordinator(ctx, &pubproto.ActivateAsCoordinatorRequest{
AdminVpnPubKey: adminKey[:], AdminVpnPubKey: adminKey[:],
NodePublicEndpoints: endpoints, NodePublicIps: ips,
MasterSecret: []byte(masterSecret), MasterSecret: []byte(masterSecret),
KmsUri: kms.ClusterKMSURI, KmsUri: kms.ClusterKMSURI,
StorageUri: kms.NoStoreURI, StorageUri: kms.NoStoreURI,
KeyEncryptionKeyId: "", KeyEncryptionKeyId: "",
UseExistingKek: false, UseExistingKek: false,
}) })
if err != nil { if err != nil {
return err return err
@ -302,7 +291,30 @@ func startCoordinator(ctx context.Context, coordinatorAddr string, endpoints []s
} }
} }
func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error { func addNewCoordinatorToCoordinator(ctx context.Context, newCoordinatorAddr, oldCoordinatorAddr string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil {
return err
}
conn, err := grpc.DialContext(ctx, net.JoinHostPort(oldCoordinatorAddr, publicgRPCPort), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return err
}
defer conn.Close()
client := pubproto.NewAPIClient(conn)
_, err = client.ActivateAdditionalCoordinator(ctx, &pubproto.ActivateAdditionalCoordinatorRequest{
CoordinatorPublicIp: newCoordinatorAddr,
})
if err != nil {
return err
}
return nil
}
func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, ips []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}}) tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil { if err != nil {
return err return err
@ -316,7 +328,7 @@ func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, endpo
client := pubproto.NewAPIClient(conn) client := pubproto.NewAPIClient(conn)
stream, err := client.ActivateAdditionalNodes(ctx, &pubproto.ActivateAdditionalNodesRequest{NodePublicEndpoints: endpoints}) stream, err := client.ActivateAdditionalNodes(ctx, &pubproto.ActivateAdditionalNodesRequest{NodePublicIps: ips})
if err != nil { if err != nil {
return err return err
} }
@ -332,21 +344,42 @@ func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, endpo
} }
func spawnContainers(ctx context.Context, cli *client.Client, count int, activeContainers map[string]peerInfo) ([]string, error) { func spawnContainers(ctx context.Context, cli *client.Client, count int, activeContainers map[string]peerInfo) ([]string, error) {
tmpPeerEndpoints := make([]string, 0, count) tmpPeerIPs := make([]string, 0, count)
// spawn client container(s) and obtain their docker network ip address // spawn client container(s) and obtain their docker network ip address
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
resp, err := createNewNode(ctx, cli) resp, err := createNewNode(ctx, cli)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tmpPeerEndpoints = append(tmpPeerEndpoints, resp.dockerIPAddr) tmpPeerIPs = append(tmpPeerIPs, resp.dockerIPAddr)
containerData, err := cli.ContainerInspect(ctx, resp.containerResponse.ID) containerData, err := cli.ContainerInspect(ctx, resp.containerResponse.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
activeContainers[containerData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp.containerResponse, isCoordinator: false} activeContainers[containerData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp.containerResponse, isCoordinator: false}
} }
return tmpPeerEndpoints, blockUntilUp(ctx, tmpPeerEndpoints) return tmpPeerIPs, blockUntilUp(ctx, tmpPeerIPs)
}
func createCoordinatorContainer(ctx context.Context, cli *client.Client, debugMode, name, dockerNetworkID string, activePeers map[string]peerInfo) error {
resp, err := cli.ContainerCreate(ctx, configConstellationPeer, hostconfigConstellationPeer, nil, nil, name)
if err != nil {
return err
}
err = cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{})
if err != nil {
return err
}
if debugMode == "true" {
attachDockerContainerStdout(ctx, cli, resp.ID)
}
coordinatorData, err := cli.ContainerInspect(ctx, resp.ID)
if err != nil {
return err
}
activePeers[coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp, isCoordinator: true}
activeCoordinators = append(activeCoordinators, coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress)
return cli.NetworkConnect(ctx, dockerNetworkID, resp.ID, nil)
} }
// Make the port forward binding, so we can access the coordinator from the host // Make the port forward binding, so we can access the coordinator from the host
@ -423,14 +456,9 @@ func countPeersTest(ctx context.Context, t *testing.T, cli *client.Client, execC
countedPeers := strings.Count(string(output), "peer") countedPeers := strings.Count(string(output), "peer")
fmt.Printf("% 3d peers in container %s [%s] out of % 3d total nodes \n", countedPeers, id.dockerData.ID, ip, len(activeContainers)) fmt.Printf("% 3d peers in container %s [%s] out of % 3d total nodes \n", countedPeers, id.dockerData.ID, ip, len(activeContainers))
// coordinator has another VPN endpoint for communication with the end user assert.Equal(len(activeContainers), countedPeers)
if id.isCoordinator { if (len(activeContainers)) != countedPeers {
assert.Equal(len(activeContainers), countedPeers) attachDockerContainerStdout(ctx, cli, id.dockerData.ID)
} else {
assert.Equal(len(activeContainers)-1, countedPeers)
if (len(activeContainers) - 1) != countedPeers {
attachDockerContainerStdout(ctx, cli, id.dockerData.ID)
}
} }
} }
}) })
@ -440,15 +468,15 @@ func pingTest(ctx context.Context, t *testing.T, cli *client.Client, execConfig
t.Run("pingTest", func(t *testing.T) { t.Run("pingTest", func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
require := require.New(t) require := require.New(t)
peerEndpoints, err := getPeerVPNIPsFromEtcd(etcdstore) peerVPNIPsWithoutAdmins, err := getPeerVPNIPsFromEtcd(etcdstore)
require.NoError(err) require.NoError(err)
// all nodes + coordinator are peers // all nodes + coordinator are peers
require.Equal(len(peerEndpoints), len(activeContainers)) require.Equal(len(peerVPNIPsWithoutAdmins), len(activeContainers))
for i := 0; i < len(peerEndpoints); i++ { for i := 0; i < len(peerVPNIPsWithoutAdmins); i++ {
execConfig.Cmd = []string{"ping", "-q", "-c", "1", "-W", "1", peerEndpoints[i]} execConfig.Cmd = []string{"ping", "-q", "-c", "1", "-W", "1", peerVPNIPsWithoutAdmins[i]}
for _, id := range activeContainers { for _, id := range activeContainers {
fmt.Printf("Ping from container %v | % 19s to container % 19s", id.dockerData.ID, id.vpnIP, peerEndpoints[i]) fmt.Printf("Ping from container %v | % 19s to container % 19s", id.dockerData.ID, id.vpnIP, peerVPNIPsWithoutAdmins[i])
respIDExecCreate, err := cli.ContainerExecCreate(ctx, id.dockerData.ID, execConfig) respIDExecCreate, err := cli.ContainerExecCreate(ctx, id.dockerData.ID, execConfig)
require.NoError(err) require.NoError(err)
@ -476,7 +504,7 @@ type newNodeData struct {
// pass error one level up // pass error one level up
func createNewNode(ctx context.Context, cli *client.Client) (*newNodeData, error) { func createNewNode(ctx context.Context, cli *client.Client) (*newNodeData, error) {
resp, err := cli.ContainerCreate(ctx, configNode, hostconfigNode, nil, nil, "") resp, err := cli.ContainerCreate(ctx, configConstellationPeer, hostconfigConstellationPeer, nil, nil, "")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -489,28 +517,28 @@ func createNewNode(ctx context.Context, cli *client.Client) (*newNodeData, error
return nil, err return nil, err
} }
fmt.Printf("created Node %v\n", containerData.ID) fmt.Printf("created Node %v\n", containerData.ID)
return &newNodeData{resp, net.JoinHostPort(containerData.NetworkSettings.IPAddress, publicgRPCPort)}, nil return &newNodeData{resp, containerData.NetworkSettings.IPAddress}, nil
} }
func awaitPeerResponse(ctx context.Context, endpoint string, tlsConfig *tls.Config) error { func awaitPeerResponse(ctx context.Context, ip string, tlsConfig *tls.Config) error {
// Block, so the connection gets established/fails immediately // Block, so the connection gets established/fails immediately
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithBlock(), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) conn, err := grpc.DialContext(ctx, net.JoinHostPort(ip, publicgRPCPort), grpc.WithBlock(), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil { if err != nil {
return err return err
} }
return conn.Close() return conn.Close()
} }
func blockUntilUp(ctx context.Context, peerEndpoints []string) error { func blockUntilUp(ctx context.Context, peerIPs []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}}) tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil { if err != nil {
return err return err
} }
for _, endpoint := range peerEndpoints { for _, ip := range peerIPs {
// Block, so the connection gets established/fails immediately // Block, so the connection gets established/fails immediately
if err := awaitPeerResponse(ctx, endpoint, tlsConfig); err != nil { if err := awaitPeerResponse(ctx, ip, tlsConfig); err != nil {
return err return err
} }
} }
@ -525,8 +553,10 @@ func getPeerVPNIPsFromEtcd(etcdstore store.Store) ([]string, error) {
vpnIPS := make([]string, 0, len(peers)) vpnIPS := make([]string, 0, len(peers))
for _, v := range peers { for _, peer := range peers {
vpnIPS = append(vpnIPS, v.VPNIP) if peer.Role != role.Admin {
vpnIPS = append(vpnIPS, peer.VPNIP)
}
} }
return vpnIPS, nil return vpnIPS, nil
} }
@ -536,10 +566,8 @@ func translatePublicToVPNIP(publicIP string, etcdstore store.Store) (string, err
if err != nil { if err != nil {
return "", err return "", err
} }
// port has to be the same as pubapi-Server port
publicEndpoint := net.JoinHostPort(publicIP, "9000")
for _, peer := range peers { for _, peer := range peers {
if peer.PublicEndpoint == publicEndpoint { if peer.PublicIP == publicIP {
return peer.VPNIP, nil return peer.VPNIP, nil
} }
} }