Coordinator start: add skeleton to check for pre-existing node state

Signed-off-by: Malte Poll <mp@edgeless.systems>
This commit is contained in:
Malte Poll 2022-04-11 10:38:03 +02:00 committed by Malte Poll
parent 462052427f
commit bcd8c36777
9 changed files with 186 additions and 24 deletions

View file

@ -5,9 +5,11 @@ import (
"regexp"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/kubernetes"
"github.com/edgelesssys/constellation/coordinator/kubernetes/k8sapi/resources"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -167,7 +169,7 @@ func TestInitCluster(t *testing.T) {
zapLogger, err := zap.NewDevelopment()
require.NoError(err)
core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
core, err := NewCore(&stubVPN{}, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
kubeconfig, err := core.InitCluster(tc.autoscalingNodeGroups, "cloud-service-account-uri")
@ -282,7 +284,7 @@ func TestJoinCluster(t *testing.T) {
zapLogger, err := zap.NewDevelopment()
require.NoError(err)
core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil)
core, err := NewCore(&tc.vpn, &tc.cluster, &tc.metadata, &tc.cloudControllerManager, &tc.cloudNodeManager, &tc.clusterAutoscaler, zapLogger, vtpm.OpenSimulatedTPM, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
joinReq := kubeadm.BootstrapTokenDiscovery{

View file

@ -3,14 +3,18 @@ package core
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"sync"
"time"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/config"
kmsSetup "github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/nodestate"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/storewrapper"
@ -36,12 +40,13 @@ type Core struct {
zaplogger *zap.Logger
persistentStoreFactory PersistentStoreFactory
lastHeartbeats map[string]time.Time
fileHandler file.Handler
}
// NewCore creates and initializes a new Core object.
func NewCore(vpn VPN, kube Cluster,
metadata ProviderMetadata, cloudControllerManager CloudControllerManager, cloudNodeManager CloudNodeManager, clusterAutoscaler ClusterAutoscaler,
zapLogger *zap.Logger, openTPM vtpm.TPMOpenFunc, persistentStoreFactory PersistentStoreFactory,
zapLogger *zap.Logger, openTPM vtpm.TPMOpenFunc, persistentStoreFactory PersistentStoreFactory, fileHandler file.Handler,
) (*Core, error) {
stor := store.NewStdStore()
c := &Core{
@ -57,6 +62,7 @@ func NewCore(vpn VPN, kube Cluster,
kms: nil, // KMS is set up during init phase
persistentStoreFactory: persistentStoreFactory,
lastHeartbeats: make(map[string]time.Time),
fileHandler: fileHandler,
}
if err := c.data().IncrementPeersResourceVersion(); err != nil {
return nil, err
@ -76,8 +82,6 @@ func NewCore(vpn VPN, kube Cluster,
return nil, err
}
c.state.Advance(state.AcceptingInit)
return c, nil
}
@ -183,6 +187,49 @@ func (c *Core) NotifyNodeHeartbeat(addr net.Addr) {
c.mut.Unlock()
}
// Initialize initializes the state machine of the core and handles re-joining the VPN.
// Blocks until the core is ready to be used.
func (c *Core) Initialize() (nodeActivated bool, err error) {
nodeActivated, err = vtpm.IsNodeInitialized(c.openTPM)
if err != nil {
return false, fmt.Errorf("failed to check for previous activation using vTPM: %w", err)
}
if !nodeActivated {
c.zaplogger.Info("Node was never activated. Allowing node to be activated.")
c.state.Advance(state.AcceptingInit)
return false, nil
}
c.zaplogger.Info("Node was previously activated. Attempting re-join.")
nodeState, err := nodestate.FromFile(c.fileHandler)
if err != nil {
return false, fmt.Errorf("failed to read node state: %w", err)
}
var initialState state.State
switch nodeState.Role {
case role.Coordinator:
initialState = state.ActivatingNodes
case role.Node:
initialState = state.IsNode
default:
return false, fmt.Errorf("invalid node role for initialized node: %v", nodeState.Role)
}
// TODO: if node was previously initialized, attempt to re-join wireguard here.
// Steps to rejoining should include:
// - retrieve list of coordinators from cloud provider API
// - attempt to retrieve list of wireguard public keys from any other coordinator while checking for correct PCRs in ATLS
// - re-establish wireguard connections
// - call update function successfully at least once
// - advance state to IsNode or ActivatingNodes respectively
// - restart update loop
// This procedure can be retried until it succeeds.
// The node must be put into the correct state before the update loop is started.
panic("not implemented")
//nolint:govet // this code is unreachable as long as the above is unimplemented
c.state.Advance(initialState)
return nodeActivated, nil
}
// SetUpKMS sets the Coordinators key management service and key encryption key ID.
// Creates a new key encryption key in the KMS, if requested.
// Otherwise the KEK is assumed to already exist in the KMS.

View file

@ -6,8 +6,14 @@ import (
"net"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/nodestate"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/store"
"github.com/edgelesssys/constellation/coordinator/storewrapper"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
@ -28,7 +34,7 @@ func TestAddAdmin(t *testing.T) {
require := require.New(t)
vpn := &stubVPN{}
core, err := NewCore(vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
require.NoError(core.InitializeStoreIPs())
@ -44,7 +50,7 @@ func TestGetNextNodeIP(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
require.NoError(core.InitializeStoreIPs())
@ -87,7 +93,7 @@ func TestSwitchToPersistentStore(t *testing.T) {
require := require.New(t)
storeFactory := &fakeStoreFactory{}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, storeFactory, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
require.NoError(core.SwitchToPersistentStore())
@ -101,7 +107,7 @@ func TestGetIDs(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
_, _, err = core.GetIDs(nil)
@ -125,7 +131,7 @@ func TestNotifyNodeHeartbeat(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
const ip = "192.0.2.1"
@ -138,7 +144,7 @@ func TestDeriveKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
// error when no kms is set up
@ -165,6 +171,80 @@ func TestDeriveKey(t *testing.T) {
assert.Error(err)
}
func TestInitialize(t *testing.T) {
testCases := map[string]struct {
initializePCRs bool
writeNodeState bool
role role.Role
expectActivated bool
expectedState state.State
expectPanic bool
expectErr bool
}{
"fresh node": {
expectedState: state.AcceptingInit,
},
"activated coordinator": {
initializePCRs: true,
writeNodeState: true,
role: role.Coordinator,
expectPanic: true, // TODO: adapt test case once restart is implemented
expectActivated: true,
expectedState: state.ActivatingNodes,
},
"activated node": {
initializePCRs: true,
writeNodeState: true,
role: role.Node,
expectPanic: true, // TODO: adapt test case once restart is implemented
expectActivated: true,
expectedState: state.IsNode,
},
"activated node with no node state": {
initializePCRs: true,
writeNodeState: false,
expectErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
openTPM, simulatedTPMCloser := vtpm.NewSimulatedTPMOpenFunc()
defer simulatedTPMCloser.Close()
if tc.initializePCRs {
require.NoError(vtpm.MarkNodeAsInitialized(openTPM, []byte{0x0, 0x1, 0x2, 0x3}, []byte{0x4, 0x5, 0x6, 0x7}))
}
fileHandler := file.NewHandler(afero.NewMemMapFs())
if tc.writeNodeState {
require.NoError((&nodestate.NodeState{
Role: tc.role,
VPNPrivKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
}).ToFile(fileHandler))
}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil, fileHandler)
require.NoError(err)
if tc.expectPanic {
assert.Panics(func() { _, _ = core.Initialize() })
return
}
nodeActivated, err := core.Initialize()
if tc.expectErr {
assert.Error(err)
return
}
require.NoError(err)
assert.Equal(tc.expectActivated, nodeActivated)
assert.Equal(tc.expectedState, core.state)
})
}
}
type fakeStoreFactory struct {
store store.Store
}

View file

@ -9,13 +9,16 @@ import (
"sync"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/atls"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/pubapi"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/vpnapi"
"github.com/edgelesssys/constellation/coordinator/vpnapi/vpnproto"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -145,10 +148,13 @@ func newMockCoreWithDialer(dialer *bufconnDialer) (*Core, *pubapi.API, error) {
getPublicAddr := func() (string, error) {
return "192.0.2.1", nil
}
core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, cnmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{})
core, err := NewCore(vpn, kubeFake, metadataFake, ccmFake, cnmFake, autoscalerFake, zapLogger, vtpm.OpenSimulatedTPM, &fakeStoreFactory{}, file.NewHandler(afero.NewMemMapFs()))
if err != nil {
return nil, nil, err
}
if err := core.AdvanceState(state.AcceptingInit, nil, nil); err != nil {
return nil, nil, err
}
vapiServer := &fakeVPNAPIServer{logger: zapLogger, core: core, dialer: dialer}
papi := pubapi.New(zapLogger, core, dialer, vapiServer, validator, getPublicAddr)

View file

@ -4,7 +4,9 @@ import (
"errors"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@ -51,7 +53,7 @@ func TestGetPeers(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
// prepare store
@ -119,7 +121,7 @@ func TestAddPeer(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
core, err := NewCore(&tc.vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil)
core, err := NewCore(&tc.vpn, nil, nil, nil, nil, nil, zaptest.NewLogger(t), nil, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
err = core.AddPeer(tc.peer)

View file

@ -5,8 +5,10 @@ import (
"io"
"testing"
"github.com/edgelesssys/constellation/cli/file"
"github.com/edgelesssys/constellation/coordinator/attestation/vtpm"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@ -63,9 +65,9 @@ func TestAdvanceState(t *testing.T) {
return vtpm.OpenSimulatedTPM()
}
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil)
core, err := NewCore(&stubVPN{}, nil, nil, nil, nil, nil, zaptest.NewLogger(t), openTPM, nil, file.NewHandler(afero.NewMemMapFs()))
require.NoError(err)
assert.Equal(state.AcceptingInit, core.GetState())
assert.Equal(state.Uninitialized, core.GetState())
core.state = tc.initialState
err = core.AdvanceState(tc.newState, []byte("secret"), []byte("cluster"))