Delete Coordinator core and apis

This commit is contained in:
katexochen 2022-06-21 17:59:12 +02:00 committed by Paul Meyer
parent e534c6a338
commit 32f1f5fd3e
93 changed files with 1824 additions and 16487 deletions

View file

@ -1,145 +0,0 @@
package proto
import (
"context"
"errors"
"io"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/edgelesssys/constellation/internal/atls"
"github.com/edgelesssys/constellation/internal/grpc/atlscredentials"
kms "github.com/edgelesssys/constellation/kms/setup"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
)
// Client wraps a PubAPI client and the connection to it.
type Client struct {
conn *grpc.ClientConn
pubapi pubproto.APIClient
}
// Connect connects the client to a given server, using the handed
// Validators for the attestation of the connection.
// The connection must be closed using Close(). If connect is
// called on a client that already has a connection, the old
// connection is closed.
func (c *Client) Connect(endpoint string, validators []atls.Validator) error {
creds := atlscredentials.New(nil, validators)
conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
if err != nil {
return err
}
if c.conn != nil {
c.conn.Close()
}
c.conn = conn
c.pubapi = pubproto.NewAPIClient(conn)
return nil
}
// Close closes the grpc connection of the client.
// Close is idempotent and can be called on non connected clients
// without returning an error.
func (c *Client) Close() error {
if c.conn == nil {
return nil
}
if err := c.conn.Close(); err != nil {
return err
}
c.conn = nil
return nil
}
// GetState returns the state of the connected server.
func (c *Client) GetState(ctx context.Context) (state.State, error) {
if c.pubapi == nil {
return state.Uninitialized, errors.New("client is not connected")
}
resp, err := c.pubapi.GetState(ctx, &pubproto.GetStateRequest{})
if err != nil {
return state.Uninitialized, err
}
return state.State(resp.State), nil
}
// Activate activates the Constellation coordinator via a grpc call.
// The handed IP addresses must be the private IP addresses of running AWS or GCP instances,
// and the userPublicKey is the VPN key of the users WireGuard interface.
func (c *Client) Activate(ctx context.Context, userPublicKey, masterSecret []byte, nodeIPs, coordinatorIPs, autoscalingNodeGroups []string, cloudServiceAccountURI string, sshUserKeys []*pubproto.SSHUserKey) (ActivationResponseClient, error) {
if c.pubapi == nil {
return nil, errors.New("client is not connected")
}
if len(userPublicKey) == 0 {
return nil, errors.New("parameter userPublicKey is empty")
}
if len(nodeIPs) == 0 {
return nil, errors.New("parameter ips is empty")
}
pubKey, err := wgtypes.ParseKey(string(userPublicKey))
if err != nil {
return nil, err
}
req := &pubproto.ActivateAsCoordinatorRequest{
AdminVpnPubKey: pubKey[:],
NodePublicIps: nodeIPs,
CoordinatorPublicIps: coordinatorIPs,
AutoscalingNodeGroups: autoscalingNodeGroups,
MasterSecret: masterSecret,
KmsUri: kms.ClusterKMSURI,
StorageUri: kms.NoStoreURI,
KeyEncryptionKeyId: "",
UseExistingKek: false,
CloudServiceAccountUri: cloudServiceAccountURI,
SshUserKeys: sshUserKeys,
}
client, err := c.pubapi.ActivateAsCoordinator(ctx, req)
if err != nil {
return nil, err
}
return NewActivationRespClient(client), nil
}
// ActivationResponseClient has methods to read messages from a stream of
// ActivateAsCoordinatorResponses.
type ActivationResponseClient interface {
// NextLog reads responses from the response stream and returns the
// first received log.
// If AdminConfig responses are received before the first log response
// occurs, the state of the client is updated with those configs. An
// io.EOF error is returned at the end of the stream.
NextLog() (string, error)
// WriteLogStream reads responses from the response stream and
// writes log responses to the handed writer.
// Occurring AdminConfig responses update the state of the client.
WriteLogStream(io.Writer) error
// GetKubeconfig returns the kubeconfig that was received in the
// latest AdminConfig response or an error if the field is empty.
GetKubeconfig() (string, error)
// GetCoordinatorVpnKey returns the Coordinator's VPN key that was
// received in the latest AdminConfig response or an error if the field
// is empty.
GetCoordinatorVpnKey() (string, error)
// GetClientVpnIp returns the client VPN IP that was received
// in the latest AdminConfig response or an error if the field is empty.
GetClientVpnIp() (string, error)
// GetOwnerID returns the owner identifier, derived from the client's master secret
// or an error if the field is empty.
GetOwnerID() (string, error)
// GetClusterID returns the cluster's unique identifier
// or an error if the field is empty.
GetClusterID() (string, error)
}

View file

@ -1,220 +0,0 @@
package proto
import (
"context"
"encoding/base64"
"errors"
"net"
"testing"
"time"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/edgelesssys/constellation/coordinator/state"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
// https://github.com/census-instrumentation/opencensus-go/issues/1262
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
)
}
func TestClose(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := Client{}
// Create a connection.
listener := bufconn.Listen(4)
defer listener.Close()
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(err)
defer conn.Close()
// Wait for connection to reach 'connecting' state.
// Connection is not yet usable in this state, but we just need
// any stable non 'shutdown' state to validate that the state
// previous to calling close isn't already 'shutdown'.
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for {
if ctx.Err() != nil {
return ctx.Err()
}
if connectivity.Connecting == conn.GetState() {
return nil
}
time.Sleep(5 * time.Millisecond)
}
}()
require.NoError(err)
client.conn = conn
// Close connection.
assert.NoError(client.Close())
assert.Empty(client.conn)
assert.Equal(connectivity.Shutdown, conn.GetState())
// Close closed connection.
assert.NoError(client.Close())
assert.Empty(client.conn)
assert.Equal(connectivity.Shutdown, conn.GetState())
}
func TestGetState(t *testing.T) {
someErr := errors.New("some error")
testCases := map[string]struct {
pubAPIClient pubproto.APIClient
wantErr bool
wantState state.State
}{
"success": {
pubAPIClient: &stubPubAPIClient{getStateState: state.IsNode},
wantState: state.IsNode,
},
"getState error": {
pubAPIClient: &stubPubAPIClient{getStateErr: someErr},
wantErr: true,
},
"uninitialized": {
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
client := Client{}
if tc.pubAPIClient != nil {
client.pubapi = tc.pubAPIClient
}
state, err := client.GetState(context.Background())
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal(tc.wantState, state)
}
})
}
}
func TestActivate(t *testing.T) {
testKey := base64.StdEncoding.EncodeToString([]byte("32bytesWireGuardKeyForTheTesting"))
someErr := errors.New("failed")
testCases := map[string]struct {
pubAPIClient *stubPubAPIClient
userPublicKey string
ips []string
wantErr bool
}{
"normal activation": {
pubAPIClient: &stubPubAPIClient{},
userPublicKey: testKey,
ips: []string{"192.0.2.1", "192.0.2.1", "192.0.2.1"},
wantErr: false,
},
"client without pubAPIClient": {
userPublicKey: testKey,
ips: []string{"192.0.2.1", "192.0.2.1", "192.0.2.1"},
wantErr: true,
},
"empty public key parameter": {
pubAPIClient: &stubPubAPIClient{},
userPublicKey: "",
ips: []string{"192.0.2.1", "192.0.2.1", "192.0.2.1"},
wantErr: true,
},
"invalid public key parameter": {
pubAPIClient: &stubPubAPIClient{},
userPublicKey: "invalid Key",
ips: []string{"192.0.2.1", "192.0.2.1", "192.0.2.1"},
wantErr: true,
},
"empty ips parameter": {
pubAPIClient: &stubPubAPIClient{},
userPublicKey: testKey,
ips: []string{},
wantErr: true,
},
"fail ActivateAsCoordinator": {
pubAPIClient: &stubPubAPIClient{activateAsCoordinatorErr: someErr},
userPublicKey: testKey,
ips: []string{"192.0.2.1", "192.0.2.1", "192.0.2.1"},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
client := Client{}
if tc.pubAPIClient != nil {
client.pubapi = tc.pubAPIClient
}
_, err := client.Activate(context.Background(), []byte(tc.userPublicKey), []byte("Constellation"), tc.ips, nil, nil, "serviceaccount://test", nil)
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal("32bytesWireGuardKeyForTheTesting", string(tc.pubAPIClient.activateAsCoordinatorReqKey))
assert.Equal(tc.ips, tc.pubAPIClient.activateAsCoordinatorReqIPs)
assert.Equal("Constellation", string(tc.pubAPIClient.activateAsCoordinatorMasterSecret))
assert.Equal("serviceaccount://test", tc.pubAPIClient.activateCloudServiceAccountURI)
}
})
}
}
type stubPubAPIClient struct {
getStateState state.State
getStateErr error
activateAsCoordinatorErr error
activateAdditionalNodesErr error
activateAsCoordinatorReqKey []byte
activateAsCoordinatorReqIPs []string
activateAsCoordinatorMasterSecret []byte
activateAdditionalNodesReqIPs []string
activateCloudServiceAccountURI string
pubproto.APIClient
}
func (s *stubPubAPIClient) GetState(ctx context.Context, in *pubproto.GetStateRequest, opts ...grpc.CallOption) (*pubproto.GetStateResponse, error) {
return &pubproto.GetStateResponse{State: uint32(s.getStateState)}, s.getStateErr
}
func (s *stubPubAPIClient) ActivateAsCoordinator(ctx context.Context, in *pubproto.ActivateAsCoordinatorRequest,
opts ...grpc.CallOption,
) (pubproto.API_ActivateAsCoordinatorClient, error) {
s.activateAsCoordinatorReqKey = in.AdminVpnPubKey
s.activateAsCoordinatorReqIPs = in.NodePublicIps
s.activateAsCoordinatorMasterSecret = in.MasterSecret
s.activateCloudServiceAccountURI = in.CloudServiceAccountUri
return dummyActivateAsCoordinatorClient{}, s.activateAsCoordinatorErr
}
func (s *stubPubAPIClient) ActivateAdditionalNodes(ctx context.Context, in *pubproto.ActivateAdditionalNodesRequest,
opts ...grpc.CallOption,
) (pubproto.API_ActivateAdditionalNodesClient, error) {
s.activateAdditionalNodesReqIPs = in.NodePublicIps
return dummyActivateAdditionalNodesClient{}, s.activateAdditionalNodesErr
}

View file

@ -1,117 +0,0 @@
package proto
import (
"encoding/base64"
"errors"
"fmt"
"io"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
// ActivationRespClient has methods to read messages from a stream of
// ActivateAsCoordinatorResponses. It wraps an API_ActivateAsCoordinatorClient.
type ActivationRespClient struct {
client pubproto.API_ActivateAsCoordinatorClient
kubeconfig string
coordinatorVpnKey string
clientVpnIp string
ownerID string
clusterID string
}
// NewActivationRespClient creates a new ActivationRespClient with the handed
// API_ActivateAsCoordinatorClient.
func NewActivationRespClient(client pubproto.API_ActivateAsCoordinatorClient) *ActivationRespClient {
return &ActivationRespClient{
client: client,
}
}
// NextLog reads responses from the response stream and returns the
// first received log.
func (a *ActivationRespClient) NextLog() (string, error) {
for {
resp, err := a.client.Recv()
if err != nil {
return "", err
}
switch x := resp.Content.(type) {
case *pubproto.ActivateAsCoordinatorResponse_Log:
return x.Log.Message, nil
case *pubproto.ActivateAsCoordinatorResponse_AdminConfig:
config := x.AdminConfig
a.kubeconfig = string(config.Kubeconfig)
coordinatorVpnKey, err := wgtypes.NewKey(config.CoordinatorVpnPubKey)
if err != nil {
return "", err
}
a.coordinatorVpnKey = coordinatorVpnKey.String()
a.clientVpnIp = config.AdminVpnIp
a.ownerID = base64.StdEncoding.EncodeToString(config.OwnerId)
a.clusterID = base64.StdEncoding.EncodeToString(config.ClusterId)
}
}
}
// WriteLogStream reads responses from the response stream and
// writes log responses to the handed writer.
func (a *ActivationRespClient) WriteLogStream(w io.Writer) error {
log, err := a.NextLog()
for err == nil {
fmt.Fprintln(w, log)
log, err = a.NextLog()
}
if !errors.Is(err, io.EOF) {
return err
}
return nil
}
// GetKubeconfig returns the kubeconfig that was received in the
// latest AdminConfig response or an error if the field is empty.
func (a *ActivationRespClient) GetKubeconfig() (string, error) {
if a.kubeconfig == "" {
return "", errors.New("kubeconfig is empty")
}
return a.kubeconfig, nil
}
// GetCoordinatorVpnKey returns the Coordinator's VPN key that was
// received in the latest AdminConfig response or an error if the field
// is empty.
func (a *ActivationRespClient) GetCoordinatorVpnKey() (string, error) {
if a.coordinatorVpnKey == "" {
return "", errors.New("coordinator public VPN key is empty")
}
return a.coordinatorVpnKey, nil
}
// GetClientVpnIp returns the client VPN IP that was received
// in the latest AdminConfig response or an error if the field is empty.
func (a *ActivationRespClient) GetClientVpnIp() (string, error) {
if a.clientVpnIp == "" {
return "", errors.New("client VPN IP is empty")
}
return a.clientVpnIp, nil
}
// GetOwnerID returns the owner identifier, derived from the client's master secret
// or an error if the field is empty.
func (a *ActivationRespClient) GetOwnerID() (string, error) {
if a.ownerID == "" {
return "", errors.New("secret identifier is empty")
}
return a.ownerID, nil
}
// GetClusterID returns the cluster's unique identifier
// or an error if the field is empty.
func (a *ActivationRespClient) GetClusterID() (string, error) {
if a.clusterID == "" {
return "", errors.New("cluster identifier is empty")
}
return a.clusterID, nil
}

View file

@ -1,241 +0,0 @@
package proto
import (
"bytes"
"errors"
"io"
"testing"
"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
// dummyActivateAsCoordinatorClient is a dummy and panics if Recv() is called.
type dummyActivateAsCoordinatorClient struct {
grpc.ClientStream
}
func (c dummyActivateAsCoordinatorClient) Recv() (*pubproto.ActivateAsCoordinatorResponse, error) {
panic("i'm a dummy, Recv() not implemented")
}
// dummyActivateAsCoordinatorClient is a dummy and panics if Recv() is called.
type dummyActivateAdditionalNodesClient struct {
grpc.ClientStream
}
func (c dummyActivateAdditionalNodesClient) Recv() (*pubproto.ActivateAdditionalNodesResponse, error) {
panic("i'm a dummy, Recv() not implemented")
}
// stubActivationAsCoordinatorClient recives responses from an predefined
// response stream iterator or a stub error.
type stubActivationAsCoordinatorClient struct {
grpc.ClientStream
stream *stubActivateAsCoordinatorResponseIter
recvErr error
}
func (c stubActivationAsCoordinatorClient) Recv() (*pubproto.ActivateAsCoordinatorResponse, error) {
if c.recvErr != nil {
return nil, c.recvErr
}
return c.stream.Next()
}
// stubActivateAsCoordinatorResponseIter is an iterator over a slice of
// ActivateAsCoordinatorResponses. It returns the messages in the order
// they occur in the slice and returns an io.EOF error when no response
// is left.
type stubActivateAsCoordinatorResponseIter struct {
msgs []*pubproto.ActivateAsCoordinatorResponse
}
// Next returns the next message from the message slice or an io.EOF error
// if the message slice is empty.
func (q *stubActivateAsCoordinatorResponseIter) Next() (*pubproto.ActivateAsCoordinatorResponse, error) {
if len(q.msgs) == 0 {
return nil, io.EOF
}
msg := q.msgs[0]
q.msgs = q.msgs[1:]
return msg, nil
}
func TestNextLog(t *testing.T) {
testClientVpnIp := "192.0.2.1"
testCoordinatorVpnKey := []byte("32bytesWireGuardKeyForTheTesting")
testCoordinatorVpnKey64 := []byte("MzJieXRlc1dpcmVHdWFyZEtleUZvclRoZVRlc3Rpbmc=")
testKubeconfig := []byte("apiVersion:v1 kind:Config...")
testConfigResp := &pubproto.ActivateAsCoordinatorResponse{
Content: &pubproto.ActivateAsCoordinatorResponse_AdminConfig{
AdminConfig: &pubproto.AdminConfig{
AdminVpnIp: testClientVpnIp,
CoordinatorVpnPubKey: testCoordinatorVpnKey,
Kubeconfig: testKubeconfig,
},
},
}
testLogMessage := "some log message"
testLogResp := &pubproto.ActivateAsCoordinatorResponse{
Content: &pubproto.ActivateAsCoordinatorResponse_Log{
Log: &pubproto.Log{
Message: testLogMessage,
},
},
}
someErr := errors.New("failed")
testCases := map[string]struct {
msgs []*pubproto.ActivateAsCoordinatorResponse
wantLogLen int
wantState bool
recvErr error
wantErr bool
}{
"some logs": {
msgs: []*pubproto.ActivateAsCoordinatorResponse{testLogResp, testLogResp, testLogResp},
wantLogLen: 3,
},
"only admin config": {
msgs: []*pubproto.ActivateAsCoordinatorResponse{testConfigResp},
wantState: true,
},
"logs and configs": {
msgs: []*pubproto.ActivateAsCoordinatorResponse{testLogResp, testConfigResp, testLogResp, testConfigResp},
wantLogLen: 2,
wantState: true,
},
"no response": {
msgs: []*pubproto.ActivateAsCoordinatorResponse{},
wantLogLen: 0,
},
"recv fail": {
recvErr: someErr,
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
respClient := stubActivationAsCoordinatorClient{
stream: &stubActivateAsCoordinatorResponseIter{
msgs: tc.msgs,
},
recvErr: tc.recvErr,
}
client := NewActivationRespClient(respClient)
var logs []string
var err error
for err == nil {
var log string
log, err = client.NextLog()
if err == nil {
logs = append(logs, log)
}
}
assert.Error(err)
if tc.wantErr {
assert.NotErrorIs(err, io.EOF)
return
}
assert.ErrorIs(err, io.EOF)
assert.Len(logs, tc.wantLogLen)
if tc.wantState {
ip, err := client.GetClientVpnIp()
assert.NoError(err)
assert.Equal(testClientVpnIp, ip)
config, err := client.GetKubeconfig()
assert.NoError(err)
assert.Equal(string(testKubeconfig), config)
key, err := client.GetCoordinatorVpnKey()
assert.NoError(err)
assert.Equal(string(testCoordinatorVpnKey64), key)
}
})
}
}
func TestPrintLogStream(t *testing.T) {
assert := assert.New(t)
//
// 10 logs a 10 byte
//
var msgs []*pubproto.ActivateAsCoordinatorResponse
for i := 0; i < 10; i++ {
msgs = append(msgs, &pubproto.ActivateAsCoordinatorResponse{
Content: &pubproto.ActivateAsCoordinatorResponse_Log{
Log: &pubproto.Log{
Message: "10BytesLog",
},
},
})
}
respClient := stubActivationAsCoordinatorClient{
stream: &stubActivateAsCoordinatorResponseIter{
msgs: msgs,
},
}
client := NewActivationRespClient(respClient)
out := &bytes.Buffer{}
assert.NoError(client.WriteLogStream(out))
assert.Equal(out.Len(), 10*11) // 10 messages * (len(message) + 1 newline)
//
// Check error handling.
//
someErr := errors.New("failed")
respClient = stubActivationAsCoordinatorClient{
recvErr: someErr,
}
client = NewActivationRespClient(respClient)
assert.Error(client.WriteLogStream(&bytes.Buffer{}))
}
func TestGetKubeconfig(t *testing.T) {
assert := assert.New(t)
client := NewActivationRespClient(dummyActivateAsCoordinatorClient{})
_, err := client.GetKubeconfig()
assert.Error(err)
client.kubeconfig = "apiVersion:v1 kind:Config..."
config, err := client.GetKubeconfig()
assert.NoError(err)
assert.Equal("apiVersion:v1 kind:Config...", config)
}
func TestGetCoordinatorVpnKey(t *testing.T) {
assert := assert.New(t)
client := NewActivationRespClient(dummyActivateAsCoordinatorClient{})
_, err := client.GetCoordinatorVpnKey()
assert.Error(err)
client.coordinatorVpnKey = "32bytesWireGuardKeyForTheTesting"
key, err := client.GetCoordinatorVpnKey()
assert.NoError(err)
assert.Equal("32bytesWireGuardKeyForTheTesting", key)
}
func TestGetClientVpnIp(t *testing.T) {
assert := assert.New(t)
client := NewActivationRespClient(dummyActivateAsCoordinatorClient{})
_, err := client.GetClientVpnIp()
assert.Error(err)
client.clientVpnIp = "192.0.2.1"
ip, err := client.GetClientVpnIp()
assert.NoError(err)
assert.Equal("192.0.2.1", ip)
}