diff --git a/cli/internal/cmd/BUILD.bazel b/cli/internal/cmd/BUILD.bazel index d3e540db7..5e1dbc2a8 100644 --- a/cli/internal/cmd/BUILD.bazel +++ b/cli/internal/cmd/BUILD.bazel @@ -66,6 +66,7 @@ go_library( "//internal/crypto", "//internal/file", "//internal/grpc/dialer", + "//internal/grpc/grpclog", "//internal/grpc/retry", "//internal/imagefetcher", "//internal/kms/uri", @@ -89,7 +90,6 @@ go_library( "@io_k8s_client_go//tools/clientcmd/api/latest", "@io_k8s_sigs_yaml//:yaml", "@org_golang_google_grpc//:go_default_library", - "@org_golang_google_grpc//connectivity", "@org_golang_x_mod//semver", "@org_uber_go_zap//zapcore", ] + select({ diff --git a/cli/internal/cmd/init.go b/cli/internal/cmd/init.go index 35659c3d8..e7ff165d5 100644 --- a/cli/internal/cmd/init.go +++ b/cli/internal/cmd/init.go @@ -27,7 +27,6 @@ import ( "github.com/spf13/afero" "github.com/spf13/cobra" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/clientcmd" clientcodec "k8s.io/client-go/tools/clientcmd/api/latest" @@ -46,6 +45,7 @@ import ( "github.com/edgelesssys/constellation/v2/internal/crypto" "github.com/edgelesssys/constellation/v2/internal/file" "github.com/edgelesssys/constellation/v2/internal/grpc/dialer" + "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog" grpcRetry "github.com/edgelesssys/constellation/v2/internal/grpc/retry" "github.com/edgelesssys/constellation/v2/internal/kms/uri" "github.com/edgelesssys/constellation/v2/internal/license" @@ -327,23 +327,11 @@ func (d *initDoer) getLogs(resp initproto.API_InitClient) error { } func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn) { - wg.Add(1) - go func() { - defer wg.Done() - state := conn.GetState() - d.log.Debugf("Connection state started as %s", state) - for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() { - d.log.Debugf("Connection state changed to %s", state) - } - if state == connectivity.Ready { - d.log.Debugf("Connection ready") - d.connectedOnce = true - d.spinner.Stop() - d.spinner.Start("Initializing cluster ", false) - } else { - d.log.Debugf("Connection state ended with %s", state) - } - }() + grpclog.LogStateChangesUntilReady(ctx, conn, d.log, wg, func() { + d.connectedOnce = true + d.spinner.Stop() + d.spinner.Start("Initializing cluster ", false) + }) } func (i *initCmd) writeOutput( diff --git a/debugd/internal/cdbg/cmd/BUILD.bazel b/debugd/internal/cdbg/cmd/BUILD.bazel index 631850962..94420b1c1 100644 --- a/debugd/internal/cdbg/cmd/BUILD.bazel +++ b/debugd/internal/cdbg/cmd/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//internal/config", "//internal/constants", "//internal/file", + "//internal/grpc/grpclog", "//internal/logger", "@com_github_spf13_afero//:afero", "@com_github_spf13_cobra//:cobra", diff --git a/debugd/internal/cdbg/cmd/deploy.go b/debugd/internal/cdbg/cmd/deploy.go index b760d4284..4969531d6 100644 --- a/debugd/internal/cdbg/cmd/deploy.go +++ b/debugd/internal/cdbg/cmd/deploy.go @@ -10,10 +10,11 @@ import ( "context" "errors" "fmt" - "io" "net" "strconv" "strings" + "sync" + "time" "github.com/edgelesssys/constellation/v2/debugd/internal/debugd" "github.com/edgelesssys/constellation/v2/debugd/internal/debugd/logcollector" @@ -24,6 +25,7 @@ import ( "github.com/edgelesssys/constellation/v2/internal/config" "github.com/edgelesssys/constellation/v2/internal/constants" "github.com/edgelesssys/constellation/v2/internal/file" + "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog" "github.com/edgelesssys/constellation/v2/internal/logger" "github.com/spf13/afero" "github.com/spf13/cobra" @@ -31,6 +33,8 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +const deployEndpointTimeout = 20 * time.Minute + func newDeployCmd() *cobra.Command { deployCmd := &cobra.Command{ Use: "deploy", @@ -77,11 +81,13 @@ func runDeploy(cmd *cobra.Command, _ []string) error { if err != nil { return err } - return deploy(cmd, fileHandler, constellationConfig, transfer, log) } -func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *config.Config, transfer fileTransferer, log *logger.Logger) error { +func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *config.Config, + transfer fileTransferer, + log *logger.Logger, +) error { bootstrapperPath, err := cmd.Flags().GetString("bootstrapper") if err != nil { return err @@ -145,7 +151,7 @@ func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *c log: log, } if err := deployOnEndpoint(cmd.Context(), input); err != nil { - return err + return fmt.Errorf("deploying endpoint on %q: %w", ip, err) } } @@ -162,13 +168,16 @@ type deployOnEndpointInput struct { // deployOnEndpoint deploys a custom built bootstrapper binary to a debugd endpoint. func deployOnEndpoint(ctx context.Context, in deployOnEndpointInput) error { + ctx, cancel := context.WithTimeout(ctx, deployEndpointTimeout) + defer cancel() in.log.Infof("Deploying on %v", in.debugdEndpoint) - client, closer, err := newDebugdClient(ctx, in.debugdEndpoint) + client, closeAndWaitFn, err := newDebugdClient(ctx, in.debugdEndpoint, in.log) if err != nil { return fmt.Errorf("creating debugd client: %w", err) } - defer closer.Close() + + defer closeAndWaitFn() if err := setInfo(ctx, in.log, client, in.infos); err != nil { return fmt.Errorf("sending info: %w", err) @@ -181,17 +190,27 @@ func deployOnEndpoint(ctx context.Context, in deployOnEndpointInput) error { return nil } -func newDebugdClient(ctx context.Context, ip string) (pb.DebugdClient, io.Closer, error) { +type closeAndWait func() + +// newDebugdClient creates a new gRPC client for the debugd service and logs the connection state changes. +func newDebugdClient(ctx context.Context, ip string, log *logger.Logger) (pb.DebugdClient, closeAndWait, error) { conn, err := grpc.DialContext( ctx, net.JoinHostPort(ip, strconv.Itoa(constants.DebugdPort)), grpc.WithTransportCredentials(insecure.NewCredentials()), + log.GetClientUnaryInterceptor(), + log.GetClientStreamInterceptor(), ) if err != nil { return nil, nil, fmt.Errorf("connecting to other instance via gRPC: %w", err) } - - return pb.NewDebugdClient(conn), conn, nil + var wg sync.WaitGroup + grpclog.LogStateChangesUntilReady(ctx, conn, log, &wg, func() {}) + closeAndWait := func() { + conn.Close() + wg.Wait() + } + return pb.NewDebugdClient(conn), closeAndWait, nil } func setInfo(ctx context.Context, log *logger.Logger, client pb.DebugdClient, infos map[string]string) error { diff --git a/internal/grpc/grpclog/BUILD.bazel b/internal/grpc/grpclog/BUILD.bazel index 2b42afda6..1c1709925 100644 --- a/internal/grpc/grpclog/BUILD.bazel +++ b/internal/grpc/grpclog/BUILD.bazel @@ -1,9 +1,24 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//bazel/go:go_test.bzl", "go_test") go_library( name = "grpclog", - srcs = ["grplog.go"], + srcs = ["grpclog.go"], importpath = "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog", visibility = ["//:__subpackages__"], - deps = ["@org_golang_google_grpc//peer"], + deps = [ + "@org_golang_google_grpc//connectivity", + "@org_golang_google_grpc//peer", + ], +) + +go_test( + name = "grpclog_test", + srcs = ["grpclog_test.go"], + embed = [":grpclog"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//connectivity", + ], ) diff --git a/internal/grpc/grpclog/grpclog.go b/internal/grpc/grpclog/grpclog.go new file mode 100644 index 000000000..aceb4af46 --- /dev/null +++ b/internal/grpc/grpclog/grpclog.go @@ -0,0 +1,53 @@ +/* +Copyright (c) Edgeless Systems GmbH + +SPDX-License-Identifier: AGPL-3.0-only +*/ + +// grpclog provides a logging utilities for gRPC. +package grpclog + +import ( + "context" + "sync" + + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/peer" +) + +// PeerAddrFromContext returns a peer's address from context, or "unknown" if not found. +func PeerAddrFromContext(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "unknown" + } + return p.Addr.String() +} + +// LogStateChangesUntilReady logs the state changes of a gRPC connection. +func LogStateChangesUntilReady(ctx context.Context, conn getStater, log debugLog, wg *sync.WaitGroup, isReadyCallback func()) { + wg.Add(1) + go func() { + defer wg.Done() + state := conn.GetState() + log.Debugf("Connection state started as %s", state) + for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() { + log.Debugf("Connection state changed to %s", state) + } + if state == connectivity.Ready { + log.Debugf("Connection ready") + isReadyCallback() + } else { + log.Debugf("Connection state ended with %s", state) + } + }() +} + +type getStater interface { + GetState() connectivity.State + WaitForStateChange(context.Context, connectivity.State) bool +} + +type debugLog interface { + Debugf(format string, args ...any) +} diff --git a/internal/grpc/grpclog/grpclog_test.go b/internal/grpc/grpclog/grpclog_test.go new file mode 100644 index 000000000..7460de7d9 --- /dev/null +++ b/internal/grpc/grpclog/grpclog_test.go @@ -0,0 +1,115 @@ +/* +Copyright (c) Edgeless Systems GmbH + +SPDX-License-Identifier: AGPL-3.0-only +*/ + +// grpclog provides a logging utilities for gRPC. +package grpclog + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/connectivity" +) + +func TestLogStateChanges(t *testing.T) { + testCases := map[string]struct { + name string + conn getStater + assert func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) + }{ + "state: connecting, ready": { + conn: &stubConn{ + states: []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + connectivity.Ready, + }, + }, + assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) { + require.Len(t, lg.msgs, 3) + assert.Equal(t, "Connection state started as CONNECTING", lg.msgs[0]) + assert.Equal(t, "Connection state changed to CONNECTING", lg.msgs[1]) + assert.Equal(t, "Connection ready", lg.msgs[2]) + assert.True(t, isReadyCallbackCalled) + }, + }, + "state: ready": { + conn: &stubConn{ + states: []connectivity.State{ + connectivity.Ready, + connectivity.Idle, + }, + stopWaitForChange: false, + }, + assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) { + require.Len(t, lg.msgs, 2) + assert.Equal(t, "Connection state started as READY", lg.msgs[0]) + assert.Equal(t, "Connection ready", lg.msgs[1]) + assert.True(t, isReadyCallbackCalled) + }, + }, + "no WaitForStateChange (e.g. when context is canceled)": { + conn: &stubConn{ + states: []connectivity.State{ + connectivity.Connecting, + connectivity.Idle, + }, + stopWaitForChange: true, + }, + assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) { + require.Len(t, lg.msgs, 2) + assert.Equal(t, "Connection state started as CONNECTING", lg.msgs[0]) + assert.Equal(t, "Connection state ended with CONNECTING", lg.msgs[1]) + assert.False(t, isReadyCallbackCalled) + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + logger := &spyLog{} + + var wg sync.WaitGroup + isReadyCallbackCalled := false + LogStateChangesUntilReady(context.Background(), tc.conn, logger, &wg, func() { isReadyCallbackCalled = true }) + wg.Wait() + tc.assert(t, logger, isReadyCallbackCalled) + }) + } +} + +type spyLog struct { + msgs []string +} + +func (f *spyLog) Debugf(format string, args ...any) { + f.msgs = append(f.msgs, fmt.Sprintf(format, args...)) +} + +type stubConn struct { + states []connectivity.State + idx int + stopWaitForChange bool +} + +func (f *stubConn) GetState() connectivity.State { + if f.idx > len(f.states)-1 { + return f.states[len(f.states)-1] + } + res := f.states[f.idx] + f.idx++ + return res +} + +func (f *stubConn) WaitForStateChange(context.Context, connectivity.State) bool { + if f.stopWaitForChange { + return false + } + return f.idx < len(f.states) +} diff --git a/internal/grpc/grpclog/grplog.go b/internal/grpc/grpclog/grplog.go deleted file mode 100644 index e0df1afab..000000000 --- a/internal/grpc/grpclog/grplog.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright (c) Edgeless Systems GmbH - -SPDX-License-Identifier: AGPL-3.0-only -*/ - -// grpclog provides a logging utilities for gRPC. -package grpclog - -import ( - "context" - - "google.golang.org/grpc/peer" -) - -// PeerAddrFromContext returns a peer's address from context, or "unknown" if not found. -func PeerAddrFromContext(ctx context.Context) string { - p, ok := peer.FromContext(ctx) - if !ok { - return "unknown" - } - return p.Addr.String() -}