mirror of
synced 2025-03-06 05:36:07 -05:00
405 lines
12 KiB
405 lines
12 KiB
package coordinator
import (
kms "github.com/edgelesssys/constellation/kms/server/setup"
func TestMain(m *testing.M) {
// https://github.com/census-instrumentation/opencensus-go/issues/1262
// TestCoordinator tests the integration of packages core, pubapi, and vpnapi. It activates
// a coordinator and some nodes and (virtually) sends a packet over the fake VPN.
func TestCoordinator(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
nodeIPs := []string{"", "", ""}
coordinatorIP := ""
bindPort := "9000"
logger := zaptest.NewLogger(t)
dialer := testdialer.NewBufconnDialer()
netw := newNetwork()
// spawn 4 peers: 1 designated coordinator and 3 nodes
coordServer, coordPAPI, _ := spawnPeer(require, logger.Named("coord"), dialer, netw, net.JoinHostPort(coordinatorIP, bindPort))
defer coordPAPI.Close()
defer coordServer.GracefulStop()
nodeServer1, nodePAPI1, nodeVPN1 := spawnPeer(require, logger.Named("node1"), dialer, netw, net.JoinHostPort(nodeIPs[0], bindPort))
defer nodePAPI1.Close()
defer nodeServer1.GracefulStop()
nodeServer2, nodePAPI2, nodeVPN2 := spawnPeer(require, logger.Named("node2"), dialer, netw, net.JoinHostPort(nodeIPs[1], bindPort))
defer nodePAPI2.Close()
defer nodeServer2.GracefulStop()
nodeServer3, nodePAPI3, nodeVPN3 := spawnPeer(require, logger.Named("node3"), dialer, netw, net.JoinHostPort(nodeIPs[2], bindPort))
defer nodePAPI3.Close()
defer nodeServer3.GracefulStop()
require.NoError(activateCoordinator(require, dialer, coordinatorIP, bindPort, nodeIPs))
// send something from node 1 to node 2
nodeIP1, err := nodeVPN1.GetInterfaceIP()
nodeIP2, err := nodeVPN2.GetInterfaceIP()
assert.NotEqual(nodeIP1, nodeIP2)
nodeVPN1.send(nodeIP2, "foo")
pa := nodeVPN2.recv()
assert.Equal(nodeIP1, pa.src)
assert.Equal("foo", pa.data)
// TestConcurrent is supposed to detect data races when run with -race.
func TestConcurrent(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
nodeIPs := []string{"", ""}
coordinatorIP := ""
bindPort := "9000"
logger := zaptest.NewLogger(t)
dialer := testdialer.NewBufconnDialer()
netw := newNetwork()
// spawn peers
coordServer, coordPAPI, _ := spawnPeer(require, logger.Named("coord"), dialer, netw, net.JoinHostPort(coordinatorIP, bindPort))
defer coordPAPI.Close()
defer coordServer.GracefulStop()
nodeServer1, nodePAPI1, _ := spawnPeer(require, logger.Named("node1"), dialer, netw, net.JoinHostPort(nodeIPs[0], bindPort))
defer nodePAPI1.Close()
defer nodeServer1.GracefulStop()
nodeServer2, nodePAPI2, _ := spawnPeer(require, logger.Named("node2"), dialer, netw, net.JoinHostPort(nodeIPs[1], bindPort))
defer nodePAPI2.Close()
defer nodeServer2.GracefulStop()
var wg sync.WaitGroup
// This test is a rather rough check for concurrency errors in the pubapi. To this end, various funcs of the pubapi
// are called concurrently. As a minimal verification, returned errors are checked.
// The coverage of this test alone isn't sufficient. Not all funcs of the pubapi are tested, and arguments are constant.
// In the future, we should have something more sophisticated.
actCoord := func(retErr chan error) {
defer wg.Done()
retErr <- activateCoordinator(require, dialer, coordinatorIP, bindPort, nodeIPs)
actNode := func(papi *pubapi.API) {
defer wg.Done()
// actNode is called on already activated nodes, so this will fail due to wrong state.
updNode := func(papi *pubapi.API, noerr bool) {
defer wg.Done()
_, err := papi.TriggerNodeUpdate(context.Background(), &pubproto.TriggerNodeUpdateRequest{})
if noerr {
getState := func(papi *pubapi.API) {
defer wg.Done()
// GetState should always succeed, regardless of what happened to the peer before.
_, err := papi.GetState(context.Background(), &pubproto.GetStateRequest{})
join := func(papi *pubapi.API) {
defer wg.Done()
// For now, we always pass an empty JoinClusterRequest, so JoinCluster
// is expected to fail even if the peer is in the required state.
_, err := papi.JoinCluster(context.Background(), &pubproto.JoinClusterRequest{})
// activate coordinator and make some other calls concurrently
actCoordErrs := make(chan error, 2)
go actCoord(actCoordErrs)
go actCoord(actCoordErrs)
// updNode on unactivated node should fail.
// updNode on Coordinator should fail.
// updNode on Node should succeed, but we don't know whether the node is already activated or not, so we can't expect no error.
go updNode(coordPAPI, false)
go updNode(coordPAPI, false)
go updNode(nodePAPI1, false)
go updNode(nodePAPI1, false)
go updNode(nodePAPI2, false)
go updNode(nodePAPI2, false)
go getState(coordPAPI)
go getState(coordPAPI)
go getState(nodePAPI1)
go getState(nodePAPI1)
go getState(nodePAPI2)
go getState(nodePAPI2)
go join(coordPAPI)
go join(coordPAPI)
actCoord1HasErr := <-actCoordErrs != nil
actCoord2HasErr := <-actCoordErrs != nil
require.NotEqual(actCoord1HasErr, actCoord2HasErr, "exactly one actCoord call should succeed")
// make some concurrent calls on the activated peers
go actCoord(actCoordErrs)
go actCoord(actCoordErrs)
go actNode(coordPAPI)
go actNode(coordPAPI)
go actNode(nodePAPI1)
go actNode(nodePAPI1)
go actNode(nodePAPI2)
go actNode(nodePAPI2)
go updNode(coordPAPI, false)
go updNode(coordPAPI, false)
go updNode(nodePAPI1, true)
go updNode(nodePAPI1, true)
go updNode(nodePAPI2, true)
go updNode(nodePAPI2, true)
go getState(coordPAPI)
go getState(coordPAPI)
go getState(nodePAPI1)
go getState(nodePAPI1)
go getState(nodePAPI2)
go getState(nodePAPI2)
go join(coordPAPI)
go join(coordPAPI)
go join(nodePAPI1)
go join(nodePAPI1)
go join(nodePAPI2)
go join(nodePAPI2)
// One Coordinator is already activated, following both activation calls will fail now.
func spawnPeer(require *require.Assertions, logger *zap.Logger, netDialer *testdialer.BufconnDialer, netw *network, endpoint string) (*grpc.Server, *pubapi.API, *fakeVPN) {
vpn := newVPN(netw, endpoint)
cor, err := core.NewCore(vpn, &core.ClusterFake{}, &core.ProviderMetadataFake{}, &core.CloudControllerManagerFake{}, &core.CloudNodeManagerFake{}, &core.ClusterAutoscalerFake{}, &core.EncryptedDiskFake{}, logger, simulator.OpenSimulatedTPM, fakeStoreFactory{}, file.NewHandler(afero.NewMemMapFs()))
require.NoError(cor.AdvanceState(state.AcceptingInit, nil, nil))
getPublicAddr := func() (string, error) {
return "", nil
dialer := grpcutil.NewDialer(&core.MockValidator{}, netDialer)
vapiServer := &fakeVPNAPIServer{logger: logger.Named("vpnapi"), core: cor, dialer: netDialer}
papi := pubapi.New(logger, cor, dialer, vapiServer, getPublicAddr, nil)
tlsConfig, err := atls.CreateAttestationServerTLSConfig(&core.MockIssuer{})
server := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
pubproto.RegisterAPIServer(server, papi)
listener := netDialer.GetListener(endpoint)
go server.Serve(listener)
return server, papi, vpn
func activateCoordinator(require *require.Assertions, dialer netDialer, coordinatorIP, bindPort string, nodeIPs []string) error {
ctx := context.Background()
conn, err := dialGRPC(ctx, dialer, net.JoinHostPort(coordinatorIP, bindPort))
defer conn.Close()
client := pubproto.NewAPIClient(conn)
stream, err := client.ActivateAsCoordinator(ctx, &pubproto.ActivateAsCoordinatorRequest{
NodePublicIps: nodeIPs,
MasterSecret: []byte("Constellation"),
KmsUri: kms.ClusterKMSURI,
StorageUri: kms.NoStoreURI,
for {
_, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
if err != nil {
return err
func dialGRPC(ctx context.Context, dialer netDialer, target string) (*grpc.ClientConn, error) {
tlsConfig, err := atls.CreateAttestationClientTLSConfig([]atls.Validator{&core.MockValidator{}})
if err != nil {
return nil, err
return grpc.DialContext(ctx, target,
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return dialer.DialContext(ctx, "tcp", addr)
type fakeStoreFactory struct{}
func (fakeStoreFactory) New() (store.Store, error) {
return store.NewStdStore(), nil
type fakeVPNAPIServer struct {
logger *zap.Logger
core vpnapi.Core
dialer *testdialer.BufconnDialer
listener net.Listener
server *grpc.Server
func (v *fakeVPNAPIServer) Listen(endpoint string) error {
api := vpnapi.New(v.logger, v.core)
v.server = grpc.NewServer()
vpnproto.RegisterAPIServer(v.server, api)
v.listener = v.dialer.GetListener(endpoint)
return nil
func (v *fakeVPNAPIServer) Serve() error {
return v.server.Serve(v.listener)
func (v *fakeVPNAPIServer) Close() {
if v.server != nil {
type network struct {
packets map[string][]packet
func newNetwork() *network {
return &network{packets: make(map[string][]packet)}
type packet struct {
src string
data string
type fakeVPN struct {
peers map[string]string // vpnIP -> publicIP
netw *network
publicIP string
interfaceIP string
func newVPN(netw *network, publicEndpoint string) *fakeVPN {
publicIP, _, err := net.SplitHostPort(publicEndpoint)
if err != nil {
return &fakeVPN{
peers: make(map[string]string),
netw: netw,
publicIP: publicIP,
func (*fakeVPN) Setup(privKey []byte) error {
return nil
func (*fakeVPN) GetPrivateKey() ([]byte, error) {
return nil, nil
func (*fakeVPN) GetPublicKey() ([]byte, error) {
return nil, nil
func (v *fakeVPN) GetInterfaceIP() (string, error) {
return v.interfaceIP, nil
func (v *fakeVPN) SetInterfaceIP(ip string) error {
v.interfaceIP = ip
return nil
func (v *fakeVPN) AddPeer(pubKey []byte, publicIP string, vpnIP string) error {
v.peers[vpnIP] = publicIP
return nil
func (v *fakeVPN) RemovePeer(pubKey []byte) error {
func (v *fakeVPN) UpdatePeers(peers []peer.Peer) error {
for _, peer := range peers {
if err := v.AddPeer(peer.VPNPubKey, peer.PublicIP, peer.VPNIP); err != nil {
return err
return nil
func (v *fakeVPN) send(dst string, data string) {
pubdst := v.peers[dst]
packets := v.netw.packets
packets[pubdst] = append(packets[pubdst], packet{src: v.publicIP, data: data})
func (v *fakeVPN) recv() *packet {
packets := v.netw.packets
queue := packets[v.publicIP]
if len(queue) == 0 {
return nil
packet := queue[0]
packets[v.publicIP] = queue[1:]
for vpnIP, pubIP := range v.peers {
if pubIP == packet.src {
packet.src = vpnIP
return &packet
type netDialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)