Implement activation service

Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
Daniel Weiße 2022-05-23 11:36:54 +02:00 committed by Daniel Weiße
parent 0941ce8c7e
commit b461c40c3a
21 changed files with 1876 additions and 10 deletions

128
activation/server/server.go Normal file
View file

@ -0,0 +1,128 @@
package server
import (
"context"
"crypto/tls"
"fmt"
"net"
"time"
proto "github.com/edgelesssys/constellation/activation/activationproto"
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
kubeadmv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
// Server implements the core logic of Constellation's node activation service.
type Server struct {
file file.Handler
joinTokenGetter joinTokenGetter
dataKeyGetter dataKeyGetter
proto.UnimplementedAPIServer
}
// New initializes a new Server.
func New(fileHandler file.Handler, joinTokenGetter joinTokenGetter, dataKeyGetter dataKeyGetter) *Server {
return &Server{
file: fileHandler,
joinTokenGetter: joinTokenGetter,
dataKeyGetter: dataKeyGetter,
}
}
// Run starts the gRPC server on the given port, using the provided tlsConfig.
func (s *Server) Run(tlsConfig *tls.Config, port string) error {
grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(logGRPC),
)
proto.RegisterAPIServer(grpcServer, s)
lis, err := net.Listen("tcp", net.JoinHostPort("", port))
if err != nil {
return fmt.Errorf("failed to listen: %s", err)
}
klog.V(2).Infof("starting activation service on %s", lis.Addr().String())
return grpcServer.Serve(lis)
}
// ActivateNode handles activation requests of Constellation worker nodes.
// A worker node will receive:
// - stateful disk encryption key.
// - Kubernetes join token.
// - cluster and owner ID to taint the node as initialized.
func (s *Server) ActivateNode(ctx context.Context, req *proto.ActivateNodeRequest) (*proto.ActivateNodeResponse, error) {
klog.V(4).Infof("ActivateNode: loading IDs")
var id id
if err := s.file.ReadJSON(constants.ActivationIDFilename, &id); err != nil {
klog.Errorf("unable to load IDs: %s", err)
return nil, status.Errorf(codes.Internal, "unable to load IDs: %s", err)
}
klog.V(4).Infof("ActivateNode: requesting disk encryption key")
stateDiskKey, err := s.dataKeyGetter.GetDataKey(ctx, req.DiskUuid, constants.StateDiskKeyLength)
if err != nil {
klog.Errorf("unable to get key for stateful disk: %s", err)
return nil, status.Errorf(codes.Internal, "unable to get key for stateful disk: %s", err)
}
klog.V(4).Infof("ActivateNode: creating Kubernetes join token")
kubeArgs, err := s.joinTokenGetter.GetJoinToken(constants.KubernetesJoinTokenTTL)
if err != nil {
klog.Errorf("unable to generate Kubernetes join arguments: %s", err)
return nil, status.Errorf(codes.Internal, "unable to generate Kubernetes join arguments: %s", err)
}
klog.V(4).Info("ActivateNode successful")
return &proto.ActivateNodeResponse{
StateDiskKey: stateDiskKey,
ClusterId: id.Cluster,
OwnerId: id.Owner,
ApiServerEndpoint: kubeArgs.APIServerEndpoint,
Token: kubeArgs.Token,
DiscoveryTokenCaCertHash: kubeArgs.CACertHashes[0],
}, nil
}
// ActivateCoordinator handles activation requests of Constellation control-plane nodes.
func (s *Server) ActivateCoordinator(ctx context.Context, req *proto.ActivateCoordinatorRequest) (*proto.ActivateCoordinatorResponse, error) {
panic("not implemented")
}
// joinTokenGetter returns Kubernetes bootstrap (join) tokens.
type joinTokenGetter interface {
// GetJoinToken returns a bootstrap (join) token.
GetJoinToken(ttl time.Duration) (*kubeadmv1.BootstrapTokenDiscovery, error)
}
// dataKeyGetter interacts with Constellation's key management system to retrieve keys.
type dataKeyGetter interface {
// GetDataKey returns a key derived from Constellation's KMS.
GetDataKey(ctx context.Context, uuid string, length int) ([]byte, error)
}
type id struct {
Cluster []byte `json:"cluster"`
Owner []byte `json:"owner"`
}
// logGRPC writes a log with the name of every gRPC call or error it receives.
func logGRPC(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// log the requests method name
klog.V(2).Infof("GRPC call: %s", info.FullMethod)
// log errors, if any
resp, err := handler(ctx, req)
if err != nil {
klog.Errorf("GRPC error: %v", err)
}
return resp, err
}

View file

@ -0,0 +1,123 @@
package server
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
proto "github.com/edgelesssys/constellation/activation/activationproto"
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kubeadmv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
func TestActivateNode(t *testing.T) {
someErr := errors.New("error")
testKey := []byte{0x1, 0x2, 0x3}
testID := id{
Owner: []byte{0x4, 0x5, 0x6},
Cluster: []byte{0x7, 0x8, 0x9},
}
testJoinToken := &kubeadmv1.BootstrapTokenDiscovery{
APIServerEndpoint: "192.0.2.1",
CACertHashes: []string{"hash"},
Token: "token",
}
testCases := map[string]struct {
kubeadm stubTokenGetter
kms stubKeyGetter
id []byte
wantErr bool
}{
"success": {
kubeadm: stubTokenGetter{token: testJoinToken},
kms: stubKeyGetter{dataKey: testKey},
id: mustMarshalID(testID),
},
"GetDataKey fails": {
kubeadm: stubTokenGetter{token: testJoinToken},
kms: stubKeyGetter{getDataKeyErr: someErr},
id: mustMarshalID(testID),
wantErr: true,
},
"loading IDs fails": {
kubeadm: stubTokenGetter{token: testJoinToken},
kms: stubKeyGetter{dataKey: testKey},
id: []byte{0x1, 0x2, 0x3},
wantErr: true,
},
"no ID file": {
kubeadm: stubTokenGetter{token: testJoinToken},
kms: stubKeyGetter{dataKey: testKey},
wantErr: true,
},
"GetJoinToken fails": {
kubeadm: stubTokenGetter{getJoinTokenErr: someErr},
kms: stubKeyGetter{dataKey: testKey},
id: mustMarshalID(testID),
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
file := file.NewHandler(afero.NewMemMapFs())
if len(tc.id) > 0 {
require.NoError(file.Write(constants.ActivationIDFilename, tc.id, 0o644))
}
api := New(file, tc.kubeadm, tc.kms)
resp, err := api.ActivateNode(context.Background(), &proto.ActivateNodeRequest{DiskUuid: "uuid"})
if tc.wantErr {
assert.Error(err)
return
}
var expectedIDs id
require.NoError(json.Unmarshal(tc.id, &expectedIDs))
require.NoError(err)
assert.Equal(tc.kms.dataKey, resp.StateDiskKey)
assert.Equal(expectedIDs.Cluster, resp.ClusterId)
assert.Equal(expectedIDs.Owner, resp.OwnerId)
assert.Equal(tc.kubeadm.token.APIServerEndpoint, resp.ApiServerEndpoint)
assert.Equal(tc.kubeadm.token.CACertHashes[0], resp.DiscoveryTokenCaCertHash)
assert.Equal(tc.kubeadm.token.Token, resp.Token)
})
}
}
func mustMarshalID(id id) []byte {
b, err := json.Marshal(id)
if err != nil {
panic(err)
}
return b
}
type stubTokenGetter struct {
token *kubeadmv1.BootstrapTokenDiscovery
getJoinTokenErr error
}
func (f stubTokenGetter) GetJoinToken(time.Duration) (*kubeadmv1.BootstrapTokenDiscovery, error) {
return f.token, f.getJoinTokenErr
}
type stubKeyGetter struct {
dataKey []byte
getDataKeyErr error
}
func (f stubKeyGetter) GetDataKey(context.Context, string, int) ([]byte, error) {
return f.dataKey, f.getDataKeyErr
}