add KMS to multi-coordinator (#68)

This commit is contained in:
Benedict Schlüter 2022-04-20 15:22:39 +02:00 committed by GitHub
parent 2d9b64df14
commit 938beec2ef
7 changed files with 66 additions and 84 deletions

View File

@ -250,31 +250,38 @@ func (c *Core) PersistNodeState(role role.Role, ownerID []byte, clusterID []byte
// SetUpKMS sets the Coordinators key management service and key encryption key ID.
// Creates a new key encryption key in the KMS, if requested.
// Otherwise the KEK is assumed to already exist in the KMS.
func (c *Core) SetUpKMS(ctx context.Context, storageURI, kmsURI, kekID string, useExisting bool) error {
func (c *Core) SetUpKMS(ctx context.Context, storageURI, kmsURI, kekID string, useExistingKEK bool) error {
kms, err := kmsSetup.SetUpKMS(ctx, storageURI, kmsURI)
if err != nil {
return err
}
c.kms = kms
if !useExisting {
// import Constellation master secret as key encryption key
kek, err := c.data().GetMasterSecret()
if err != nil {
return err
}
if err := kms.CreateKEK(ctx, kekID, kek); err != nil {
return err
}
if useExistingKEK {
return nil
}
// import Constellation master secret as key encryption key
kek, err := c.data().GetMasterSecret()
if err != nil {
return err
}
if err := kms.CreateKEK(ctx, kekID, kek); err != nil {
return err
}
if err := c.data().PutKEKID(kekID); err != nil {
return err
}
c.kms = kms
bundeldedKMSInfo := kmsSetup.KMSInformation{KmsUri: kmsURI, KeyEncryptionKeyID: kekID, StorageUri: storageURI}
if err := c.data().PutKMSData(bundeldedKMSInfo); err != nil {
return err
}
return nil
}
func (c *Core) GetKMSInfo() (kmsSetup.KMSInformation, error) {
return c.data().GetKMSData()
}
// GetDataKey derives a key of length from the Constellation's master secret.
func (c *Core) GetDataKey(ctx context.Context, keyID string, length int) ([]byte, error) {
if c.kms == nil {

View File

@ -26,6 +26,12 @@ const (
NoStoreURI = "storage://no-store"
)
type KMSInformation struct {
KmsUri string
StorageUri string
KeyEncryptionKeyID string
}
// SetUpKMS creates a KMS and key store from the given parameters.
func SetUpKMS(ctx context.Context, storageURI, kmsURI string) (kms.CloudKMS, error) {
store, err := getStore(ctx, storageURI)

View File

@ -3,6 +3,7 @@ package pubapi
import (
"context"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
@ -20,6 +21,7 @@ type Core interface {
GetIDs(masterSecret []byte) (ownerID []byte, clusterID []byte, err error)
PersistNodeState(role role.Role, ownerID []byte, clusterID []byte) error
SetUpKMS(ctx context.Context, storageURI, kmsURI, kekID string, useExisting bool) error
GetKMSInfo() (kms.KMSInformation, error)
GetDataKey(ctx context.Context, keyID string, length int) ([]byte, error)
GetState() state.State

View File

@ -5,6 +5,7 @@ import (
"errors"
"net/netip"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/edgelesssys/constellation/coordinator/state"
@ -137,6 +138,10 @@ func (c *fakeCore) SetUpKMS(ctx context.Context, storageURI, kmsURI, kekID strin
return nil
}
func (c *fakeCore) GetKMSInfo() (kms.KMSInformation, error) {
return kms.KMSInformation{}, nil
}
func (c *fakeCore) GetDataKey(ctx context.Context, keyID string, length int) ([]byte, error) {
return c.dataKey, c.getDataKeyErr
}

View File

@ -37,9 +37,6 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
if err := a.core.AdvanceState(state.ActivatingNodes, in.OwnerId, in.ClusterId); err != nil {
return nil, status.Errorf(codes.Internal, "advance state to ActivatingNodes: %v", err)
}
// TODO: add KMS functions
// add one coordinator to the VPN
if err := a.core.SetVPNIP(in.AssignedVpnIp); err != nil {
return nil, status.Errorf(codes.Internal, "set vpn IP address: %v", err)
@ -69,8 +66,13 @@ func (a *API) ActivateAsAdditionalCoordinator(ctx context.Context, in *pubproto.
}
a.logger.Info("Transition to persistent store successful")
// regularly get (peer) updates from etcd
// start update before manual peer add to omit race conditions when multiple coordinator are activating nodes
kmsData, err := a.core.GetKMSInfo()
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
if err := a.core.SetUpKMS(ctx, kmsData.StorageUri, kmsData.KmsUri, kmsData.KeyEncryptionKeyID, false); err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
thisPeer, err := a.assemblePeerStruct(in.AssignedVpnIp, role.Coordinator)
if err != nil {

View File

@ -1,7 +1,6 @@
package storewrapper
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -9,6 +8,7 @@ import (
"strconv"
"strings"
"github.com/edgelesssys/constellation/coordinator/kms"
"github.com/edgelesssys/constellation/coordinator/peer"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/coordinator/store"
@ -26,6 +26,7 @@ const (
keyKubeConfig = "kubeConfig"
keyClusterID = "clusterID"
keyVPNPubKey = "vpnKey"
keyKMSData = "KMSData"
keyKEKID = "kekID"
prefixFreeCoordinatorIPs = "freeCoordinatorVPNIPs"
prefixPeerLocation = "peerPrefix"
@ -128,71 +129,6 @@ func (s StoreWrapper) GetPeersResourceVersion() (int, error) {
return val, nil
}
// UpdatePeers synchronizes the stored peers with the passed peers, returning added and removed peers.
func (s StoreWrapper) UpdatePeers(peers []peer.Peer) (added, removed []peer.Peer, err error) {
// convert to map for easier lookup
updatedPeers := make(map[string]peer.Peer)
for _, p := range peers {
updatedPeers[p.VPNIP] = p
}
it, err := s.Store.Iterator(prefixPeerLocation)
if err != nil {
return nil, nil, err
}
// collect peers that need to be added or removed
for it.HasNext() {
key, err := it.GetNext()
if err != nil {
return nil, nil, err
}
val, err := s.Store.Get(key)
if err != nil {
return nil, nil, err
}
var storedPeer peer.Peer
if err := json.Unmarshal(val, &storedPeer); err != nil {
return nil, nil, err
}
if updPeer, ok := updatedPeers[storedPeer.VPNIP]; ok {
if updPeer.PublicIP != storedPeer.PublicIP || !bytes.Equal(updPeer.VPNPubKey, storedPeer.VPNPubKey) {
// stored peer must be updated, so mark for addition AND removal
added = append(added, updPeer)
removed = append(removed, storedPeer)
}
delete(updatedPeers, updPeer.VPNIP)
} else {
// stored peer is not contained in the updated peers, so mark for removal
removed = append(removed, storedPeer)
}
}
// remaining updated peers were not in the store, so mark for addition
for _, p := range updatedPeers {
added = append(added, p)
}
// perform remove and add
for _, p := range removed {
if err := s.Store.Delete(prefixPeerLocation + p.VPNIP); err != nil {
return nil, nil, err
}
}
for _, p := range added {
data, err := json.Marshal(p)
if err != nil {
return nil, nil, err
}
if err := s.Store.Put(prefixPeerLocation+p.VPNIP, data); err != nil {
return nil, nil, err
}
}
return added, removed, nil
}
func (s StoreWrapper) getPeersByPrefix(prefix string) ([]peer.Peer, error) {
peerKeys, err := s.Store.Iterator(prefix)
if err != nil {
@ -271,6 +207,28 @@ func (s StoreWrapper) PutKEKID(kekID string) error {
return s.Store.Put(keyKEKID, []byte(kekID))
}
// GetKMSData returns the KMSData from the store.
func (s StoreWrapper) GetKMSData() (kms.KMSInformation, error) {
storeData, err := s.Store.Get(keyKMSData)
if err != nil {
return kms.KMSInformation{}, err
}
data := kms.KMSInformation{}
if err := json.Unmarshal(storeData, &data); err != nil {
return kms.KMSInformation{}, err
}
return data, nil
}
// PutKMSData puts the KMSData in the store.
func (s StoreWrapper) PutKMSData(kmsInfo kms.KMSInformation) error {
byteKMSInfo, err := json.Marshal(kmsInfo)
if err != nil {
return err
}
return s.Store.Put(keyKMSData, byteKMSInfo)
}
// GetClusterID returns the unique identifier of the cluster from store.
func (s StoreWrapper) GetClusterID() ([]byte, error) {
return s.Store.Get(keyClusterID)

View File

@ -17,6 +17,8 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
// https://github.com/kubernetes/klog/issues/282, https://github.com/kubernetes/klog/issues/188
goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"),
// https://github.com/census-instrumentation/opencensus-go/issues/1262
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
)
}