coordinator-core: add multi coordinator Kubernetes integration (#39)

Signed-off-by: Benedict Schlueter <bs@edgeless.systems>
This commit is contained in:
Benedict Schlueter 2022-04-25 17:26:17 +02:00 committed by Benedict Schlüter
parent 0ac9617dac
commit 86178df205
19 changed files with 359 additions and 154 deletions

View File

@ -328,15 +328,15 @@ func newVPN(netw *network, publicEndpoint string) *fakeVPN {
}
}
func (*fakeVPN) Setup(privKey []byte) ([]byte, error) {
return nil, nil
func (*fakeVPN) Setup(privKey []byte) error {
return nil
}
func (*fakeVPN) GetPrivateKey() ([]byte, error) {
return nil, nil
}
func (*fakeVPN) GetPublicKey(privKey []byte) ([]byte, error) {
func (*fakeVPN) GetPublicKey() ([]byte, error) {
return nil, nil
}

View File

@ -70,17 +70,7 @@ func NewCore(vpn VPN, kube Cluster,
return nil, err
}
privk, err := vpn.Setup(nil)
if err != nil {
return nil, err
}
pubk, err := vpn.GetPublicKey(privk)
if err != nil {
return nil, err
}
if err := c.data().PutVPNKey(pubk); err != nil {
if err := vpn.Setup(nil); err != nil {
return nil, err
}
@ -89,7 +79,7 @@ func NewCore(vpn VPN, kube Cluster,
// GetVPNPubKey returns the peer's VPN public key.
func (c *Core) GetVPNPubKey() ([]byte, error) {
return c.data().GetVPNKey()
return c.vpn.GetPublicKey()
}
// GetVPNPubKey returns the peer's VPN public key.

View File

@ -13,7 +13,6 @@ import (
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/storewrapper"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -78,13 +77,13 @@ func TestSwitchToPersistentStore(t *testing.T) {
storeFactory := &fakeStoreFactory{}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory, file.NewHandler(afero.NewMemMapFs()))
require.NoError(core.store.Put("test", []byte("test")))
require.NoError(err)
require.NoError(core.SwitchToPersistentStore())
key, err := storewrapper.StoreWrapper{Store: storeFactory.store}.GetVPNKey()
require.NoError(err)
assert.NotEmpty(key)
value, err := core.store.Get("test")
assert.NoError(err)
assert.Equal("test", string(value))
}
func TestGetIDs(t *testing.T) {

View File

@ -79,16 +79,11 @@ func TestLegacyActivateCoordinator(t *testing.T) {
testActivationSvr := &stubAVPNActivateCoordinatorServer{}
assert.NoError(coordinatorAPI.ActivateAsCoordinator(activationReq, testActivationSvr))
// Coordinator sets own key
coordinatorKey, err := coordinatorCore.data().GetVPNKey()
assert.NoError(err)
// Coordinator streams admin conf
require.NotEmpty(testActivationSvr.sent)
adminConfig := testActivationSvr.sent[len(testActivationSvr.sent)-1].GetAdminConfig()
require.NotNil(adminConfig)
assert.NotEmpty(adminConfig.AdminVpnIp)
assert.Equal(coordinatorKey, adminConfig.CoordinatorVpnPubKey)
assert.NotNil(adminConfig.Kubeconfig)
require.NotNil(testActivationSvr.sent[0])
require.NotNil(testActivationSvr.sent[0].GetLog())
@ -114,7 +109,6 @@ func TestLegacyActivateCoordinator(t *testing.T) {
peers = nodeCore1.vpn.(*stubVPN).peers
assert.Less(0, len(peers))
assert.NotEmpty(peers[0].publicIP)
assert.Equal(coordinatorKey, peers[0].pubKey)
}
// newMockCoreWithDialer creates a new core object with attestation mock and provided dialer for testing.

View File

@ -2,15 +2,14 @@ package core
import (
"bytes"
"errors"
"github.com/edgelesssys/constellation/coordinator/peer"
)
type VPN interface {
Setup(privKey []byte) ([]byte, error)
Setup(privKey []byte) error
GetPrivateKey() ([]byte, error)
GetPublicKey(privKey []byte) ([]byte, error)
GetPublicKey() ([]byte, error)
GetInterfaceIP() (string, error)
SetInterfaceIP(ip string) error
AddPeer(pubKey []byte, publicIP string, vpnIP string) error
@ -28,19 +27,16 @@ type stubVPN struct {
getPrivateKeyErr error
}
func (*stubVPN) Setup(privKey []byte) ([]byte, error) {
return []byte{2, 3, 4}, nil
func (*stubVPN) Setup(privKey []byte) error {
return nil
}
func (v *stubVPN) GetPrivateKey() ([]byte, error) {
return v.privateKey, v.getPrivateKeyErr
}
func (*stubVPN) GetPublicKey(privKey []byte) ([]byte, error) {
if bytes.Equal(privKey, []byte{2, 3, 4}) {
return []byte{3, 4, 5}, nil
}
return nil, errors.New("unexpected privKey")
func (*stubVPN) GetPublicKey() ([]byte, error) {
return []byte{3, 4, 5}, nil
}
func (v *stubVPN) GetInterfaceIP() (string, error) {

View File

@ -105,16 +105,20 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
panic(err)
}
}()
if err := a.core.SwitchToPersistentStore(); err != nil {
return status.Errorf(codes.Internal, "switch to persistent store: %v", err)
}
// TODO: check performance and maybe make concurrent
if err := a.activateCoordinators(logToCLI, in.CoordinatorPublicIps); err != nil {
a.logger.Error("coordinator activation failed", zap.Error(err))
return status.Errorf(codes.Internal, "coordinator initialization: %v", err)
}
// TODO: check performance and maybe make concurrent
if err := a.activateNodes(logToCLI, in.NodePublicIps); err != nil {
a.logger.Error("node activation failed", zap.Error(err))
return status.Errorf(codes.Internal, "node initialization: %v", err)
}
if err := a.core.SwitchToPersistentStore(); err != nil {
return status.Errorf(codes.Internal, "switch to persistent store: %v", err)
}
// persist node state on disk
if err := a.core.PersistNodeState(role.Coordinator, ownerID, clusterID); err != nil {
return status.Errorf(codes.Internal, "persist node state: %v", err)

View File

@ -334,6 +334,7 @@ type stubPeer struct {
activateAsNodeReceive int
activateErr error
joinErr error
getPubKeyErr error
pubproto.UnimplementedAPIServer
}
@ -380,6 +381,10 @@ func (n *stubPeer) JoinCluster(ctx context.Context, in *pubproto.JoinClusterRequ
return &pubproto.JoinClusterResponse{}, n.joinErr
}
func (n *stubPeer) GetPeerVPNPublicKey(ctx context.Context, in *pubproto.GetPeerVPNPublicKeyRequest) (*pubproto.GetPeerVPNPublicKeyResponse, error) {
return &pubproto.GetPeerVPNPublicKeyResponse{CoordinatorPubKey: n.peer.VPNPubKey}, n.getPubKeyErr
}
func (n *stubPeer) newServer() *grpc.Server {
tlsConfig, err := atls.CreateAttestationServerTLSConfig(fakeIssuer{})
if err != nil {

View File

@ -37,5 +37,5 @@ type Core interface {
UpdatePeers([]peer.Peer) error
InitCluster(autoscalingNodeGroups []string, cloudServiceAccountURI string) ([]byte, error)
JoinCluster(kubeadm.BootstrapTokenDiscovery) error
JoinCluster(joinToken *kubeadm.BootstrapTokenDiscovery, certificateKey string, role role.Role) error
}

View File

@ -14,6 +14,7 @@ import (
type fakeCore struct {
vpnPubKey []byte
getvpnPubKeyErr error
vpnIP string
setVPNIPErr error
nextNodeIP netip.Addr
@ -38,7 +39,7 @@ type fakeCore struct {
}
func (c *fakeCore) GetVPNPubKey() ([]byte, error) {
return c.vpnPubKey, nil
return c.vpnPubKey, c.getvpnPubKeyErr
}
func (c *fakeCore) SetVPNIP(ip string) error {
@ -123,8 +124,8 @@ func (c *fakeCore) InitCluster(autoscalingNodeGroups []string, cloudServiceAccou
return c.kubeconfig, nil
}
func (c *fakeCore) JoinCluster(args kubeadm.BootstrapTokenDiscovery) error {
c.joinArgs = append(c.joinArgs, args)
func (c *fakeCore) JoinCluster(args *kubeadm.BootstrapTokenDiscovery, _ string, _ role.Role) error {
c.joinArgs = append(c.joinArgs, *args)
return c.joinClusterErr
}

View File

@ -2,15 +2,18 @@ package pubapi
import (
"context"
"fmt"
"net"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/vpnapi/vpnproto"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
// ActivateAsAdditionalCoordinator is the RPC call to activate subsequent coordinators.
@ -58,7 +61,21 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
}
}()
// TODO: kubernetes information and join
a.logger.Info("retrieving k8s join information ")
joinArgs, certKey, err := a.getk8SCoordinatorJoinArgs(ctx, in.ActivatingCoordinatorData.VpnIp, vpnAPIPort)
if err != nil {
return nil, status.Errorf(codes.Internal, "error in getk8sJoinArgs: %v", err)
}
// Before we join the cluster we need to be able to communicate with ALL other control-planes
err = a.core.UpdatePeers(peer.FromPubProto(in.Peers))
if err != nil {
return nil, status.Errorf(codes.Internal, "add peers to vpn: %v", err)
}
a.logger.Info("about to join the k8s cluster")
err = a.core.JoinCluster(joinArgs, certKey, role.Coordinator)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
// ATTENTION: STORE HAS TO BE EMPTY (NO OVERLAPPING KEYS) WHEN THIS FUNCTION IS CALLED
if err := a.core.SwitchToPersistentStore(); err != nil {
@ -118,7 +135,6 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
if err != nil {
return nil, status.Errorf(codes.Internal, "get peers from store: %v", err)
}
a.logger.Info("", zap.Any("peers", peers))
for _, p := range peers {
if p.Role == role.Node {
if err := a.triggerNodeUpdate(p.PublicIP); err != nil {
@ -126,7 +142,6 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
}
}
if p.Role == role.Coordinator && p.VPNIP != thisPeer.VPNIP {
a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP))
if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
}
@ -137,51 +152,90 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
}
func (a *API) ActivateAdditionalCoordinator(ctx context.Context, in *pubproto.ActivateAdditionalCoordinatorRequest) (*pubproto.ActivateAdditionalCoordinatorResponse, error) {
ctx, cancel := context.WithTimeout(ctx, deadlineDuration)
defer cancel()
if err := a.core.RequireState(state.ActivatingNodes); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "coordinator is not in required state: %v", err)
}
assignedVPNIP, err := a.core.GetNextCoordinatorIP()
if err != nil {
return nil, status.Errorf(codes.Internal, "requesting new coordinator vpn IP address: %v", err)
}
vpnIP, err := a.core.GetVPNIP()
if err != nil {
return nil, status.Errorf(codes.Internal, "get own vpn IP address: %v", err)
}
thisPeer, err := a.assemblePeerStruct(vpnIP, role.Coordinator)
if err != nil {
return nil, status.Errorf(codes.Internal, "assembling coordinator peer struct: %v", err)
}
ownerID, clusterID, err := a.core.GetIDs(nil)
if err != nil {
return nil, status.Errorf(codes.Internal, "get owner and cluster ID: %v", err)
}
conn, err := a.dial(ctx, net.JoinHostPort(in.CoordinatorPublicIp, endpointAVPNPort))
if err != nil {
return nil, status.Errorf(codes.Internal, "dialing new coordinator: %v", err)
}
defer conn.Close()
client := pubproto.NewAPIClient(conn)
_, err = client.ActivateAsAdditionalCoordinator(ctx, &pubproto.ActivateAsAdditionalCoordinatorRequest{
AssignedVpnIp: assignedVPNIP,
ActivatingCoordinatorData: peer.ToPubProto([]peer.Peer{thisPeer})[0],
OwnerId: ownerID,
ClusterId: clusterID,
})
err := a.activateCoordinator(ctx, in.CoordinatorPublicIp)
if err != nil {
a.logger.Error("coordinator activation failed", zap.Error(err))
return nil, status.Errorf(codes.Internal, "activate new coordinator: %v", err)
}
return &pubproto.ActivateAdditionalCoordinatorResponse{}, nil
}
func (a *API) activateCoordinators(logToCLI logFunc, coordinatorPublicIPs []string) error {
// Activate all coordinators.
for num, coordinatorPublicIP := range coordinatorPublicIPs {
logToCLI("activating coordinator %3d out of %3d coordinators ...", num+2, len(coordinatorPublicIPs)+1)
if err := a.activateCoordinator(context.TODO(), coordinatorPublicIP); err != nil {
return err
}
}
return nil
}
func (a *API) activateCoordinator(ctx context.Context, coordinatorIP string) error {
ctx, cancel := context.WithTimeout(ctx, deadlineDuration)
defer cancel()
if err := a.core.RequireState(state.ActivatingNodes); err != nil {
return fmt.Errorf("coordinator is not in required state: %v", err)
}
assignedVPNIP, err := a.core.GetNextCoordinatorIP()
if err != nil {
return fmt.Errorf("requesting new coordinator vpn IP address: %v", err)
}
vpnIP, err := a.core.GetVPNIP()
if err != nil {
return fmt.Errorf("get own vpn IP address: %v", err)
}
thisPeer, err := a.assemblePeerStruct(vpnIP, role.Coordinator)
if err != nil {
return fmt.Errorf("assembling coordinator peer struct: %v", err)
}
ownerID, clusterID, err := a.core.GetIDs(nil)
if err != nil {
return fmt.Errorf("get owner and cluster ID: %v", err)
}
_, peers, err := a.core.GetPeers(0)
if err != nil {
return err
}
conn, err := a.dial(ctx, net.JoinHostPort(coordinatorIP, endpointAVPNPort))
if err != nil {
return fmt.Errorf("dialing new coordinator: %v", err)
}
defer conn.Close()
client := pubproto.NewAPIClient(conn)
// This call can be omitted, since this function will be called by the ToActivating Coordinator
// and he knows his own PubKey, so he can pass this as argument.
// TODO: Remove this gRPC function when we have working integration.
resp, err := client.GetPeerVPNPublicKey(ctx, &pubproto.GetPeerVPNPublicKeyRequest{})
if err != nil {
a.logger.Error("failed to get PubKey from new coordinator", zap.Error(err))
return err
}
newCoordinatorPeer := peer.Peer{VPNIP: assignedVPNIP, PublicIP: coordinatorIP, VPNPubKey: resp.CoordinatorPubKey, Role: role.Coordinator}
err = a.core.AddPeer(newCoordinatorPeer)
if err != nil {
a.logger.Error("failed to store new coordinator data", zap.Error(err))
return err
}
for _, p := range peers {
if p.Role == role.Coordinator && p.VPNIP != thisPeer.VPNIP {
if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
}
}
}
_, err = client.ActivateAsAdditionalCoordinator(ctx, &pubproto.ActivateAsAdditionalCoordinatorRequest{
AssignedVpnIp: assignedVPNIP,
ActivatingCoordinatorData: peer.ToPubProto([]peer.Peer{thisPeer})[0],
Peers: peer.ToPubProto(peers),
OwnerId: ownerID,
ClusterId: clusterID,
})
return err
}
func (a *API) TriggerCoordinatorUpdate(ctx context.Context, in *pubproto.TriggerCoordinatorUpdateRequest) (*pubproto.TriggerCoordinatorUpdateResponse, error) {
if err := a.core.RequireState(state.ActivatingNodes); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "coordinator is not in required state for updating state: %v", err)
@ -202,6 +256,15 @@ func (a *API) TriggerCoordinatorUpdate(ctx context.Context, in *pubproto.Trigger
return &pubproto.TriggerCoordinatorUpdateResponse{}, nil
}
// GetPeerVPNPublicKey return the VPN publicKey of the peer.
func (a *API) GetPeerVPNPublicKey(ctx context.Context, in *pubproto.GetPeerVPNPublicKeyRequest) (*pubproto.GetPeerVPNPublicKeyResponse, error) {
key, err := a.core.GetVPNPubKey()
if err != nil {
return nil, status.Errorf(codes.Internal, "could not obtain VPNPubKey %v", err)
}
return &pubproto.GetPeerVPNPublicKeyResponse{CoordinatorPubKey: key}, nil
}
func (a *API) triggerCoordinatorUpdate(ctx context.Context, publicIP string) error {
ctx, cancel := context.WithTimeout(ctx, deadlineDuration)
defer cancel()
@ -219,3 +282,28 @@ func (a *API) triggerCoordinatorUpdate(ctx context.Context, publicIP string) err
return err
}
func (a *API) getk8SCoordinatorJoinArgs(ctx context.Context, coordinatorIP, port string) (*kubeadm.BootstrapTokenDiscovery, string, error) {
conn, err := a.dialInsecure(ctx, net.JoinHostPort(coordinatorIP, port))
if err != nil {
return nil, "", err
}
defer conn.Close()
client := vpnproto.NewAPIClient(conn)
// since the key has to generated every time, this gRPC induces ~1s overhead.
resp, err := client.GetK8SCertificateKey(ctx, &vpnproto.GetK8SCertificateKeyRequest{})
if err != nil {
return nil, "", err
}
joinArgs, err := client.GetK8SJoinArgs(ctx, &vpnproto.GetK8SJoinArgsRequest{})
if err != nil {
return nil, "", err
}
joinToken := &kubeadm.BootstrapTokenDiscovery{
Token: joinArgs.Token,
APIServerEndpoint: joinArgs.ApiServerEndpoint,
CACertHashes: []string{joinArgs.DiscoveryTokenCaCertHash},
}
return joinToken, resp.CertificateKey, err
}

View File

@ -14,29 +14,40 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
func TestActivateAsCoordinators(t *testing.T) {
func TestActivateAsAdditionalCoordinator(t *testing.T) {
coordinatorPubKey := []byte{6, 7, 8}
testCoord1 := stubPeer{peer: peer.Peer{PublicIP: "192.0.2.11", VPNPubKey: []byte{1, 2, 3}, VPNIP: "10.118.0.1", Role: role.Coordinator}}
stubVPN := stubVPNAPI{joinArgs: kubeadm.BootstrapTokenDiscovery{
APIServerEndpoint: "endp",
Token: "token",
CACertHashes: []string{"dis"},
}}
someErr := errors.New("some error")
testCases := map[string]struct {
coordinators stubPeer
state state.State
switchToPersistentStoreErr error
expectErr bool
expectedState state.State
vpnapi stubVPNAPI
expectErr bool
switchToPersistentStoreErr error
k8sJoinargsErr error
k8sCertKeyErr error
}{
"basic": {
coordinators: testCoord1,
state: state.AcceptingInit,
expectedState: state.ActivatingNodes,
vpnapi: stubVPN,
},
"already activated": {
state: state.ActivatingNodes,
expectErr: true,
expectedState: state.ActivatingNodes,
vpnapi: stubVPN,
},
"SwitchToPersistentStore error": {
coordinators: testCoord1,
@ -44,6 +55,25 @@ func TestActivateAsCoordinators(t *testing.T) {
switchToPersistentStoreErr: someErr,
expectErr: true,
expectedState: state.Failed,
vpnapi: stubVPN,
},
"GetK8SJoinArgs error": {
coordinators: testCoord1,
state: state.AcceptingInit,
switchToPersistentStoreErr: someErr,
expectErr: true,
expectedState: state.Failed,
vpnapi: stubVPN,
k8sJoinargsErr: someErr,
},
"GetK8SCertificateKeyErr error": {
coordinators: testCoord1,
state: state.AcceptingInit,
switchToPersistentStoreErr: someErr,
expectErr: true,
expectedState: state.Failed,
vpnapi: stubVPN,
k8sCertKeyErr: someErr,
},
}
@ -52,6 +82,8 @@ func TestActivateAsCoordinators(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
tc.vpnapi.getJoinArgsErr = tc.k8sJoinargsErr
tc.vpnapi.getK8SCertKeyErr = tc.k8sCertKeyErr
core := &fakeCore{
state: tc.state,
vpnPubKey: coordinatorPubKey,
@ -69,10 +101,10 @@ func TestActivateAsCoordinators(t *testing.T) {
api := New(zaptest.NewLogger(t), core, dialer, stubVPNAPIServer{}, fakeValidator{}, getPublicIPAddr, nil)
defer api.Close()
// spawn coordinator
server := tc.coordinators.newServer()
go server.Serve(dialer.GetListener(tc.coordinators.peer.PublicIP))
defer server.GracefulStop()
// spawn vpnServer
vpnapiServer := tc.vpnapi.newServer()
go vpnapiServer.Serve(dialer.GetListener(net.JoinHostPort(tc.coordinators.peer.VPNIP, vpnAPIPort)))
defer vpnapiServer.GracefulStop()
_, err := api.ActivateAsAdditionalCoordinator(context.Background(), &pubproto.ActivateAsAdditionalCoordinatorRequest{
AssignedVpnIp: "10.118.0.2",
@ -158,11 +190,12 @@ func TestActivateAdditionalCoordinators(t *testing.T) {
testCoord1 := stubPeer{peer: peer.Peer{PublicIP: "192.0.2.11", VPNPubKey: []byte{1, 2, 3}, VPNIP: "10.118.0.1", Role: role.Coordinator}}
testCases := map[string]struct {
coordinators stubPeer
state state.State
activateErr error
expectErr bool
expectedState state.State
coordinators stubPeer
state state.State
activateErr error
getPublicKeyErr error
expectErr bool
expectedState state.State
}{
"basic": {
coordinators: testCoord1,
@ -182,6 +215,13 @@ func TestActivateAdditionalCoordinators(t *testing.T) {
expectedState: state.AcceptingInit,
expectErr: true,
},
"getPeerPublicKey error": {
coordinators: testCoord1,
state: state.ActivatingNodes,
expectedState: state.ActivatingNodes,
getPublicKeyErr: someErr,
expectErr: true,
},
}
for name, tc := range testCases {
@ -207,6 +247,7 @@ func TestActivateAdditionalCoordinators(t *testing.T) {
// spawn coordinator
tc.coordinators.activateErr = tc.activateErr
tc.coordinators.getPubKeyErr = tc.getPublicKeyErr
server := tc.coordinators.newServer()
go server.Serve(dialer.GetListener(net.JoinHostPort(tc.coordinators.peer.PublicIP, endpointAVPNPort)))
defer server.GracefulStop()
@ -223,3 +264,52 @@ func TestActivateAdditionalCoordinators(t *testing.T) {
})
}
}
func TestGetPeerVPNPublicKey(t *testing.T) {
someErr := errors.New("failed")
testCoord := stubPeer{peer: peer.Peer{PublicIP: "192.0.2.11", VPNPubKey: []byte{1, 2, 3}, VPNIP: "10.118.0.1", Role: role.Coordinator}}
testCases := map[string]struct {
coordinator stubPeer
getVPNPubKeyErr error
expectErr bool
}{
"basic": {
coordinator: testCoord,
},
"Activation Err": {
coordinator: testCoord,
getVPNPubKeyErr: someErr,
expectErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core := &fakeCore{
vpnPubKey: tc.coordinator.peer.VPNPubKey,
getvpnPubKeyErr: tc.getVPNPubKeyErr,
}
dialer := testdialer.NewBufconnDialer()
getPublicIPAddr := func() (string, error) {
return "192.0.2.1", nil
}
api := New(zaptest.NewLogger(t), core, dialer, stubVPNAPIServer{}, fakeValidator{}, getPublicIPAddr, nil)
defer api.Close()
resp, err := api.GetPeerVPNPublicKey(context.Background(), &pubproto.GetPeerVPNPublicKeyRequest{})
if tc.expectErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(tc.coordinator.peer.VPNPubKey, resp.CoordinatorPubKey)
})
}
}

View File

@ -178,11 +178,11 @@ func (a *API) JoinCluster(ctx context.Context, in *pubproto.JoinClusterRequest)
return nil, status.Errorf(codes.Internal, "request K8s join string: %v", err)
}
err = a.core.JoinCluster(kubeadm.BootstrapTokenDiscovery{
err = a.core.JoinCluster(&kubeadm.BootstrapTokenDiscovery{
APIServerEndpoint: resp.ApiServerEndpoint,
Token: resp.Token,
CACertHashes: []string{resp.DiscoveryTokenCaCertHash},
})
}, "", role.Node)
if err != nil {
_ = a.core.AdvanceState(state.Failed, nil, nil)
return nil, status.Errorf(codes.Internal, "joining kubernetes cluster: %v", err)

View File

@ -400,10 +400,11 @@ func dialGRPC(ctx context.Context, dialer Dialer, target string) (*grpc.ClientCo
}
type stubVPNAPI struct {
peers []peer.Peer
getUpdateErr error
joinArgs kubeadm.BootstrapTokenDiscovery
getJoinArgsErr error
peers []peer.Peer
joinArgs kubeadm.BootstrapTokenDiscovery
getUpdateErr error
getJoinArgsErr error
getK8SCertKeyErr error
vpnproto.UnimplementedAPIServer
}
@ -418,3 +419,13 @@ func (a *stubVPNAPI) GetK8SJoinArgs(ctx context.Context, in *vpnproto.GetK8SJoin
DiscoveryTokenCaCertHash: a.joinArgs.CACertHashes[0],
}, a.getJoinArgsErr
}
func (a *stubVPNAPI) GetK8SCertificateKey(ctx context.Context, in *vpnproto.GetK8SCertificateKeyRequest) (*vpnproto.GetK8SCertificateKeyResponse, error) {
return &vpnproto.GetK8SCertificateKeyResponse{CertificateKey: "dummyCertKey"}, a.getK8SCertKeyErr
}
func (a *stubVPNAPI) newServer() *grpc.Server {
server := grpc.NewServer()
vpnproto.RegisterAPIServer(server, a)
return server
}

View File

@ -19,7 +19,7 @@ import (
)
const (
deadlineDuration = time.Minute
deadlineDuration = 5 * time.Minute
endpointAVPNPort = "9000"
vpnAPIPort = "9027"
updateInterval = 10 * time.Second

View File

@ -25,7 +25,6 @@ const (
keyMasterSecret = "masterSecret"
keyKubeConfig = "kubeConfig"
keyClusterID = "clusterID"
keyVPNPubKey = "vpnKey"
keyKMSData = "KMSData"
keyKEKID = "kekID"
prefixFreeCoordinatorIPs = "freeCoordinatorVPNIPs"
@ -71,16 +70,6 @@ func (s StoreWrapper) PutState(currState state.State) error {
return s.Store.Put("state", rawState)
}
// GetVPNKey returns the VPN pubKey from Store.
func (s StoreWrapper) GetVPNKey() ([]byte, error) {
return s.Store.Get(keyVPNPubKey)
}
// PutVPNKey saves the VPN pubKey to store.
func (s StoreWrapper) PutVPNKey(key []byte) error {
return s.Store.Put(keyVPNPubKey, key)
}
// PutPeer puts a single peer in the store, with a unique key derived form the VPNIP.
func (s StoreWrapper) PutPeer(peer peer.Peer) error {
if len(peer.VPNIP) == 0 {

View File

@ -27,15 +27,11 @@ func TestStoreWrapper(t *testing.T) {
curState := state.IsNode
key := []byte{2, 3, 4}
masterSecret := []byte("Constellation")
stor := store.NewStdStore()
stwrapper := StoreWrapper{Store: stor}
assert.NoError(stwrapper.PutState(state.AcceptingInit))
dummyKey, err := wgtypes.GenerateKey()
assert.NoError(err)
assert.NoError(stwrapper.PutVPNKey(dummyKey[:]))
assert.NoError(stwrapper.PutMasterSecret(masterSecret))
// save values to store
@ -43,16 +39,12 @@ func TestStoreWrapper(t *testing.T) {
assert.NoError(err)
txdata := StoreWrapper{tx}
assert.NoError(txdata.PutState(curState))
assert.NoError(txdata.PutVPNKey(key))
assert.NoError(tx.Commit())
// see if we can retrieve them again
savedState, err := stwrapper.GetState()
assert.NoError(err)
assert.Equal(curState, savedState)
savedKey, err := stwrapper.GetVPNKey()
assert.NoError(err)
assert.Equal(key, savedKey)
savedSecret, err := stwrapper.GetMasterSecret()
assert.NoError(err)
assert.Equal(masterSecret, savedSecret)
@ -64,19 +56,10 @@ func TestStoreWrapperDefaults(t *testing.T) {
stor := store.NewStdStore()
stwrapper := StoreWrapper{Store: stor}
assert.NoError(stwrapper.PutState(state.AcceptingInit))
dummyKey, err := wgtypes.GenerateKey()
assert.NoError(err)
assert.NoError(stwrapper.PutVPNKey(dummyKey[:]))
statevalue, err := stwrapper.GetState()
assert.NoError(err)
assert.Equal(state.AcceptingInit, statevalue)
k, err := stwrapper.GetVPNKey()
assert.NoError(err)
assert.NotEmpty(k)
// Nothing else was set, should always return error
}
func TestStoreWrapperRollback(t *testing.T) {
@ -85,26 +68,25 @@ func TestStoreWrapperRollback(t *testing.T) {
stor := store.NewStdStore()
stwrapper := StoreWrapper{Store: stor}
assert.NoError(stwrapper.PutState(state.AcceptingInit))
dummyKey, err := wgtypes.GenerateKey()
assert.NoError(err)
assert.NoError(stwrapper.PutVPNKey(dummyKey[:]))
k1 := []byte{2, 3, 4}
k2 := []byte{3, 4, 5}
assert.NoError(stwrapper.PutClusterID([]byte{1, 2, 3}))
c1 := []byte{2, 3, 4}
c2 := []byte{3, 4, 5}
tx, err := stor.BeginTransaction()
assert.NoError(err)
assert.NoError(StoreWrapper{tx}.PutVPNKey(k1))
assert.NoError(StoreWrapper{tx}.PutClusterID(c1))
assert.NoError(tx.Commit())
tx, err = stor.BeginTransaction()
assert.NoError(err)
assert.NoError(StoreWrapper{tx}.PutVPNKey(k2))
assert.NoError(StoreWrapper{tx}.PutClusterID(c2))
tx.Rollback()
val, err := stwrapper.GetVPNKey()
val, err := stwrapper.GetClusterID()
assert.NoError(err)
assert.Equal(k1, val)
assert.Equal(c1, val)
}
func TestStoreWrapperPeerInterface(t *testing.T) {
@ -114,9 +96,6 @@ func TestStoreWrapperPeerInterface(t *testing.T) {
stor := store.NewStdStore()
stwrapper := StoreWrapper{Store: stor}
assert.NoError(stwrapper.PutState(state.AcceptingInit))
dummyKey, err := wgtypes.GenerateKey()
assert.NoError(err)
assert.NoError(stwrapper.PutVPNKey(dummyKey[:]))
key, err := wgtypes.GeneratePrivateKey()
assert.NoError(err)

View File

@ -58,6 +58,15 @@ func (a *API) GetK8SJoinArgs(ctx context.Context, in *vpnproto.GetK8SJoinArgsReq
}, nil
}
// GetK8SCertificateKey is the RPC call to get the K8s certificateKey necessary for control-plane join.
func (a *API) GetK8SCertificateKey(ctx context.Context, in *vpnproto.GetK8SCertificateKeyRequest) (*vpnproto.GetK8SCertificateKeyResponse, error) {
certKey, err := a.core.GetK8SCertificateKey()
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
return &vpnproto.GetK8SCertificateKeyResponse{CertificateKey: certKey}, nil
}
// GetDataKey returns a data key derived from the Constellation's master secret.
func (a *API) GetDataKey(ctx context.Context, in *vpnproto.GetDataKeyRequest) (*vpnproto.GetDataKeyResponse, error) {
key, err := a.core.GetDataKey(ctx, in.DataKeyId, int(in.Length))
@ -71,5 +80,6 @@ type Core interface {
GetPeers(resourceVersion int) (int, []peer.Peer, error)
NotifyNodeHeartbeat(net.Addr)
GetK8sJoinArgs() (*kubeadm.BootstrapTokenDiscovery, error)
GetK8SCertificateKey() (string, error)
GetDataKey(ctx context.Context, dataKeyID string, length int) ([]byte, error)
}

View File

@ -143,6 +143,46 @@ func TestGetDataKey(t *testing.T) {
assert.Nil(res)
}
func TestGetK8SCertificateKey(t *testing.T) {
someErr := errors.New("someErr")
certKey := "kubeadmKey"
testCases := map[string]struct {
certKey string
getCertKeyErr error
expectErr bool
}{
"basic": {
certKey: certKey,
},
"error": {
getCertKeyErr: someErr,
expectErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core := &stubCore{
kubeadmCertificateKey: certKey,
getCertKeyErr: tc.getCertKeyErr,
}
api := New(zaptest.NewLogger(t), core)
resp, err := api.GetK8SCertificateKey(context.Background(), &vpnproto.GetK8SCertificateKeyRequest{})
if tc.expectErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(certKey, resp.CertificateKey)
})
}
}
type stubCore struct {
peers []peer.Peer
serverResourceVersion int
@ -150,6 +190,8 @@ type stubCore struct {
clientResourceVersions []int
heartbeats []net.Addr
joinArgs kubeadm.BootstrapTokenDiscovery
kubeadmCertificateKey string
getCertKeyErr error
derivedKey []byte
deriveKeyErr error
}
@ -167,6 +209,10 @@ func (c *stubCore) GetK8sJoinArgs() (*kubeadm.BootstrapTokenDiscovery, error) {
return &c.joinArgs, nil
}
func (c *stubCore) GetK8SCertificateKey() (string, error) {
return c.kubeadmCertificateKey, c.getCertKeyErr
}
func (c *stubCore) GetDataKey(ctx context.Context, dataKeyID string, length int) ([]byte, error) {
if c.deriveKeyErr != nil {
return nil, c.deriveKeyErr

View File

@ -33,7 +33,7 @@ func New() (*Wireguard, error) {
return &Wireguard{client: client}, nil
}
func (w *Wireguard) Setup(privKey []byte) ([]byte, error) {
func (w *Wireguard) Setup(privKey []byte) error {
var key wgtypes.Key
var err error
if len(privKey) == 0 {
@ -42,15 +42,10 @@ func (w *Wireguard) Setup(privKey []byte) ([]byte, error) {
key, err = wgtypes.NewKey(privKey)
}
if err != nil {
return nil, err
return err
}
listenPort := port
if err := w.client.ConfigureDevice(netInterface, wgtypes.Config{PrivateKey: &key, ListenPort: &listenPort}); err != nil {
return nil, prettyWgError(err)
}
return key[:], nil
return w.client.ConfigureDevice(netInterface, wgtypes.Config{PrivateKey: &key, ListenPort: &listenPort})
}
// GetPrivateKey returns the private key of the wireguard interface.
@ -62,7 +57,7 @@ func (w *Wireguard) GetPrivateKey() ([]byte, error) {
return device.PrivateKey[:], nil
}
func (w *Wireguard) GetPublicKey(privKey []byte) ([]byte, error) {
func (w *Wireguard) DerivePublicKey(privKey []byte) ([]byte, error) {
key, err := wgtypes.NewKey(privKey)
if err != nil {
return nil, err
@ -71,6 +66,14 @@ func (w *Wireguard) GetPublicKey(privKey []byte) ([]byte, error) {
return pubkey[:], nil
}
func (w *Wireguard) GetPublicKey() ([]byte, error) {
deviceData, err := w.client.Device(netInterface)
if err != nil {
return nil, err
}
return deviceData.PublicKey[:], nil
}
func (w *Wireguard) GetInterfaceIP() (string, error) {
return util.GetInterfaceIP(netInterface)
}