Simplify joinproto

This commit is contained in:
katexochen 2022-07-05 11:41:31 +02:00 committed by Paul Meyer
parent dc9e8e75df
commit 15adba9235
14 changed files with 606 additions and 1180 deletions

View file

@ -16,7 +16,7 @@ import (
"github.com/edgelesssys/constellation/internal/cloud/metadata"
"github.com/edgelesssys/constellation/internal/constants"
"github.com/edgelesssys/constellation/internal/file"
activationproto "github.com/edgelesssys/constellation/joinservice/joinproto"
"github.com/edgelesssys/constellation/joinservice/joinproto"
"github.com/spf13/afero"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -167,10 +167,13 @@ func (c *JoinClient) tryJoinAtAvailableServices() error {
}
for _, ip := range ips {
err = c.join(net.JoinHostPort(ip, strconv.Itoa(constants.ActivationServiceNodePort)))
err = c.join(net.JoinHostPort(ip, strconv.Itoa(constants.JoinServiceNodePort)))
if err == nil {
return nil
}
if isUnrecoverable(err) {
return err
}
}
return err
@ -182,78 +185,27 @@ func (c *JoinClient) join(serviceEndpoint string) error {
conn, err := c.dialer.Dial(ctx, serviceEndpoint)
if err != nil {
c.log.Info("join service unreachable", zap.String("endpoint", serviceEndpoint), zap.Error(err))
return fmt.Errorf("dialing join service endpoint: %v", err)
c.log.Info("Join service unreachable", zap.String("endpoint", serviceEndpoint), zap.Error(err))
return fmt.Errorf("dialing join service endpoint: %w", err)
}
defer conn.Close()
protoClient := activationproto.NewAPIClient(conn)
switch c.role {
case role.Worker:
return c.joinAsWorkerNode(ctx, protoClient)
case role.ControlPlane:
return c.joinAsControlPlaneNode(ctx, protoClient)
default:
return fmt.Errorf("cannot activate as %s", role.Unknown)
protoClient := joinproto.NewAPIClient(conn)
req := &joinproto.IssueJoinTicketRequest{
DiskUuid: c.diskUUID,
NodeName: c.nodeName,
IsControlPlane: c.role == role.ControlPlane,
}
}
func (c *JoinClient) joinAsWorkerNode(ctx context.Context, client activationproto.APIClient) error {
req := &activationproto.ActivateWorkerNodeRequest{
DiskUuid: c.diskUUID,
NodeName: c.nodeName,
}
resp, err := client.ActivateWorkerNode(ctx, req)
ticket, err := protoClient.IssueJoinTicket(ctx, req)
if err != nil {
c.log.Info("Failed to activate as worker node", zap.Error(err))
return fmt.Errorf("activating worker node: %w", err)
c.log.Info("Issuing join ticket failed", zap.String("endpoint", serviceEndpoint), zap.Error(err))
return fmt.Errorf("issuing join ticket: %w", err)
}
c.log.Info("Activation at AaaS succeeded")
return c.startNodeAndJoin(
ctx,
resp.StateDiskKey,
resp.OwnerId,
resp.ClusterId,
resp.KubeletKey,
resp.KubeletCert,
resp.ApiServerEndpoint,
resp.Token,
resp.DiscoveryTokenCaCertHash,
"",
)
return c.startNodeAndJoin(ctx, ticket)
}
func (c *JoinClient) joinAsControlPlaneNode(ctx context.Context, client activationproto.APIClient) error {
req := &activationproto.ActivateControlPlaneNodeRequest{
DiskUuid: c.diskUUID,
NodeName: c.nodeName,
}
resp, err := client.ActivateControlPlaneNode(ctx, req)
if err != nil {
c.log.Info("Failed to activate as control plane node", zap.Error(err))
return fmt.Errorf("activating control plane node: %w", err)
}
c.log.Info("Activation at AaaS succeeded")
return c.startNodeAndJoin(
ctx,
resp.StateDiskKey,
resp.OwnerId,
resp.ClusterId,
resp.KubeletKey,
resp.KubeletCert,
resp.ApiServerEndpoint,
resp.Token,
resp.DiscoveryTokenCaCertHash,
resp.CertificateKey,
)
}
func (c *JoinClient) startNodeAndJoin(ctx context.Context, diskKey, ownerID, clusterID, kubeletKey, kubeletCert []byte, endpoint, token,
discoveryCACertHash, certKey string,
) (retErr error) {
func (c *JoinClient) startNodeAndJoin(ctx context.Context, ticket *joinproto.IssueJoinTicketResponse) (retErr error) {
// If an error occurs in this func, the client cannot continue.
defer func() {
if retErr != nil {
@ -268,25 +220,25 @@ func (c *JoinClient) startNodeAndJoin(ctx context.Context, diskKey, ownerID, clu
return errors.New("node is already being initialized")
}
if err := c.updateDiskPassphrase(string(diskKey)); err != nil {
if err := c.updateDiskPassphrase(string(ticket.StateDiskKey)); err != nil {
return fmt.Errorf("updating disk passphrase: %w", err)
}
state := nodestate.NodeState{
Role: c.role,
OwnerID: ownerID,
ClusterID: clusterID,
OwnerID: ticket.OwnerId,
ClusterID: ticket.ClusterId,
}
if err := state.ToFile(c.fileHandler); err != nil {
return fmt.Errorf("persisting node state: %w", err)
}
btd := &kubeadm.BootstrapTokenDiscovery{
APIServerEndpoint: endpoint,
Token: token,
CACertHashes: []string{discoveryCACertHash},
APIServerEndpoint: ticket.ApiServerEndpoint,
Token: ticket.ApiServerEndpoint,
CACertHashes: []string{ticket.DiscoveryTokenCaCertHash},
}
if err := c.joiner.JoinCluster(ctx, btd, certKey, c.role); err != nil {
if err := c.joiner.JoinCluster(ctx, btd, ticket.CertificateKey, c.role); err != nil {
return fmt.Errorf("joining Kubernetes cluster: %w", err)
}