package pubapi

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/edgelesssys/constellation/coordinator/config"
	"github.com/edgelesssys/constellation/coordinator/peer"
	"github.com/edgelesssys/constellation/coordinator/pubapi/pubproto"
	"github.com/edgelesssys/constellation/coordinator/role"
	"github.com/edgelesssys/constellation/coordinator/state"
	"github.com/edgelesssys/constellation/internal/deploy/ssh"
	"github.com/edgelesssys/constellation/state/keyservice/keyproto"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// ActivateAsCoordinator is the RPC call to activate the Coordinator.
func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, srv pubproto.API_ActivateAsCoordinatorServer) (reterr error) {
	a.mut.Lock()
	defer a.mut.Unlock()

	if err := a.core.RequireState(state.AcceptingInit); err != nil {
		return status.Errorf(codes.FailedPrecondition, "node is not in required state: %v", err)
	}

	if len(in.MasterSecret) == 0 {
		a.logger.Error("missing master secret")
		return status.Error(codes.InvalidArgument, "missing master secret")
	}

	logToCLI := a.newLogToCLIFunc(func(msg string) error {
		return srv.Send(&pubproto.ActivateAsCoordinatorResponse{
			Content: &pubproto.ActivateAsCoordinatorResponse_Log{
				Log: &pubproto.Log{
					Message: msg,
				},
			},
		})
	})

	logToCLI("Initializing first control-plane node ...")

	// If any of the following actions fail, we cannot revert.
	// Thus, mark this peer as failed.
	defer func() {
		if reterr != nil {
			_ = a.core.AdvanceState(state.Failed, nil, nil)
		}
	}()

	// AdvanceState MUST be called before any other functions that are not sanity checks or otherwise required.
	// This ensures the node is marked as initialized before the node is in a state that allows code execution.
	// Any new additions to ActivateAsCoordinator MUST come after.
	if err := a.core.InitializeStoreIPs(); err != nil {
		return status.Errorf(codes.Internal, "initialize store IPs: %v", err)
	}

	ownerID, clusterID, err := a.core.GetIDs(in.MasterSecret)
	if err != nil {
		return status.Errorf(codes.Internal, "%v", err)
	}

	if err := a.core.AdvanceState(state.ActivatingNodes, ownerID, clusterID); err != nil {
		return status.Errorf(codes.Internal, "advance state to ActivatingNodes: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	if err := a.core.SetUpKMS(ctx, in.StorageUri, in.KmsUri, in.KeyEncryptionKeyId, in.UseExistingKek); err != nil {
		return status.Errorf(codes.Internal, "setting up KMS: %v", err)
	}

	vpnIP, err := a.core.GetNextCoordinatorIP()
	if err != nil {
		return status.Errorf(codes.Internal, "get coordinator vpn IP address: %v", err)
	}

	coordPeer, err := a.assemblePeerStruct(vpnIP, role.Coordinator)
	if err != nil {
		return status.Errorf(codes.Internal, "assembling the coordinator peer struct: %v", err)
	}

	if err := a.core.SetVPNIP(coordPeer.VPNIP); err != nil {
		return status.Errorf(codes.Internal, "set the vpn IP address: %v", err)
	}

	if err := a.core.AddPeer(coordPeer); err != nil {
		return status.Errorf(codes.Internal, "adding the coordinator to store/vpn: %v", err)
	}

	// Set up SSH users for the first coordinator, if defined.
	if len(in.SshUserKeys) != 0 {
		logToCLI("Creating SSH users on first control-plane node...")
		sshUserKeys := ssh.FromProtoSlice(in.SshUserKeys)
		if err := a.core.CreateSSHUsers(sshUserKeys); err != nil {
			return status.Errorf(codes.Internal, "creating SSH users: %v", err)
		}
	}

	logToCLI("Initializing Kubernetes ...")
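	// InitCluster initializes the Kubernetes cluster; the returned kubeconfig is handed back
	// to the CLI in the AdminConfig message at the end of activation.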
	kubeconfig, err := a.core.InitCluster(in.AutoscalingNodeGroups, in.CloudServiceAccountUri)
	if err != nil {
		return status.Errorf(codes.Internal, "initializing Kubernetes cluster failed: %v", err)
	}

	// run the VPN-API server
	if err := a.StartVPNAPIServer(coordPeer.VPNIP); err != nil {
		return status.Errorf(codes.Internal, "start vpnAPIServer: %v", err)
	}

	if err := a.core.SwitchToPersistentStore(); err != nil {
		return status.Errorf(codes.Internal, "switch to persistent store: %v", err)
	}

	// TODO: check performance and maybe make concurrent
	if err := a.activateCoordinators(logToCLI, in.CoordinatorPublicIps, in.SshUserKeys); err != nil {
		a.logger.Error("coordinator activation failed", zap.Error(err))
		return status.Errorf(codes.Internal, "coordinator initialization: %v", err)
	}

	// TODO: check performance and maybe make concurrent
	if err := a.activateNodes(logToCLI, in.NodePublicIps, in.SshUserKeys); err != nil {
		a.logger.Error("node activation failed", zap.Error(err))
		return status.Errorf(codes.Internal, "node initialization: %v", err)
	}

	// persist node state on disk
	if err := a.core.PersistNodeState(role.Coordinator, coordPeer.VPNIP, ownerID, clusterID); err != nil {
		return status.Errorf(codes.Internal, "persist node state: %v", err)
	}

	diskUUID, err := a.core.GetDiskUUID()
	if err != nil {
		return status.Errorf(codes.Internal, "getting disk uuid: %v", err)
	}
	diskKey, err := a.core.GetDataKey(ctx, diskUUID, 32)
	if err != nil {
		return status.Errorf(codes.Internal, "getting disk key: %v", err)
	}
	if err := a.core.UpdateDiskPassphrase(string(diskKey)); err != nil {
		return status.Errorf(codes.Internal, "updating disk key: %v", err)
	}

	adminVPNIP, err := a.core.GetNextNodeIP()
	if err != nil {
		return status.Errorf(codes.Internal, "requesting node IP address: %v", err)
	}

	// This effectively gives code execution, so we do this last.
	err = a.core.AddPeer(peer.Peer{
		VPNIP:     adminVPNIP,
		VPNPubKey: in.AdminVpnPubKey,
		Role:      role.Admin,
	})
	if err != nil {
		return status.Errorf(codes.Internal, "add peer to store/vpn: %v", err)
	}

	return srv.Send(&pubproto.ActivateAsCoordinatorResponse{
		Content: &pubproto.ActivateAsCoordinatorResponse_AdminConfig{
			AdminConfig: &pubproto.AdminConfig{
				AdminVpnIp:           adminVPNIP,
				CoordinatorVpnPubKey: coordPeer.VPNPubKey,
				Kubeconfig:           kubeconfig,
				OwnerId:              ownerID,
				ClusterId:            clusterID,
			},
		},
	})
}

// ActivateAdditionalNodes is the RPC call to activate additional nodes.
func (a *API) ActivateAdditionalNodes(in *pubproto.ActivateAdditionalNodesRequest, srv pubproto.API_ActivateAdditionalNodesServer) error {
	if err := a.core.RequireState(state.ActivatingNodes); err != nil {
		return status.Errorf(codes.FailedPrecondition, "%v", err)
	}

	logToCLI := a.newLogToCLIFunc(func(msg string) error {
		return srv.Send(&pubproto.ActivateAdditionalNodesResponse{
			Log: &pubproto.Log{
				Message: msg,
			},
		})
	})

	// TODO: check performance and maybe make concurrent
	if err := a.activateNodes(logToCLI, in.NodePublicIps, in.SshUserKeys); err != nil {
		a.logger.Error("node activation failed", zap.Error(err))
		return status.Errorf(codes.Internal, "activating nodes: %v", err)
	}

	return srv.Send(&pubproto.ActivateAdditionalNodesResponse{
		Log: &pubproto.Log{
			Message: "success",
		},
	})
}

// RequestStateDiskKey triggers the Coordinator to return a key derived from the Constellation's master secret to the caller.
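// The key is not returned in the RPC response itself; the Coordinator dials the caller's key service
// and pushes the key via PushStateDiskKey.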
func (a *API) RequestStateDiskKey(ctx context.Context, in *pubproto.RequestStateDiskKeyRequest) (*pubproto.RequestStateDiskKeyResponse, error) {
	if err := a.core.RequireState(state.ActivatingNodes); err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "%v", err)
	}
	key, err := a.core.GetDataKey(ctx, in.DiskUuid, config.RNGLengthDefault)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "unable to load key: %v", err)
	}

	peer, err := a.peerFromContext(ctx)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "%v", err)
	}

	conn, err := a.dialer.Dial(ctx, peer)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "%v", err)
	}
	defer conn.Close()

	client := keyproto.NewAPIClient(conn)
	if _, err := client.PushStateDiskKey(ctx, &keyproto.PushStateDiskKeyRequest{StateDiskKey: key}); err != nil {
		return nil, status.Errorf(codes.Internal, "pushing key to peer %q: %v", peer, err)
	}

	return &pubproto.RequestStateDiskKeyResponse{}, nil
}

func (a *API) activateNodes(logToCLI logFunc, nodePublicIPs []string, sshUserKeys []*pubproto.SSHUserKey) error {
	_, peers, err := a.core.GetPeers(0)
	if err != nil {
		return err
	}
	// we need to add at least all coordinators to the initial peers for HA
	initialPeers := peer.ToPubProto(peers)

	ownerID, clusterID, err := a.core.GetIDs(nil)
	if err != nil {
		return err
	}

	// Activate all nodes.
	for num, nodePublicIP := range nodePublicIPs {
		logToCLI("Activating worker node %3d out of %3d ...", num+1, len(nodePublicIPs))
		nodeVPNIP, err := a.core.GetNextNodeIP()
		if err != nil {
			a.logger.Error("generation of vpn ips failed", zap.Error(err))
			return err
		}
		nodeVpnPubKey, err := a.activateNode(nodePublicIP, nodeVPNIP, initialPeers, ownerID, clusterID, sshUserKeys)
		if err != nil {
			return err
		}
		peer := peer.Peer{
			PublicIP:  nodePublicIP,
			VPNIP:     nodeVPNIP,
			VPNPubKey: nodeVpnPubKey,
			Role:      role.Node,
		}
		if err := a.core.AddPeer(peer); err != nil {
			return err
		}
		// This can be omitted if we
		// 1. Use a gRPC HA balancer mechanism, which picks one active coordinator connection
		// (nodeUpdate loop causes problems, even if we specify the IP in the joinCluster RPC)
		if err := a.updateCoordinator(); err != nil {
			return err
		}
		if err := a.joinCluster(nodePublicIP); err != nil {
			return err
		}
	}

	// Manually trigger an update operation on all peers.
	// This may be expendable in the future, depending on whether it's acceptable that it takes
	// some seconds until the nodes get all peer data via their regular update requests.
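	// Fetch the peer list again so it also includes the nodes that were just added above.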
	_, peers, err = a.core.GetPeers(0)
	if err != nil {
		return err
	}
	vpnIP, err := a.core.GetVPNIP()
	if err != nil {
		return err
	}
	for _, p := range peers {
		if p.Role == role.Node {
			if err := a.triggerNodeUpdate(p.PublicIP); err != nil {
				a.logger.Error("TriggerNodeUpdate failed", zap.Error(err))
			}
		}
		if p.Role == role.Coordinator && p.VPNIP != vpnIP {
			a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP))
			if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
				// no reason to panic here, we can recover
				a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
			}
		}
	}

	return nil
}

func (a *API) activateNode(nodePublicIP string, nodeVPNIP string, initialPeers []*pubproto.Peer, ownerID, clusterID []byte, sshUserKeys []*pubproto.SSHUserKey) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
	defer cancel()

	conn, err := a.dialer.Dial(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	client := pubproto.NewAPIClient(conn)
	stream, err := client.ActivateAsNode(ctx)
	if err != nil {
		a.logger.Error("connecting to node for activation failed", zap.Error(err))
		return nil, err
	}

	/* coordinator -> initial request -> node */
	if err := stream.Send(&pubproto.ActivateAsNodeRequest{
		Request: &pubproto.ActivateAsNodeRequest_InitialRequest{
			InitialRequest: &pubproto.ActivateAsNodeInitialRequest{
				NodeVpnIp:   nodeVPNIP,
				Peers:       initialPeers,
				OwnerId:     ownerID,
				ClusterId:   clusterID,
				SshUserKeys: sshUserKeys,
			},
		},
	}); err != nil {
		a.logger.Error("sending initial message to node for activation failed", zap.Error(err))
		return nil, err
	}

	/* coordinator <- state disk uuid <- node */
	// wait for message containing the node's disk UUID to send back the permanent encryption key
	message, err := stream.Recv()
	if err != nil {
		a.logger.Error("expected disk UUID message but no message received", zap.Error(err))
		return nil, err
	}
	diskUUID, ok := message.GetResponse().(*pubproto.ActivateAsNodeResponse_StateDiskUuid)
	if !ok {
		a.logger.Error("expected disk UUID message but got different message")
		return nil, errors.New("expected state disk UUID but got different message type")
	}
	diskKey, err := a.core.GetDataKey(ctx, diskUUID.StateDiskUuid, 32)
	if err != nil {
		a.logger.Error("failed to derive node's disk key")
		return nil, err
	}

	/* coordinator -> state disk key -> node */
	// send back state disk encryption key
	if err := stream.Send(&pubproto.ActivateAsNodeRequest{
		Request: &pubproto.ActivateAsNodeRequest_StateDiskKey{
			StateDiskKey: diskKey,
		},
	}); err != nil {
		a.logger.Error("sending state disk key to node on activation failed", zap.Error(err))
		return nil, err
	}

	/* coordinator <- VPN public key <- node */
	// wait for message containing the node's VPN pubkey
	message, err = stream.Recv()
	if err != nil {
		a.logger.Error("expected node VPN pubkey but no message received", zap.Error(err))
		return nil, err
	}
	vpnPubKey, ok := message.GetResponse().(*pubproto.ActivateAsNodeResponse_NodeVpnPubKey)
	if !ok {
		a.logger.Error("expected node VPN pubkey but got different message")
		return nil, errors.New("expected node VPN pub key but got different message type")
	}

	return vpnPubKey.NodeVpnPubKey, nil
}

// assemblePeerStruct combines all information of this peer into a peer struct.
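// The role argument is currently unused; the returned peer is always assigned role.Coordinator.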
func (a *API) assemblePeerStruct(vpnIP string, _ role.Role) (peer.Peer, error) {
	vpnPubKey, err := a.core.GetVPNPubKey()
	if err != nil {
		a.logger.Error("could not get key", zap.Error(err))
		return peer.Peer{}, err
	}
	publicIP, err := a.getPublicIPAddr()
	if err != nil {
		a.logger.Error("could not get public IP", zap.Error(err))
		return peer.Peer{}, err
	}
	return peer.Peer{
		PublicIP:  publicIP,
		VPNIP:     vpnIP,
		VPNPubKey: vpnPubKey,
		Role:      role.Coordinator,
	}, err
}

func (a *API) newLogToCLIFunc(send func(string) error) logFunc {
	return func(format string, v ...any) {
		if err := send(fmt.Sprintf(format, v...)); err != nil {
			a.logger.Error("logging to CLI failed", zap.Error(err))
		}
	}
}

func (a *API) joinCluster(nodePublicIP string) error {
	ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
	defer cancel()

	vpnIP, err := a.core.GetVPNIP()
	if err != nil {
		return err
	}

	// We don't verify the peer certificate here, since JoinCluster triggers a connection over VPN.
	// The target of the RPC needs to already be part of the VPN to process the request, meaning it is trusted.
	conn, err := a.dialer.DialNoVerify(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pubproto.NewAPIClient(conn)
	_, err = client.JoinCluster(ctx, &pubproto.JoinClusterRequest{CoordinatorVpnIp: vpnIP})

	return err
}

func (a *API) updateCoordinator() error {
	_, peers, err := a.core.GetPeers(0)
	if err != nil {
		return err
	}
	vpnIP, err := a.core.GetVPNIP()
	if err != nil {
		return err
	}
	for _, p := range peers {
		if p.Role == role.Coordinator && p.VPNIP != vpnIP {
			a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP))
			if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
				a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
			}
		}
	}
	return nil
}

func (a *API) triggerNodeUpdate(nodePublicIP string) error {
	ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
	defer cancel()

	// We don't verify the peer certificate here, since TriggerNodeUpdate triggers a connection over VPN.
	// The target of the RPC needs to already be part of the VPN to process the request, meaning it is trusted.
	conn, err := a.dialer.DialNoVerify(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pubproto.NewAPIClient(conn)
	_, err = client.TriggerNodeUpdate(ctx, &pubproto.TriggerNodeUpdateRequest{})

	return err
}

type logFunc func(format string, v ...any)