coordinator: send additional status log messages to cli in ActivateAsCoordinator

This commit is contained in:
Thomas Tendyck 2022-04-05 09:12:18 +02:00 committed by Thomas Tendyck
parent 7315e80374
commit ea4b9d2d85
3 changed files with 19 additions and 17 deletions

View File

@ -80,8 +80,8 @@ func TestLegacyActivateCoordinator(t *testing.T) {
assert.NoError(err) assert.NoError(err)
// Coordinator streams admin conf // Coordinator streams admin conf
require.Equal(4, len(testActivationSvr.sent)) require.NotEmpty(testActivationSvr.sent)
adminConfig := testActivationSvr.sent[3].GetAdminConfig() adminConfig := testActivationSvr.sent[len(testActivationSvr.sent)-1].GetAdminConfig()
require.NotNil(adminConfig) require.NotNil(adminConfig)
assert.NotEmpty(adminConfig.AdminVpnIp) assert.NotEmpty(adminConfig.AdminVpnIp)
assert.Equal(coordinatorKey, adminConfig.CoordinatorVpnPubKey) assert.Equal(coordinatorKey, adminConfig.CoordinatorVpnPubKey)
@ -92,7 +92,6 @@ func TestLegacyActivateCoordinator(t *testing.T) {
// Coordinator cannot be activated a second time // Coordinator cannot be activated a second time
assert.Error(coordinatorAPI.ActivateAsCoordinator(activationReq, testActivationSvr)) assert.Error(coordinatorAPI.ActivateAsCoordinator(activationReq, testActivationSvr))
assert.Equal(4, len(testActivationSvr.sent))
// Node cannot be activated a second time // Node cannot be activated a second time
nodeResp, err := nodeAPI3.ActivateAsNode(context.TODO(), &pubproto.ActivateAsNodeRequest{ nodeResp, err := nodeAPI3.ActivateAsNode(context.TODO(), &pubproto.ActivateAsNodeRequest{

View File

@ -29,6 +29,18 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
return status.Error(codes.InvalidArgument, "missing master secret") return status.Error(codes.InvalidArgument, "missing master secret")
} }
logToCLI := a.newLogToCLIFunc(func(msg string) error {
return srv.Send(&pubproto.ActivateAsCoordinatorResponse{
Content: &pubproto.ActivateAsCoordinatorResponse_Log{
Log: &pubproto.Log{
Message: msg,
},
},
})
})
logToCLI("Initializing first Coordinator ...")
// If any of the following actions fail, we cannot revert // If any of the following actions fail, we cannot revert
// Thus, mark this peer as failed. // Thus, mark this peer as failed.
defer func() { defer func() {
@ -70,6 +82,7 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
return status.Errorf(codes.Internal, "%v", err) return status.Errorf(codes.Internal, "%v", err)
} }
logToCLI("Initializing Kubernetes ...")
kubeconfig, err := a.core.InitCluster(in.AutoscalingNodeGroups, in.CloudServiceAccountUri) kubeconfig, err := a.core.InitCluster(in.AutoscalingNodeGroups, in.CloudServiceAccountUri)
if err != nil { if err != nil {
return status.Errorf(codes.Internal, "%v", err) return status.Errorf(codes.Internal, "%v", err)
@ -87,16 +100,6 @@ func (a *API) ActivateAsCoordinator(in *pubproto.ActivateAsCoordinatorRequest, s
} }
}() }()
logToCLI := a.newLogToCLIFunc(func(msg string) error {
return srv.Send(&pubproto.ActivateAsCoordinatorResponse{
Content: &pubproto.ActivateAsCoordinatorResponse_Log{
Log: &pubproto.Log{
Message: msg,
},
},
})
})
// TODO: check performance and maybe make concurrent // TODO: check performance and maybe make concurrent
if err := a.activateNodes(logToCLI, in.NodePublicEndpoints, coordPeer); err != nil { if err := a.activateNodes(logToCLI, in.NodePublicEndpoints, coordPeer); err != nil {
a.logger.Error("node activation failed", zap.Error(err)) a.logger.Error("node activation failed", zap.Error(err))
@ -169,7 +172,7 @@ func (a *API) activateNodes(logToCLI logFunc, nodePublicEndpoints []string, coor
// Activate all nodes. // Activate all nodes.
for num, nodePublicEndpoint := range nodePublicEndpoints { for num, nodePublicEndpoint := range nodePublicEndpoints {
logToCLI("activating node %3d out of %3d nodes", num+1, len(nodePublicEndpoints)) logToCLI("Activating node %3d out of %3d ...", num+1, len(nodePublicEndpoints))
nodeVPNIP, err := a.core.GetNextNodeIP() nodeVPNIP, err := a.core.GetNextNodeIP()
if err != nil { if err != nil {
a.logger.Error("generation of vpn ips failed", zap.Error(err)) a.logger.Error("generation of vpn ips failed", zap.Error(err))

View File

@ -152,11 +152,11 @@ func TestActivateAsCoordinator(t *testing.T) {
require.NoError(err) require.NoError(err)
// Coordinator streams logs and admin conf // Coordinator streams logs and admin conf
require.Len(stream.sent, len(tc.nodes)+1) require.Greater(len(stream.sent), len(tc.nodes))
for i := 0; i < len(tc.nodes); i++ { for i := 0; i < len(stream.sent)-1; i++ {
assert.NotEmpty(stream.sent[i].GetLog().Message) assert.NotEmpty(stream.sent[i].GetLog().Message)
} }
adminConfig := stream.sent[len(tc.nodes)].GetAdminConfig() adminConfig := stream.sent[len(stream.sent)-1].GetAdminConfig()
assert.Equal("192.0.2.99", adminConfig.AdminVpnIp) assert.Equal("192.0.2.99", adminConfig.AdminVpnIp)
assert.Equal(coordinatorPubKey, adminConfig.CoordinatorVpnPubKey) assert.Equal(coordinatorPubKey, adminConfig.CoordinatorVpnPubKey)
assert.Equal(core.kubeconfig, adminConfig.Kubeconfig) assert.Equal(core.kubeconfig, adminConfig.Kubeconfig)