constellation/test/coordinator_integration_test.go

560 lines
18 KiB
Go

//go:build integration
package integration
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/archive"
"github.com/docker/go-connections/nat"
"github.com/edgelesssys/constellation/coordinator/atls"
"github.com/edgelesssys/constellation/coordinator/core"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/storewrapper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
/*
Notes regarding the integration test implementation:
Scale:
'numberPeers' should be < 30, otherwise the activation might get stuck, because the Docker network
does not scale well beyond that (possibly because of the > 50 wireguard kernel interfaces involved).
With over 150 nodes, the node activation will fail due to Docker internal network naming issues.
This could be further extended, but currently the number of possible nodes is enough for this test.
Usage of docker library:
Sometimes the API calls are slower than using the 'sh docker ...' commands. This is specifically the case
for the termination. However, to keep the code clean, we accept this tradeoff and use the library functions.
*/
// Fixed parameters of the integration test setup.
const (
	// publicgRPCPort is the port on which the coordinator and node gRPC endpoints are dialed.
	publicgRPCPort = "9000"
	// constellationImageName is the tag of the image built by imageBuild and used
	// for the coordinator and node containers.
	constellationImageName = "constellation:latest"
	// etcdImageName is the etcd image that is pulled and used for the etcd container.
	etcdImageName = "bitnami/etcd:3.5.1"
	// etcdOverlayNetwork is the name of the additional Docker network created by the test.
	etcdOverlayNetwork = "constellationIntegrationTest"
	// masterSecret is sent as master secret in the coordinator activation request.
	masterSecret = "ConstellationIntegrationTest"
	// Number of node containers spawned in the first, second and third activation phase.
	numberFirstActivation = 3
	numberSecondaryActivation = 3
	numberThirdActivation = 3
)
// Docker configurations shared by the test and its helpers.
var (
	// hostconfigMaster and configMaster describe the coordinator ("master") container.
	hostconfigMaster = &container.HostConfig{
		Binds: []string{"/dev/net/tun:/dev/net/tun"}, // necessary for wireguard interface creation
		CapAdd: strslice.StrSlice{"NET_ADMIN"}, // necessary for wireguard interface creation
		AutoRemove: true,
	}
	configMaster = &container.Config{
		Image: constellationImageName,
		AttachStdout: true, // necessary to attach to the container log
		AttachStderr: true, // necessary to attach to the container log
		Tty: true, // necessary to attach to the container log
	}
	// hostconfigNode and configNode describe the node containers created by createNewNode.
	// They carry the same settings as the coordinator configuration.
	hostconfigNode = &container.HostConfig{
		Binds: []string{"/dev/net/tun:/dev/net/tun"},
		CapAdd: strslice.StrSlice{"NET_ADMIN"},
		AutoRemove: true,
	}
	configNode = &container.Config{
		Image: constellationImageName,
		AttachStdout: true,
		AttachStderr: true,
		Tty: true,
	}
	// hostconfigEtcd and configEtcd describe the etcd container ("etcd-storage").
	hostconfigEtcd = &container.HostConfig{
		AutoRemove: true,
	}
	configEtcd = &container.Config{
		Image: etcdImageName,
		Env: []string{
			"ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379",
			"ETCD_ADVERTISE_CLIENT_URLS=http://127.0.0.1:2379",
			"ETCD_LOG_LEVEL=debug",
			"ETCD_DATA_DIR=/bitnami/etcd/data",
		},
		Entrypoint: []string{"/opt/bitnami/etcd/bin/etcd"},
		AttachStdout: true,
		AttachStderr: true,
		Tty: true,
	}
	// constellationDockerImageBuildOptions are the options passed to the image build
	// in imageBuild; the resulting image is tagged with constellationImageName.
	constellationDockerImageBuildOptions = types.ImageBuildOptions{
		Dockerfile: "test/Dockerfile",
		Tags: []string{constellationImageName},
		Remove: true,
		ForceRemove: true,
		SuppressOutput: false,
		PullParent: true,
	}
	// containerLogConfig is used by attachDockerContainerStdout to follow a container's stdout.
	containerLogConfig = types.ContainerLogsOptions{
		ShowStdout: true,
		Follow: true,
	}
	// wgExecConfig runs "wg" inside a container; its output is evaluated by countPeersTest.
	wgExecConfig = types.ExecConfig{
		Cmd: []string{"wg"},
		AttachStdout: true,
		AttachStderr: true,
	}
	// pingExecConfig is the base exec configuration for pingTest, which sets Cmd per ping target.
	pingExecConfig = types.ExecConfig{
		AttachStdout: true,
		AttachStderr: true,
	}
)
// peerInfo is the bookkeeping entry the test keeps for every started
// coordinator or node container.
type peerInfo struct {
	// dockerData is the response of the container creation; its ID is used to exec into and kill the container.
	dockerData container.ContainerCreateCreatedBody
	// isCoordinator is true for the coordinator container and false for nodes.
	isCoordinator bool
	// vpnIP is the peer's VPN IP; it is filled in by updateVPNIPs.
	vpnIP string
}
// TestMain runs the whole integration test: it builds the constellation image,
// pulls the etcd image, creates an additional Docker network, starts the etcd
// and coordinator containers and then performs three activation phases. After
// each phase the wireguard peer count and the connectivity between all peers
// are checked.
//
// NOTE(review): despite its name this function takes *testing.T and not
// *testing.M — confirm the test runner treats it as a regular test.
func TestMain(t *testing.T) {
	assert := assert.New(t)
	require := require.New(t)
	// activePeers maps the Docker IP address of each started peer to its bookkeeping entry.
	activePeers := make(map[string]peerInfo)
	var activeCoordinators []string
	// Registered first, so the goroutine leak check runs after all other deferred cleanups below.
	defer goleak.VerifyNone(t,
		// https://github.com/census-instrumentation/opencensus-go/issues/1262
		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
		// https://github.com/kubernetes/klog/issues/282, https://github.com/kubernetes/klog/issues/188
		goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"),
	)
	// DEBUG=true makes imageBuild print the build output and attaches to the coordinator's stdout.
	debugMode := os.Getenv("DEBUG")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Change into the parent directory: imageBuild archives the current directory as build context.
	cwd, err := os.Getwd()
	require.NoError(err)
	require.NoError(os.Chdir(filepath.Join(cwd, "..")))
	// setup Docker containers
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	require.NoError(err)
	defer cli.Close()
	versionInfo, err := cli.Info(ctx)
	require.NoError(err)
	t.Logf("start integration test, local docker version %v", versionInfo.ServerVersion)
	require.NoError(imageBuild(ctx, cli, debugMode))
	// Remove the freshly built image again when the test ends.
	defer cli.ImageRemove(ctx, constellationImageName, types.ImageRemoveOptions{Force: true, PruneChildren: true})
	reader, err := cli.ImagePull(ctx, etcdImageName, types.ImagePullOptions{})
	require.NoError(err)
	// Copy the pull output to stdout until the stream ends.
	_, err = io.Copy(os.Stdout, reader)
	require.NoError(err)
	require.NoError(reader.Close())
	// Add another docker network to be able to resolve etcd-storage from the coordinator.
	// This is not possible in the default "bridge" network.
	dockerNetwork, err := cli.NetworkCreate(ctx, etcdOverlayNetwork, types.NetworkCreate{Driver: "bridge", Internal: true})
	require.NoError(err)
	defer cli.NetworkRemove(ctx, etcdOverlayNetwork)
	// setup etcd
	t.Log("create etcd container...")
	respEtcd, err := cli.ContainerCreate(ctx, configEtcd, hostconfigEtcd, nil, nil, "etcd-storage")
	require.NoError(err)
	require.NoError(cli.ContainerStart(ctx, respEtcd.ID, types.ContainerStartOptions{}))
	defer killDockerContainer(ctx, cli, respEtcd)
	require.NoError(cli.NetworkConnect(ctx, dockerNetwork.ID, respEtcd.ID, nil))
	etcdData, err := cli.ContainerInspect(ctx, respEtcd.ID)
	require.NoError(err)
	// The test itself talks to etcd via the container's IP address on port 2379.
	etcdIPAddr := etcdData.NetworkSettings.DefaultNetworkSettings.IPAddress
	etcdstore, err := store.NewEtcdStore(net.JoinHostPort(etcdIPAddr, "2379"), false, zap.NewNop())
	require.NoError(err)
	defer etcdstore.Close()
	// activePeers is a map, so peers registered after this point are still covered by this deferred cleanup.
	defer killDockerContainers(ctx, cli, activePeers)
	// setup coordinator container
	t.Log("create coordinator container...")
	resp, err := cli.ContainerCreate(ctx, configMaster, hostconfigMaster, nil, nil, "master")
	require.NoError(err)
	require.NoError(cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}))
	if debugMode == "true" {
		attachDockerContainerStdout(ctx, cli, resp.ID)
	}
	coordinatorData, err := cli.ContainerInspect(ctx, resp.ID)
	require.NoError(err)
	// Register the coordinator under its Docker IP address, like the nodes spawned later.
	activePeers[coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp, isCoordinator: true}
	activeCoordinators = append(activeCoordinators, coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress)
	require.NoError(cli.NetworkConnect(ctx, dockerNetwork.ID, resp.ID, nil))
	// 1st activation phase
	// Spawn the first batch of nodes and activate the coordinator together with them.
	endpoints, err := spawnContainers(ctx, cli, numberFirstActivation, activePeers)
	require.NoError(err)
	t.Logf("node endpoints: %v", endpoints)
	t.Log("activate coordinator...")
	start := time.Now()
	assert.NoError(startCoordinator(ctx, activeCoordinators[0], endpoints))
	elapsed := time.Since(start)
	t.Logf("activation took %v", elapsed)
	// Look up the VPN IP of every registered peer in the store before running the checks.
	require.NoError(updateVPNIPs(activePeers, etcdstore))
	t.Log("count peers in instances")
	countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
	t.Log("start ping test")
	pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
	// 2nd activation phase
	// Spawn further nodes and add them to the already activated coordinator.
	endpoints, err = spawnContainers(ctx, cli, numberSecondaryActivation, activePeers)
	require.NoError(err)
	t.Logf("node endpoints: %v", endpoints)
	t.Log("add additional nodes")
	start = time.Now()
	assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints))
	elapsed = time.Since(start)
	t.Logf("adding took %v", elapsed)
	require.NoError(updateVPNIPs(activePeers, etcdstore))
	t.Log("count peers in instances")
	countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
	t.Log("start ping test")
	pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
	// 3rd activation phase
	// Same procedure as the 2nd phase with another batch of nodes.
	endpoints, err = spawnContainers(ctx, cli, numberThirdActivation, activePeers)
	require.NoError(err)
	t.Logf("node endpoints: %v", endpoints)
	t.Log("add additional nodes")
	start = time.Now()
	assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints))
	elapsed = time.Since(start)
	t.Logf("adding took %v", elapsed)
	require.NoError(updateVPNIPs(activePeers, etcdstore))
	t.Log("count peers in instances")
	countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
	t.Log("start ping test")
	pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
}
// helper methods
// startCoordinator connects to the public gRPC API of the coordinator at
// coordinatorAddr and requests its activation as coordinator, handing over the
// node endpoints that should be activated along with it. The public part of a
// freshly generated wireguard key is sent as the admin VPN key.
//
// The activation response stream is consumed until the server closes it. The
// function returns nil on a clean end of stream (io.EOF) and the first
// encountered error otherwise.
func startCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error {
	tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
	if err != nil {
		return fmt.Errorf("creating aTLS client config: %w", err)
	}
	conn, err := grpc.DialContext(ctx, net.JoinHostPort(coordinatorAddr, publicgRPCPort), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	if err != nil {
		return fmt.Errorf("dialing coordinator %s: %w", coordinatorAddr, err)
	}
	defer conn.Close()
	// Named apiClient so the imported docker "client" package is not shadowed.
	apiClient := pubproto.NewAPIClient(conn)

	adminKey, err := wgtypes.GeneratePrivateKey()
	if err != nil {
		return fmt.Errorf("generating admin key: %w", err)
	}
	// Only the public key is handed to the coordinator.
	adminKey = adminKey.PublicKey()

	stream, err := apiClient.ActivateAsCoordinator(ctx, &pubproto.ActivateAsCoordinatorRequest{
		AdminVpnPubKey:      adminKey[:],
		NodePublicEndpoints: endpoints,
		MasterSecret:        []byte(masterSecret),
		KmsUri:              kms.ClusterKMSURI,
		StorageUri:          kms.NoStoreURI,
		KeyEncryptionKeyId:  "",
		UseExistingKek:      false,
	})
	if err != nil {
		return fmt.Errorf("requesting coordinator activation: %w", err)
	}

	// Drain the stream; the activation is finished once the server closes it.
	for {
		if _, err := stream.Recv(); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return fmt.Errorf("receiving activation response: %w", err)
		}
	}
}
// addNewNodesToCoordinator connects to the public gRPC API of the coordinator
// at coordinatorAddr and asks it to activate the additional nodes reachable at
// the given endpoints.
//
// The response stream is consumed until the server closes it. The function
// returns nil on a clean end of stream (io.EOF) and the first encountered
// error otherwise.
func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error {
	tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
	if err != nil {
		return fmt.Errorf("creating aTLS client config: %w", err)
	}
	conn, err := grpc.DialContext(ctx, net.JoinHostPort(coordinatorAddr, publicgRPCPort), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	if err != nil {
		return fmt.Errorf("dialing coordinator %s: %w", coordinatorAddr, err)
	}
	defer conn.Close()
	// Named apiClient so the imported docker "client" package is not shadowed.
	apiClient := pubproto.NewAPIClient(conn)

	stream, err := apiClient.ActivateAdditionalNodes(ctx, &pubproto.ActivateAdditionalNodesRequest{NodePublicEndpoints: endpoints})
	if err != nil {
		return fmt.Errorf("requesting activation of additional nodes: %w", err)
	}

	// Drain the stream; the activation is finished once the server closes it.
	for {
		if _, err := stream.Recv(); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return fmt.Errorf("receiving activation response: %w", err)
		}
	}
}
// spawnContainers creates and starts count node containers, registers each of
// them in activeContainers under its Docker IP address and then waits until all
// of them accept connections.
//
// It returns the endpoints of the spawned nodes. If creating or inspecting a
// container fails, nil and the error are returned; if waiting for the nodes
// fails, the endpoints are returned together with that error.
func spawnContainers(ctx context.Context, cli *client.Client, count int, activeContainers map[string]peerInfo) ([]string, error) {
	endpoints := make([]string, 0, count)
	for spawned := 0; spawned < count; spawned++ {
		node, err := createNewNode(ctx, cli)
		if err != nil {
			return nil, err
		}
		endpoints = append(endpoints, node.dockerIPAddr)

		// The Docker network IP address of the container is the key of the registry.
		inspected, err := cli.ContainerInspect(ctx, node.containerResponse.ID)
		if err != nil {
			return nil, err
		}
		dockerIP := inspected.NetworkSettings.DefaultNetworkSettings.IPAddress
		activeContainers[dockerIP] = peerInfo{
			dockerData:    node.containerResponse,
			isCoordinator: false,
		}
	}

	err := blockUntilUp(ctx, endpoints)
	return endpoints, err
}
// makeBinding creates a port forward binding from the container's TCP port
// internalPort to externalPort on the host address ip, so the coordinator can
// be accessed from the host.
func makeBinding(ip, internalPort string, externalPort string) nat.PortMap {
	containerPort := nat.Port(internalPort + "/tcp")
	return nat.PortMap{
		containerPort: []nat.PortBinding{
			{HostIP: ip, HostPort: externalPort},
		},
	}
}
// killDockerContainers kills every container registered in activeContainers.
// Failures are reported by killDockerContainer and do not stop the iteration.
func killDockerContainers(ctx context.Context, cli *client.Client, activeContainers map[string]peerInfo) {
	for ip := range activeContainers {
		killDockerContainer(ctx, cli, activeContainers[ip].dockerData)
	}
}
// killDockerContainer sends signal 9 to the given container and prints the
// outcome to stdout. It is best-effort cleanup: a failing kill is printed and
// otherwise ignored.
func killDockerContainer(ctx context.Context, cli *client.Client, container container.ContainerCreateCreatedBody) {
	// Only the first 10 characters of the ID are printed. Guard the slicing so an
	// unexpectedly short (or empty) ID cannot panic and abort the remaining cleanup.
	shortID := container.ID
	if len(shortID) > 10 {
		shortID = shortID[:10]
	}
	fmt.Print("Kill container ", shortID, "... ")
	if err := cli.ContainerKill(ctx, container.ID, "9"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("Success")
}
// attachDockerContainerStdout streams the log of the container with the given
// id to the stdout of the test process. The copying happens in a background
// goroutine; the function itself returns immediately. It panics if the log
// stream cannot be opened.
func attachDockerContainerStdout(ctx context.Context, cli *client.Client, id string) {
	resp, err := cli.ContainerLogs(ctx, id, containerLogConfig)
	if err != nil {
		panic(err)
	}
	go func() {
		// Release the log stream once copying stops instead of leaving it open.
		defer resp.Close()
		// TODO: the logs are followed, so this goroutine lives until the stream
		// ends and may outlive the caller.
		_, _ = io.Copy(os.Stdout, resp) // best-effort debug output, a copy error is not actionable
	}()
}
// imageBuild builds the constellation Docker image from the current working
// directory using constellationDockerImageBuildOptions and blocks until the
// build output stream has ended. If debugMode is "true", the build output is
// printed to stdout, otherwise it is discarded.
//
// NOTE(review): only transport errors are returned; the content of the build
// output stream is not inspected for reported build failures — confirm whether
// that is sufficient.
func imageBuild(ctx context.Context, dockerClient *client.Client, debugMode string) error {
	// Docker needs a build context; archive the current working directory for it.
	buildContext, err := archive.TarWithOptions(".", &archive.TarOptions{})
	if err != nil {
		return err
	}
	// Close the archive stream on every path, including a failing build request.
	defer buildContext.Close()

	resp, err := dockerClient.ImageBuild(ctx, buildContext, constellationDockerImageBuildOptions)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	var out io.Writer = io.Discard
	if debugMode == "true" {
		out = os.Stdout
	}
	// Block until EOF, so the build has finished when we continue.
	_, err = io.Copy(out, resp.Body)
	return err
}
// countPeersTest runs execConfig (the "wg" command) in every active container
// and counts the occurrences of "peer" in its output. The coordinator is
// expected to list as many peers as there are active containers, every node
// one less. For a node with a wrong count the container's stdout is attached
// to ease debugging.
func countPeersTest(ctx context.Context, t *testing.T, cli *client.Client, execConfig types.ExecConfig, activeContainers map[string]peerInfo) {
	t.Run("countPeerTest", func(t *testing.T) {
		assert := assert.New(t)
		require := require.New(t)
		totalNodes := len(activeContainers)
		for ip, peer := range activeContainers {
			containerID := peer.dockerData.ID

			execCreated, err := cli.ContainerExecCreate(ctx, containerID, execConfig)
			require.NoError(err)
			attached, err := cli.ContainerExecAttach(ctx, execCreated.ID, types.ExecStartCheck{})
			require.NoError(err)
			output, err := io.ReadAll(attached.Reader)
			require.NoError(err)
			attached.Close()

			countedPeers := strings.Count(string(output), "peer")
			fmt.Printf("% 3d peers in container %s [%s] out of % 3d total nodes \n", countedPeers, containerID, ip, totalNodes)

			// The coordinator has another VPN endpoint for communication with the end user.
			if peer.isCoordinator {
				assert.Equal(totalNodes, countedPeers)
				continue
			}
			assert.Equal(totalNodes-1, countedPeers)
			if countedPeers != totalNodes-1 {
				attachDockerContainerStdout(ctx, cli, containerID)
			}
		}
	})
}
// pingTest checks the connectivity between all peers: every active container
// pings the VPN IP of every peer known to the store once, and each ping is
// expected to exit with code 0. The number of peers in the store has to match
// the number of active containers.
func pingTest(ctx context.Context, t *testing.T, cli *client.Client, execConfig types.ExecConfig, activeContainers map[string]peerInfo, etcdstore store.Store) {
	t.Run("pingTest", func(t *testing.T) {
		assert := assert.New(t)
		require := require.New(t)
		peerEndpoints, err := getPeerVPNIPsFromEtcd(etcdstore)
		require.NoError(err)
		// all nodes + coordinator are peers
		require.Equal(len(activeContainers), len(peerEndpoints))
		for _, target := range peerEndpoints {
			execConfig.Cmd = []string{"ping", "-q", "-c", "1", "-W", "1", target}
			for _, id := range activeContainers {
				fmt.Printf("Ping from container %v | % 19s to container % 19s", id.dockerData.ID, id.vpnIP, target)
				respIDExecCreate, err := cli.ContainerExecCreate(ctx, id.dockerData.ID, execConfig)
				require.NoError(err)
				err = cli.ContainerExecStart(ctx, respIDExecCreate.ID, types.ExecStartCheck{})
				require.NoError(err)
				// The exit code is only meaningful once the exec has finished; a
				// still running exec reports exit code 0 and would hide a failing ping.
				resp, err := waitForExecExit(ctx, cli, respIDExecCreate.ID)
				require.NoError(err)
				assert.Equal(0, resp.ExitCode)
				if resp.ExitCode == 0 {
					fmt.Printf(" ...Success\n")
				} else {
					fmt.Printf(" ...Failure\n")
				}
			}
		}
	})
}

// waitForExecExit polls the exec instance execID until Docker no longer reports
// it as running and returns the final inspect result. Polling is bounded by a
// 10 second timeout so a hanging exec cannot block the test forever.
func waitForExecExit(ctx context.Context, cli *client.Client, execID string) (types.ContainerExecInspect, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	for {
		resp, err := cli.ContainerExecInspect(ctx, execID)
		if err != nil {
			return types.ContainerExecInspect{}, fmt.Errorf("inspecting exec %s: %w", execID, err)
		}
		if !resp.Running {
			return resp, nil
		}
		select {
		case <-ctx.Done():
			return types.ContainerExecInspect{}, fmt.Errorf("waiting for exec %s to finish: %w", execID, ctx.Err())
		case <-time.After(50 * time.Millisecond):
		}
	}
}
// newNodeData is the result of createNewNode.
type newNodeData struct {
	// containerResponse is the response of the container creation.
	containerResponse container.ContainerCreateCreatedBody
	// dockerIPAddr is the container's IP address joined with publicgRPCPort (host:port).
	dockerIPAddr string
}
// createNewNode creates and starts a node container from configNode and
// hostconfigNode and returns the creation response together with the gRPC
// endpoint (container IP address and publicgRPCPort) of the node.
//
// Errors are passed one level up. If the container was created but cannot be
// started or inspected, it is force-removed again, because the caller never
// learns its ID and could not clean it up otherwise.
func createNewNode(ctx context.Context, cli *client.Client) (*newNodeData, error) {
	resp, err := cli.ContainerCreate(ctx, configNode, hostconfigNode, nil, nil, "")
	if err != nil {
		return nil, err
	}

	// failAndRemove removes the half set-up container (best effort) and returns cause,
	// extended by the removal error if the removal failed as well.
	failAndRemove := func(cause error) error {
		if rmErr := cli.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{Force: true}); rmErr != nil {
			return fmt.Errorf("%w (removing container %s also failed: %v)", cause, resp.ID, rmErr)
		}
		return cause
	}

	if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
		return nil, failAndRemove(err)
	}
	containerData, err := cli.ContainerInspect(ctx, resp.ID)
	if err != nil {
		return nil, failAndRemove(err)
	}
	fmt.Printf("created Node %v\n", containerData.ID)

	return &newNodeData{
		containerResponse: resp,
		dockerIPAddr:      net.JoinHostPort(containerData.NetworkSettings.IPAddress, publicgRPCPort),
	}, nil
}
func awaitPeerResponse(ctx context.Context, endpoint string, tlsConfig *tls.Config) error {
// Block, so the connection gets established/fails immediately
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithBlock(), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return err
}
return conn.Close()
}
// blockUntilUp waits until every endpoint in peerEndpoints accepts a gRPC
// connection. The endpoints are probed one after another with
// awaitPeerResponse; the first failure is returned.
func blockUntilUp(ctx context.Context, peerEndpoints []string) error {
	validators := []atls.Validator{&core.MockValidator{}}
	tlsConfig, err := atls.CreateAttestationClientTLSConfig(validators)
	if err != nil {
		return err
	}
	for i := range peerEndpoints {
		// Each probe blocks until the connection is established or has failed.
		err := awaitPeerResponse(ctx, peerEndpoints[i], tlsConfig)
		if err != nil {
			return err
		}
	}
	return nil
}
// getPeerVPNIPsFromEtcd reads all peers from the given store and returns their
// VPN IPs.
func getPeerVPNIPsFromEtcd(etcdstore store.Store) ([]string, error) {
	wrapper := storewrapper.StoreWrapper{Store: etcdstore}
	storedPeers, err := wrapper.GetPeers()
	if err != nil {
		return nil, err
	}

	vpnIPs := make([]string, 0, len(storedPeers))
	for _, storedPeer := range storedPeers {
		vpnIPs = append(vpnIPs, storedPeer.VPNIP)
	}
	return vpnIPs, nil
}
// translatePublicToVPNIP looks up the peer whose public endpoint is publicIP on
// the public gRPC port and returns its VPN IP. An error is returned if the
// peers cannot be read from the store or if no peer with that endpoint exists.
func translatePublicToVPNIP(publicIP string, etcdstore store.Store) (string, error) {
	peers, err := storewrapper.StoreWrapper{Store: etcdstore}.GetPeers()
	if err != nil {
		return "", err
	}
	// The port has to be the same as the pubapi server port.
	publicEndpoint := net.JoinHostPort(publicIP, publicgRPCPort)
	for _, peer := range peers {
		if peer.PublicEndpoint == publicEndpoint {
			return peer.VPNIP, nil
		}
	}
	return "", errors.New("no VPN IP found for public endpoint " + publicEndpoint)
}
// updateVPNIPs resolves the VPN IP of every container in activeContainers via
// translatePublicToVPNIP and stores it in the container's entry. The map keys
// are used as public IPs for the lookup. The first failing lookup aborts the
// update and its error is returned.
func updateVPNIPs(activeContainers map[string]peerInfo, etcdstore store.Store) error {
	for publicIP := range activeContainers {
		resolved, err := translatePublicToVPNIP(publicIP, etcdstore)
		if err != nil {
			return err
		}
		// Map values are copies, so the modified entry has to be written back.
		entry := activeContainers[publicIP]
		entry.vpnIP = resolved
		activeContainers[publicIP] = entry
	}
	return nil
}