peer: save PublicIP instead of publicEndpoint / add multi-coord gRPCs

Benedict 2022-04-13 12:39:55 +02:00 committed by Benedict Schlüter
parent 55a1aa783f
commit f0e35a43d4
31 changed files with 1216 additions and 666 deletions


@@ -70,8 +70,11 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
     if err := a.core.SetUpKMS(ctx, in.StorageUri, in.KmsUri, in.KeyEncryptionKeyId, in.UseExistingKek); err != nil {
         return status.Errorf(codes.Internal, "%v", err)
     }
-    coordPeer, err := a.makeCoordinatorPeer()
+    vpnIP, err := a.core.GetNextCoordinatorIP()
+    if err != nil {
+        return status.Errorf(codes.Internal, "could not obtain coordinator vpn ip: %v", err)
+    }
+    coordPeer, err := a.assemblePeerStruct(vpnIP, role.Coordinator)
     if err != nil {
         return status.Errorf(codes.Internal, "%v", err)
     }
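
Note: GetNextCoordinatorIP is introduced by this commit, but its implementation lives in the core package and is not part of this file's diff. As a rough illustration of what such an allocator does, here is a minimal, hypothetical sketch; the subnet 10.118.0.0/24, the function name nextIP, and the standalone packaging are assumptions, not code from this repository.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// nextIP returns the host address directly after ip, failing once the
// address would leave the given subnet. Hypothetical sketch only.
func nextIP(ip net.IP, subnet *net.IPNet) (net.IP, error) {
	next := make(net.IP, len(ip.To4()))
	copy(next, ip.To4())
	for i := len(next) - 1; i >= 0; i-- {
		next[i]++
		if next[i] != 0 {
			break
		}
	}
	if !subnet.Contains(next) {
		return nil, errors.New("coordinator subnet exhausted")
	}
	return next, nil
}

func main() {
	_, subnet, _ := net.ParseCIDR("10.118.0.0/24") // assumed coordinator VPN range
	next, err := nextIP(net.ParseIP("10.118.0.1"), subnet)
	if err != nil {
		panic(err)
	}
	fmt.Println(next) // prints 10.118.0.2
}
```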
@@ -100,9 +103,8 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
             panic(err)
         }
     }()
-    // TODO: check performance and maybe make concurrent
-    if err := a.activateNodes(logToCLI, in.NodePublicEndpoints, coordPeer); err != nil {
+    if err := a.activateNodes(logToCLI, in.NodePublicIps); err != nil {
         a.logger.Error("node activation failed", zap.Error(err))
         return status.Errorf(codes.Internal, "%v", err)
     }
@@ -115,9 +117,16 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
     if err := a.core.PersistNodeState(role.Coordinator, ownerID, clusterID); err != nil {
         return status.Errorf(codes.Internal, "%v", err)
     }
+    adminVPNIP, err := a.core.GetNextNodeIP()
+    if err != nil {
+        return status.Errorf(codes.Internal, "%v", err)
+    }
     // This effectively gives code execution, so we do this last.
-    adminVPNIP, err := a.core.AddAdmin(in.AdminVpnPubKey)
+    err = a.core.AddPeer(peer.Peer{
+        VPNIP:     adminVPNIP,
+        VPNPubKey: in.AdminVpnPubKey,
+        Role:      role.Admin,
+    })
     if err != nil {
         return status.Errorf(codes.Internal, "%v", err)
     }
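
Note: with this change the admin is no longer special-cased through AddAdmin; it is stored as a regular peer with role.Admin. Pieced together from the call sites in this diff, the peer.Peer struct now presumably looks like the following sketch; field types are inferred, the Role stand-in and any further fields are assumptions.

```go
// Sketch of the peer package as inferred from this diff; not verbatim source.
package peer

// Role stands in for the repository's role.Role type.
type Role uint

const (
	Unknown Role = iota
	Coordinator
	Node
	Admin
)

// Peer describes one VPN member. PublicIP replaces the former
// PublicEndpoint field; the port is joined on demand via net.JoinHostPort.
type Peer struct {
	PublicIP  string // public (non-VPN) address, stored without a port
	VPNIP     string // address inside the VPN
	VPNPubKey []byte // VPN public key
	Role      Role   // Coordinator, Node, or Admin
}
```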
@@ -141,11 +150,6 @@ func (a *API) ActivateAdditionalNodes(in *pubproto.ActivateAdditionalNodesReques
         return status.Errorf(codes.FailedPrecondition, "%v", err)
     }
-    coordPeer, err := a.makeCoordinatorPeer()
-    if err != nil {
-        return status.Errorf(codes.Internal, "%v", err)
-    }
     logToCLI := a.newLogToCLIFunc(func(msg string) error {
         return srv.Send(&pubproto.ActivateAdditionalNodesResponse{
             Log: &pubproto.Log{
@@ -155,7 +159,7 @@ func (a *API) ActivateAdditionalNodes(in *pubproto.ActivateAdditionalNodesReques
     })
     // TODO: check performance and maybe make concurrent
-    if err := a.activateNodes(logToCLI, in.NodePublicEndpoints, coordPeer); err != nil {
+    if err := a.activateNodes(logToCLI, in.NodePublicIps); err != nil {
         a.logger.Error("node activation failed", zap.Error(err))
         return status.Errorf(codes.Internal, "%v", err)
     }
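
Note: callers now pass bare public IPs rather than host:port endpoints. A client-side sketch of invoking this server-streaming RPC follows; the connection setup, the example addresses, and the Log message accessor are assumptions, not taken from the CLI code.

```go
// Illustrative only: conn is an established *grpc.ClientConn, and this
// snippet runs inside a function that returns error.
client := pubproto.NewAPIClient(conn)
stream, err := client.ActivateAdditionalNodes(ctx, &pubproto.ActivateAdditionalNodesRequest{
	NodePublicIps: []string{"203.0.113.10", "203.0.113.11"}, // bare IPs, no port
})
if err != nil {
	return err
}
for {
	resp, err := stream.Recv()
	if err == io.EOF {
		break // server finished streaming activation logs
	}
	if err != nil {
		return err
	}
	fmt.Println(resp.GetLog().GetMessage()) // field name Message is assumed
}
```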
@@ -182,9 +186,13 @@ func (a *API) RequestStateDiskKey(ctx context.Context, in *pubproto.RequestState
     return &pubproto.RequestStateDiskKeyResponse{}, errors.New("unimplemented")
 }

-func (a *API) activateNodes(logToCLI logFunc, nodePublicEndpoints []string, coordPeer peer.Peer) error {
-    // Create initial peer data to be sent to the nodes. Currently, this is just this Coordinator.
-    initialPeers := peer.ToPubProto([]peer.Peer{coordPeer})
+func (a *API) activateNodes(logToCLI logFunc, nodePublicIPs []string) error {
+    _, peers, err := a.core.GetPeers(0)
+    if err != nil {
+        return err
+    }
+    // We need to include at least all coordinators in the initial peers for HA.
+    initialPeers := peer.ToPubProto(peers)

     ownerID, clusterID, err := a.core.GetIDs(nil)
     if err != nil {
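
Note: instead of sending only this one coordinator, the initial peer set is now everything the core knows, so freshly activated nodes learn about all coordinators for HA. peer.ToPubProto converts the stored peers into their protobuf form; a hypothetical sketch follows, where the generated field names (PublicIp, VpnIp, VpnPubKey, Role) are guessed from the naming style seen elsewhere in this diff and are not taken from the generated code.

```go
// Hypothetical sketch of peer.ToPubProto; pubproto.Peer's field names and
// the wire representation of Role are assumptions.
func ToPubProto(peers []Peer) []*pubproto.Peer {
	out := make([]*pubproto.Peer, 0, len(peers))
	for _, p := range peers {
		out = append(out, &pubproto.Peer{
			PublicIp:  p.PublicIP,
			VpnIp:     p.VPNIP,
			VpnPubKey: p.VPNPubKey,
			Role:      uint32(p.Role),
		})
	}
	return out
}
```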
@@ -192,42 +200,60 @@ func (a *API) activateNodes(logToCLI logFunc, nodePublicEndpoints []string, coor
     }

     // Activate all nodes.
-    for num, nodePublicEndpoint := range nodePublicEndpoints {
-        logToCLI("Activating node %3d out of %3d ...", num+1, len(nodePublicEndpoints))
+    for num, nodePublicIP := range nodePublicIPs {
+        logToCLI("activating node %3d out of %3d nodes ...", num+1, len(nodePublicIPs))
         nodeVPNIP, err := a.core.GetNextNodeIP()
         if err != nil {
             a.logger.Error("generation of vpn ips failed", zap.Error(err))
             return err
         }
-        nodeVpnPubKey, err := a.activateNode(nodePublicEndpoint, nodeVPNIP, initialPeers, ownerID, clusterID)
+        nodeVpnPubKey, err := a.activateNode(nodePublicIP, nodeVPNIP, initialPeers, ownerID, clusterID)
         if err != nil {
             return err
         }
         peer := peer.Peer{
-            PublicEndpoint: nodePublicEndpoint,
-            VPNIP:          nodeVPNIP,
-            VPNPubKey:      nodeVpnPubKey,
-            Role:           role.Node,
+            PublicIP:  nodePublicIP,
+            VPNIP:     nodeVPNIP,
+            VPNPubKey: nodeVpnPubKey,
+            Role:      role.Node,
         }
         if err := a.core.AddPeer(peer); err != nil {
             return err
         }
-        if err := a.joinCluster(nodePublicEndpoint); err != nil {
+        // This could be omitted if we used a gRPC HA balancer mechanism that picks
+        // one active coordinator connection (the nodeUpdate loop causes problems,
+        // even if we specify the IP in the joinCluster RPC).
+        if err := a.updateCoordinator(); err != nil {
+            return err
+        }
+        if err := a.joinCluster(nodePublicIP); err != nil {
             return err
         }
     }

-    // Manually trigger an update operation on all nodes.
+    // Manually trigger an update operation on all peers.
     // This may be expendable in the future, depending on whether it's acceptable that it takes
     // some seconds until the nodes get all peer data via their regular update requests.
-    _, peers, err := a.core.GetPeers(0)
+    _, peers, err = a.core.GetPeers(0)
     if err != nil {
         return err
     }
+    vpnIP, err := a.core.GetVPNIP()
+    if err != nil {
+        return err
+    }
     for _, p := range peers {
         if p.Role == role.Node {
-            if err := a.triggerNodeUpdate(p.PublicEndpoint); err != nil {
-                a.logger.DPanic("TriggerNodeUpdate failed", zap.Error(err))
+            if err := a.triggerNodeUpdate(p.PublicIP); err != nil {
+                a.logger.Error("TriggerNodeUpdate failed", zap.Error(err))
             }
         }
+        if p.Role == role.Coordinator && p.VPNIP != vpnIP {
+            a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP))
+            if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
+                // no reason to panic here, we can recover
+                a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
+            }
+        }
     }
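
Note: triggerCoordinatorUpdate is called above but not defined in this file's diff. Assuming it mirrors triggerNodeUpdate below and that the new multi-coordinator gRPC is named TriggerCoordinatorUpdate (both assumptions), it would look roughly like this:

```go
// Hypothetical sketch modeled on triggerNodeUpdate; the RPC and message
// names are assumptions, not taken from the pubproto definitions.
func (a *API) triggerCoordinatorUpdate(ctx context.Context, publicIP string) error {
	ctx, cancel := context.WithTimeout(ctx, deadlineDuration)
	defer cancel()

	// As with TriggerNodeUpdate, the target is already part of the VPN and
	// therefore trusted, so the peer certificate is not verified.
	conn, err := a.dialNoVerify(ctx, net.JoinHostPort(publicIP, endpointAVPNPort))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pubproto.NewAPIClient(conn)
	_, err = client.TriggerCoordinatorUpdate(ctx, &pubproto.TriggerCoordinatorUpdateRequest{})
	return err
}
```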
@@ -235,11 +261,11 @@ func (a *API) activateNodes(logToCLI logFunc, nodePublicEndpoints []string, coor
     return nil
 }

-func (a *API) activateNode(nodePublicEndpoint string, nodeVPNIP string, initialPeers []*pubproto.Peer, ownerID, clusterID []byte) ([]byte, error) {
+func (a *API) activateNode(nodePublicIP string, nodeVPNIP string, initialPeers []*pubproto.Peer, ownerID, clusterID []byte) ([]byte, error) {
     ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
     defer cancel()

-    conn, err := a.dial(ctx, nodePublicEndpoint)
+    conn, err := a.dial(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
     if err != nil {
         return nil, err
     }
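
Note: storing the bare IP and joining the port only at dial time also keeps IPv6 peers well-formed, because net.JoinHostPort brackets addresses that contain colons. The concrete value of endpointAVPNPort is not visible in this diff; "9000" below is a stand-in.

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	fmt.Println(net.JoinHostPort("203.0.113.10", "9000")) // 203.0.113.10:9000
	fmt.Println(net.JoinHostPort("2001:db8::1", "9000"))  // [2001:db8::1]:9000
}
```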
@@ -261,22 +287,23 @@ func (a *API) activateNode(nodePublicEndpoint string, nodeVPNIP string, initialP
     return resp.NodeVpnPubKey, nil
 }

-func (a *API) makeCoordinatorPeer() (peer.Peer, error) {
-    coordinatorVPNPubKey, err := a.core.GetVPNPubKey()
+// assemblePeerStruct combines all information about this peer into a peer struct.
+func (a *API) assemblePeerStruct(vpnIP string, _ role.Role) (peer.Peer, error) {
+    vpnPubKey, err := a.core.GetVPNPubKey()
     if err != nil {
         a.logger.Error("could not get key", zap.Error(err))
         return peer.Peer{}, err
     }
-    coordinatorPublicIP, err := a.getPublicIPAddr()
+    publicIP, err := a.getPublicIPAddr()
     if err != nil {
         a.logger.Error("could not get public IP", zap.Error(err))
         return peer.Peer{}, err
     }
     return peer.Peer{
-        PublicEndpoint: net.JoinHostPort(coordinatorPublicIP, endpointAVPNPort),
-        VPNIP:          a.core.GetCoordinatorVPNIP(),
-        VPNPubKey:      coordinatorVPNPubKey,
-        Role:           role.Coordinator,
+        PublicIP:  publicIP,
+        VPNIP:     vpnIP,
+        VPNPubKey: vpnPubKey,
+        Role:      role.Coordinator,
     }, err
 }
@@ -288,31 +315,55 @@ func (a *API) newLogToCLIFunc(send func(string) error) logFunc {
     }
 }

-func (a *API) joinCluster(nodePublicEndpoint string) error {
+func (a *API) joinCluster(nodePublicIP string) error {
     ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
     defer cancel()

+    vpnIP, err := a.core.GetVPNIP()
+    if err != nil {
+        return err
+    }
     // We don't verify the peer certificate here, since JoinCluster triggers a connection over the VPN.
     // The target of the RPC must already be part of the VPN to process the request, so it is trusted.
-    conn, err := a.dialNoVerify(ctx, nodePublicEndpoint)
+    conn, err := a.dialNoVerify(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
     if err != nil {
         return err
     }
     defer conn.Close()

     client := pubproto.NewAPIClient(conn)
-    _, err = client.JoinCluster(ctx, &pubproto.JoinClusterRequest{})
+    _, err = client.JoinCluster(ctx, &pubproto.JoinClusterRequest{CoordinatorVpnIp: vpnIP})
     return err
 }

-func (a *API) triggerNodeUpdate(nodePublicEndpoint string) error {
+func (a *API) updateCoordinator() error {
+    _, peers, err := a.core.GetPeers(0)
+    if err != nil {
+        return err
+    }
+    vpnIP, err := a.core.GetVPNIP()
+    if err != nil {
+        return err
+    }
+    for _, p := range peers {
+        if p.Role == role.Coordinator && p.VPNIP != vpnIP {
+            a.logger.Info("update coordinator", zap.String("coordinator vpnIP", p.VPNIP))
+            if err := a.triggerCoordinatorUpdate(context.TODO(), p.PublicIP); err != nil {
+                a.logger.Error("triggerCoordinatorUpdate failed", zap.Error(err), zap.String("endpoint", p.PublicIP), zap.String("vpnip", p.VPNIP))
+            }
+        }
+    }
+    return nil
+}
+
+func (a *API) triggerNodeUpdate(nodePublicIP string) error {
     ctx, cancel := context.WithTimeout(context.Background(), deadlineDuration)
     defer cancel()

     // We don't verify the peer certificate here, since TriggerNodeUpdate triggers a connection over the VPN.
     // The target of the RPC must already be part of the VPN to process the request, so it is trusted.
-    conn, err := a.dialNoVerify(ctx, nodePublicEndpoint)
+    conn, err := a.dialNoVerify(ctx, net.JoinHostPort(nodePublicIP, endpointAVPNPort))
     if err != nil {
         return err
     }