mirror of
synced 2025-03-01 11:21:23 -05:00

Improved unit & integration tests for store, by making them independent and test a single thing.
560 lines
18 KiB
560 lines
18 KiB
//go:build integration
package integration
import (
Notes regarding the integration test implementation:
'numberPeers' should be < 30, otherwise activation might stuck, because something with the docker network
doesn't scale well (maybe > 50 wireguard kernel interfaces are the reason).
With over 150 nodes, the node activation will fail due to Docker internal network naming issues.
This could be further extended, but currently the number of possible nodes is enough for this test.
Usage of docker library:
Sometimes the API calls are slower than using the 'sh docker ...' commands. This is specifically the case
for the termination. However, to keep the code clean, we accept this tradeoff and use the library functions.
const (
publicgRPCPort = "9000"
constellationImageName = "constellation:latest"
etcdImageName = "bitnami/etcd:3.5.2"
etcdOverlayNetwork = "constellationIntegrationTest"
masterSecret = "ConstellationIntegrationTest"
numberFirstActivation = 3
numberSecondaryActivation = 3
numberThirdActivation = 3
var (
hostconfigMaster = &container.HostConfig{
Binds: []string{"/dev/net/tun:/dev/net/tun"}, // necessary for wireguard interface creation
CapAdd: strslice.StrSlice{"NET_ADMIN"}, // necessary for wireguard interface creation
AutoRemove: true,
configMaster = &container.Config{
Image: constellationImageName,
AttachStdout: true, // necessary to attach to the container log
AttachStderr: true, // necessary to attach to the container log
Tty: true, // necessary to attach to the container log
hostconfigNode = &container.HostConfig{
Binds: []string{"/dev/net/tun:/dev/net/tun"},
CapAdd: strslice.StrSlice{"NET_ADMIN"},
AutoRemove: true,
configNode = &container.Config{
Image: constellationImageName,
AttachStdout: true,
AttachStderr: true,
Tty: true,
hostconfigEtcd = &container.HostConfig{
AutoRemove: true,
configEtcd = &container.Config{
Image: etcdImageName,
Env: []string{
Entrypoint: []string{"/opt/bitnami/etcd/bin/etcd"},
AttachStdout: true,
AttachStderr: true,
Tty: true,
constellationDockerImageBuildOptions = types.ImageBuildOptions{
Dockerfile: "test/Dockerfile",
Tags: []string{constellationImageName},
Remove: true,
ForceRemove: true,
SuppressOutput: false,
PullParent: true,
containerLogConfig = types.ContainerLogsOptions{
ShowStdout: true,
Follow: true,
wgExecConfig = types.ExecConfig{
Cmd: []string{"wg"},
AttachStdout: true,
AttachStderr: true,
pingExecConfig = types.ExecConfig{
AttachStdout: true,
AttachStderr: true,
type peerInfo struct {
dockerData container.ContainerCreateCreatedBody
isCoordinator bool
vpnIP string
func TestMain(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
activePeers := make(map[string]peerInfo)
var activeCoordinators []string
defer goleak.VerifyNone(t,
// https://github.com/census-instrumentation/opencensus-go/issues/1262
// https://github.com/kubernetes/klog/issues/282, https://github.com/kubernetes/klog/issues/188
debugMode := os.Getenv("DEBUG")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cwd, err := os.Getwd()
require.NoError(os.Chdir(filepath.Join(cwd, "..")))
// setup Docker containers
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
defer cli.Close()
versionInfo, err := cli.Info(ctx)
t.Logf("start integration test, local docker version %v", versionInfo.ServerVersion)
require.NoError(imageBuild(ctx, cli, debugMode))
defer cli.ImageRemove(ctx, constellationImageName, types.ImageRemoveOptions{Force: true, PruneChildren: true})
reader, err := cli.ImagePull(ctx, etcdImageName, types.ImagePullOptions{})
_, err = io.Copy(os.Stdout, reader)
// Add another docker network to be able to resolve etcd-storage from the coordinator.
// This is not possible in the default "bridge" network.
dockerNetwork, err := cli.NetworkCreate(ctx, etcdOverlayNetwork, types.NetworkCreate{Driver: "bridge", Internal: true})
defer cli.NetworkRemove(ctx, etcdOverlayNetwork)
// setup etcd
t.Log("create etcd container...")
respEtcd, err := cli.ContainerCreate(ctx, configEtcd, hostconfigEtcd, nil, nil, "etcd-storage")
require.NoError(cli.ContainerStart(ctx, respEtcd.ID, types.ContainerStartOptions{}))
defer killDockerContainer(ctx, cli, respEtcd)
require.NoError(cli.NetworkConnect(ctx, dockerNetwork.ID, respEtcd.ID, nil))
etcdData, err := cli.ContainerInspect(ctx, respEtcd.ID)
etcdIPAddr := etcdData.NetworkSettings.DefaultNetworkSettings.IPAddress
etcdstore, err := store.NewEtcdStore(net.JoinHostPort(etcdIPAddr, "2379"), false, zap.NewNop())
defer etcdstore.Close()
defer killDockerContainers(ctx, cli, activePeers)
// setup coordinator container
t.Log("create coordinator container...")
resp, err := cli.ContainerCreate(ctx, configMaster, hostconfigMaster, nil, nil, "master")
require.NoError(cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}))
if debugMode == "true" {
attachDockerContainerStdout(ctx, cli, resp.ID)
coordinatorData, err := cli.ContainerInspect(ctx, resp.ID)
activePeers[coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp, isCoordinator: true}
activeCoordinators = append(activeCoordinators, coordinatorData.NetworkSettings.DefaultNetworkSettings.IPAddress)
require.NoError(cli.NetworkConnect(ctx, dockerNetwork.ID, resp.ID, nil))
// 1st activation phase
endpoints, err := spawnContainers(ctx, cli, numberFirstActivation, activePeers)
t.Logf("node endpoints: %v", endpoints)
t.Log("activate coordinator...")
start := time.Now()
assert.NoError(startCoordinator(ctx, activeCoordinators[0], endpoints))
elapsed := time.Since(start)
t.Logf("activation took %v", elapsed)
require.NoError(updateVPNIPs(activePeers, etcdstore))
t.Log("count peers in instances")
countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
t.Log("start ping test")
pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
// 2nd activation phase
endpoints, err = spawnContainers(ctx, cli, numberSecondaryActivation, activePeers)
t.Logf("node endpoints: %v", endpoints)
t.Log("add additional nodes")
start = time.Now()
assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints))
elapsed = time.Since(start)
t.Logf("adding took %v", elapsed)
require.NoError(updateVPNIPs(activePeers, etcdstore))
t.Log("count peers in instances")
countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
t.Log("start ping test")
pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
// 3rd activation phase
endpoints, err = spawnContainers(ctx, cli, numberThirdActivation, activePeers)
t.Logf("node endpoints: %v", endpoints)
t.Log("add additional nodes")
start = time.Now()
assert.NoError(addNewNodesToCoordinator(ctx, activeCoordinators[0], endpoints))
elapsed = time.Since(start)
t.Logf("adding took %v", elapsed)
require.NoError(updateVPNIPs(activePeers, etcdstore))
t.Log("count peers in instances")
countPeersTest(ctx, t, cli, wgExecConfig, activePeers)
t.Log("start ping test")
pingTest(ctx, t, cli, pingExecConfig, activePeers, etcdstore)
// helper methods
func startCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil {
return err
conn, err := grpc.DialContext(ctx, net.JoinHostPort(coordinatorAddr, publicgRPCPort), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return err
defer conn.Close()
client := pubproto.NewAPIClient(conn)
adminKey, err := wgtypes.GeneratePrivateKey()
if err != nil {
return err
adminKey = adminKey.PublicKey()
stream, err := client.ActivateAsCoordinator(ctx, &pubproto.ActivateAsCoordinatorRequest{
AdminVpnPubKey: adminKey[:],
NodePublicEndpoints: endpoints,
MasterSecret: []byte(masterSecret),
KmsUri: kms.ClusterKMSURI,
StorageUri: kms.NoStoreURI,
KeyEncryptionKeyId: "",
UseExistingKek: false,
if err != nil {
return err
for {
_, err := stream.Recv()
if err == io.EOF {
return nil
if err != nil {
return err
func addNewNodesToCoordinator(ctx context.Context, coordinatorAddr string, endpoints []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil {
return err
conn, err := grpc.DialContext(ctx, net.JoinHostPort(coordinatorAddr, publicgRPCPort), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return err
defer conn.Close()
client := pubproto.NewAPIClient(conn)
stream, err := client.ActivateAdditionalNodes(ctx, &pubproto.ActivateAdditionalNodesRequest{NodePublicEndpoints: endpoints})
if err != nil {
return err
for {
_, err := stream.Recv()
if err == io.EOF {
return nil
if err != nil {
return err
func spawnContainers(ctx context.Context, cli *client.Client, count int, activeContainers map[string]peerInfo) ([]string, error) {
tmpPeerEndpoints := make([]string, 0, count)
// spawn client container(s) and obtain their docker network ip address
for i := 0; i < count; i++ {
resp, err := createNewNode(ctx, cli)
if err != nil {
return nil, err
tmpPeerEndpoints = append(tmpPeerEndpoints, resp.dockerIPAddr)
containerData, err := cli.ContainerInspect(ctx, resp.containerResponse.ID)
if err != nil {
return nil, err
activeContainers[containerData.NetworkSettings.DefaultNetworkSettings.IPAddress] = peerInfo{dockerData: resp.containerResponse, isCoordinator: false}
return tmpPeerEndpoints, blockUntilUp(ctx, tmpPeerEndpoints)
// Make the port forward binding, so we can access the coordinator from the host
func makeBinding(ip, internalPort string, externalPort string) nat.PortMap {
binding := nat.PortBinding{
HostIP: ip,
HostPort: externalPort,
bindingMap := map[nat.Port][]nat.PortBinding{nat.Port(fmt.Sprintf("%s/tcp", internalPort)): {binding}}
return nat.PortMap(bindingMap)
func killDockerContainers(ctx context.Context, cli *client.Client, activeContainers map[string]peerInfo) {
for _, v := range activeContainers {
killDockerContainer(ctx, cli, v.dockerData)
func killDockerContainer(ctx context.Context, cli *client.Client, container container.ContainerCreateCreatedBody) {
fmt.Print("Kill container ", container.ID[:10], "... ")
if err := cli.ContainerKill(ctx, container.ID, "9"); err != nil {
func attachDockerContainerStdout(ctx context.Context, cli *client.Client, id string) {
resp, err := cli.ContainerLogs(ctx, id, containerLogConfig)
if err != nil {
go io.Copy(os.Stdout, resp) // TODO: this goroutine leaks
func imageBuild(ctx context.Context, dockerClient *client.Client, debugMode string) error {
// Docker need a BuildContext, generate it...
tar, err := archive.TarWithOptions(".", &archive.TarOptions{})
if err != nil {
return err
resp, err := dockerClient.ImageBuild(ctx, tar, constellationDockerImageBuildOptions)
if err != nil {
return err
defer resp.Body.Close()
if debugMode == "true" {
if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
return err
// Block until EOF, so the build has finished if we continue
_, err = io.Copy(io.Discard, resp.Body)
return err
// count number of wireguard peers within all active docker containers
func countPeersTest(ctx context.Context, t *testing.T, cli *client.Client, execConfig types.ExecConfig, activeContainers map[string]peerInfo) {
t.Run("countPeerTest", func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
for ip, id := range activeContainers {
respIDExecCreate, err := cli.ContainerExecCreate(ctx, id.dockerData.ID, execConfig)
respID, err := cli.ContainerExecAttach(ctx, respIDExecCreate.ID, types.ExecStartCheck{})
output, err := io.ReadAll(respID.Reader)
countedPeers := strings.Count(string(output), "peer")
fmt.Printf("% 3d peers in container %s [%s] out of % 3d total nodes \n", countedPeers, id.dockerData.ID, ip, len(activeContainers))
// coordinator has another VPN endpoint for communication with the end user
if id.isCoordinator {
assert.Equal(len(activeContainers), countedPeers)
} else {
assert.Equal(len(activeContainers)-1, countedPeers)
if (len(activeContainers) - 1) != countedPeers {
attachDockerContainerStdout(ctx, cli, id.dockerData.ID)
func pingTest(ctx context.Context, t *testing.T, cli *client.Client, execConfig types.ExecConfig, activeContainers map[string]peerInfo, etcdstore store.Store) {
t.Run("pingTest", func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
peerEndpoints, err := getPeerVPNIPsFromEtcd(etcdstore)
// all nodes + coordinator are peers
require.Equal(len(peerEndpoints), len(activeContainers))
for i := 0; i < len(peerEndpoints); i++ {
execConfig.Cmd = []string{"ping", "-q", "-c", "1", "-W", "1", peerEndpoints[i]}
for _, id := range activeContainers {
fmt.Printf("Ping from container %v | % 19s to container % 19s", id.dockerData.ID, id.vpnIP, peerEndpoints[i])
respIDExecCreate, err := cli.ContainerExecCreate(ctx, id.dockerData.ID, execConfig)
err = cli.ContainerExecStart(ctx, respIDExecCreate.ID, types.ExecStartCheck{})
resp, err := cli.ContainerExecInspect(ctx, respIDExecCreate.ID)
assert.Equal(0, resp.ExitCode)
if resp.ExitCode == 0 {
fmt.Printf(" ...Success\n")
} else {
fmt.Printf(" ...Failure\n")
type newNodeData struct {
containerResponse container.ContainerCreateCreatedBody
dockerIPAddr string
// pass error one level up
func createNewNode(ctx context.Context, cli *client.Client) (*newNodeData, error) {
resp, err := cli.ContainerCreate(ctx, configNode, hostconfigNode, nil, nil, "")
if err != nil {
return nil, err
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return nil, err
containerData, err := cli.ContainerInspect(ctx, resp.ID)
if err != nil {
return nil, err
fmt.Printf("created Node %v\n", containerData.ID)
return &newNodeData{resp, net.JoinHostPort(containerData.NetworkSettings.IPAddress, publicgRPCPort)}, nil
func awaitPeerResponse(ctx context.Context, endpoint string, tlsConfig *tls.Config) error {
// Block, so the connection gets established/fails immediately
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithBlock(), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
return err
return conn.Close()
func blockUntilUp(ctx context.Context, peerEndpoints []string) error {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil {
return err
for _, endpoint := range peerEndpoints {
// Block, so the connection gets established/fails immediately
if err := awaitPeerResponse(ctx, endpoint, tlsConfig); err != nil {
return err
return nil
func getPeerVPNIPsFromEtcd(etcdstore store.Store) ([]string, error) {
peers, err := storewrapper.StoreWrapper{Store: etcdstore}.GetPeers()
if err != nil {
return nil, err
vpnIPS := make([]string, 0, len(peers))
for _, v := range peers {
vpnIPS = append(vpnIPS, v.VPNIP)
return vpnIPS, nil
func translatePublicToVPNIP(publicIP string, etcdstore store.Store) (string, error) {
peers, err := storewrapper.StoreWrapper{Store: etcdstore}.GetPeers()
if err != nil {
return "", err
// port has to be the same as pubapi-Server port
publicEndpoint := net.JoinHostPort(publicIP, "9000")
for _, peer := range peers {
if peer.PublicEndpoint == publicEndpoint {
return peer.VPNIP, nil
return "", errors.New("Did not found VPN IP")
func updateVPNIPs(activeContainers map[string]peerInfo, etcdstore store.Store) error {
for publicIP, v := range activeContainers {
vpnIP, err := translatePublicToVPNIP(publicIP, etcdstore)
if err != nil {
return err
v.vpnIP = vpnIP
activeContainers[publicIP] = v
return nil