cdbg: make endpoint deployment failure more transparent (#1883)

* add retry + timeout + intercept grpc logs

* LogStateChanges inside grplog pkg

* remove retry and tj/assert

* rename nit

* Update debugd/internal/cdbg/cmd/deploy.go

Co-authored-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>

* Update debugd/internal/cdbg/cmd/deploy.go

Co-authored-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>

* paul feedback

* return waitFn instead of WaitGroup

* Revert "return waitFn instead of WaitGroup"

This reverts commit 45700f30e341ce3af509b687febbc0125f7ddb38.

* log routine inside debugd constructor

* test doubles names

* Update debugd/internal/cdbg/cmd/deploy.go

Co-authored-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>

* fix newDebugClient closeFn

---------

Co-authored-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>
This commit is contained in:
Adrian Stobbe 2023-06-12 13:45:34 +02:00 committed by GitHub
parent 167052d443
commit e738f15f0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 221 additions and 53 deletions

View File

@ -66,6 +66,7 @@ go_library(
"//internal/crypto",
"//internal/file",
"//internal/grpc/dialer",
"//internal/grpc/grpclog",
"//internal/grpc/retry",
"//internal/imagefetcher",
"//internal/kms/uri",
@ -89,7 +90,6 @@ go_library(
"@io_k8s_client_go//tools/clientcmd/api/latest",
"@io_k8s_sigs_yaml//:yaml",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//connectivity",
"@org_golang_x_mod//semver",
"@org_uber_go_zap//zapcore",
] + select({

View File

@ -27,7 +27,6 @@ import (
"github.com/spf13/afero"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
clientcodec "k8s.io/client-go/tools/clientcmd/api/latest"
@ -46,6 +45,7 @@ import (
"github.com/edgelesssys/constellation/v2/internal/crypto"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/grpc/dialer"
"github.com/edgelesssys/constellation/v2/internal/grpc/grpclog"
grpcRetry "github.com/edgelesssys/constellation/v2/internal/grpc/retry"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/license"
@ -327,23 +327,11 @@ func (d *initDoer) getLogs(resp initproto.API_InitClient) error {
}
func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn) {
wg.Add(1)
go func() {
defer wg.Done()
state := conn.GetState()
d.log.Debugf("Connection state started as %s", state)
for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() {
d.log.Debugf("Connection state changed to %s", state)
}
if state == connectivity.Ready {
d.log.Debugf("Connection ready")
grpclog.LogStateChangesUntilReady(ctx, conn, d.log, wg, func() {
d.connectedOnce = true
d.spinner.Stop()
d.spinner.Start("Initializing cluster ", false)
} else {
d.log.Debugf("Connection state ended with %s", state)
}
}()
})
}
func (i *initCmd) writeOutput(

View File

@ -18,6 +18,7 @@ go_library(
"//internal/config",
"//internal/constants",
"//internal/file",
"//internal/grpc/grpclog",
"//internal/logger",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",

View File

@ -10,10 +10,11 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd"
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd/logcollector"
@ -24,6 +25,7 @@ import (
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/grpc/grpclog"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/spf13/afero"
"github.com/spf13/cobra"
@ -31,6 +33,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
const deployEndpointTimeout = 20 * time.Minute
func newDeployCmd() *cobra.Command {
deployCmd := &cobra.Command{
Use: "deploy",
@ -77,11 +81,13 @@ func runDeploy(cmd *cobra.Command, _ []string) error {
if err != nil {
return err
}
return deploy(cmd, fileHandler, constellationConfig, transfer, log)
}
func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *config.Config, transfer fileTransferer, log *logger.Logger) error {
func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *config.Config,
transfer fileTransferer,
log *logger.Logger,
) error {
bootstrapperPath, err := cmd.Flags().GetString("bootstrapper")
if err != nil {
return err
@ -145,7 +151,7 @@ func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *c
log: log,
}
if err := deployOnEndpoint(cmd.Context(), input); err != nil {
return err
return fmt.Errorf("deploying endpoint on %q: %w", ip, err)
}
}
@ -162,13 +168,16 @@ type deployOnEndpointInput struct {
// deployOnEndpoint deploys a custom built bootstrapper binary to a debugd endpoint.
func deployOnEndpoint(ctx context.Context, in deployOnEndpointInput) error {
ctx, cancel := context.WithTimeout(ctx, deployEndpointTimeout)
defer cancel()
in.log.Infof("Deploying on %v", in.debugdEndpoint)
client, closer, err := newDebugdClient(ctx, in.debugdEndpoint)
client, closeAndWaitFn, err := newDebugdClient(ctx, in.debugdEndpoint, in.log)
if err != nil {
return fmt.Errorf("creating debugd client: %w", err)
}
defer closer.Close()
defer closeAndWaitFn()
if err := setInfo(ctx, in.log, client, in.infos); err != nil {
return fmt.Errorf("sending info: %w", err)
@ -181,17 +190,27 @@ func deployOnEndpoint(ctx context.Context, in deployOnEndpointInput) error {
return nil
}
func newDebugdClient(ctx context.Context, ip string) (pb.DebugdClient, io.Closer, error) {
type closeAndWait func()
// newDebugdClient creates a new gRPC client for the debugd service and logs the connection state changes.
func newDebugdClient(ctx context.Context, ip string, log *logger.Logger) (pb.DebugdClient, closeAndWait, error) {
conn, err := grpc.DialContext(
ctx,
net.JoinHostPort(ip, strconv.Itoa(constants.DebugdPort)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
log.GetClientUnaryInterceptor(),
log.GetClientStreamInterceptor(),
)
if err != nil {
return nil, nil, fmt.Errorf("connecting to other instance via gRPC: %w", err)
}
return pb.NewDebugdClient(conn), conn, nil
var wg sync.WaitGroup
grpclog.LogStateChangesUntilReady(ctx, conn, log, &wg, func() {})
closeAndWait := func() {
conn.Close()
wg.Wait()
}
return pb.NewDebugdClient(conn), closeAndWait, nil
}
func setInfo(ctx context.Context, log *logger.Logger, client pb.DebugdClient, infos map[string]string) error {

View File

@ -1,9 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//bazel/go:go_test.bzl", "go_test")
go_library(
name = "grpclog",
srcs = ["grplog.go"],
srcs = ["grpclog.go"],
importpath = "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog",
visibility = ["//:__subpackages__"],
deps = ["@org_golang_google_grpc//peer"],
deps = [
"@org_golang_google_grpc//connectivity",
"@org_golang_google_grpc//peer",
],
)
go_test(
name = "grpclog_test",
srcs = ["grpclog_test.go"],
embed = [":grpclog"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//connectivity",
],
)

View File

@ -0,0 +1,53 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// grpclog provides a logging utilities for gRPC.
package grpclog
import (
"context"
"sync"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/peer"
)
// PeerAddrFromContext returns a peer's address from context, or "unknown" if not found.
func PeerAddrFromContext(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return "unknown"
}
return p.Addr.String()
}
// LogStateChangesUntilReady logs the state changes of a gRPC connection.
func LogStateChangesUntilReady(ctx context.Context, conn getStater, log debugLog, wg *sync.WaitGroup, isReadyCallback func()) {
wg.Add(1)
go func() {
defer wg.Done()
state := conn.GetState()
log.Debugf("Connection state started as %s", state)
for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() {
log.Debugf("Connection state changed to %s", state)
}
if state == connectivity.Ready {
log.Debugf("Connection ready")
isReadyCallback()
} else {
log.Debugf("Connection state ended with %s", state)
}
}()
}
type getStater interface {
GetState() connectivity.State
WaitForStateChange(context.Context, connectivity.State) bool
}
type debugLog interface {
Debugf(format string, args ...any)
}

View File

@ -0,0 +1,115 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// grpclog provides a logging utilities for gRPC.
package grpclog
import (
"context"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"
)
func TestLogStateChanges(t *testing.T) {
testCases := map[string]struct {
name string
conn getStater
assert func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool)
}{
"state: connecting, ready": {
conn: &stubConn{
states: []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Ready,
},
},
assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) {
require.Len(t, lg.msgs, 3)
assert.Equal(t, "Connection state started as CONNECTING", lg.msgs[0])
assert.Equal(t, "Connection state changed to CONNECTING", lg.msgs[1])
assert.Equal(t, "Connection ready", lg.msgs[2])
assert.True(t, isReadyCallbackCalled)
},
},
"state: ready": {
conn: &stubConn{
states: []connectivity.State{
connectivity.Ready,
connectivity.Idle,
},
stopWaitForChange: false,
},
assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) {
require.Len(t, lg.msgs, 2)
assert.Equal(t, "Connection state started as READY", lg.msgs[0])
assert.Equal(t, "Connection ready", lg.msgs[1])
assert.True(t, isReadyCallbackCalled)
},
},
"no WaitForStateChange (e.g. when context is canceled)": {
conn: &stubConn{
states: []connectivity.State{
connectivity.Connecting,
connectivity.Idle,
},
stopWaitForChange: true,
},
assert: func(t *testing.T, lg *spyLog, isReadyCallbackCalled bool) {
require.Len(t, lg.msgs, 2)
assert.Equal(t, "Connection state started as CONNECTING", lg.msgs[0])
assert.Equal(t, "Connection state ended with CONNECTING", lg.msgs[1])
assert.False(t, isReadyCallbackCalled)
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
logger := &spyLog{}
var wg sync.WaitGroup
isReadyCallbackCalled := false
LogStateChangesUntilReady(context.Background(), tc.conn, logger, &wg, func() { isReadyCallbackCalled = true })
wg.Wait()
tc.assert(t, logger, isReadyCallbackCalled)
})
}
}
type spyLog struct {
msgs []string
}
func (f *spyLog) Debugf(format string, args ...any) {
f.msgs = append(f.msgs, fmt.Sprintf(format, args...))
}
type stubConn struct {
states []connectivity.State
idx int
stopWaitForChange bool
}
func (f *stubConn) GetState() connectivity.State {
if f.idx > len(f.states)-1 {
return f.states[len(f.states)-1]
}
res := f.states[f.idx]
f.idx++
return res
}
func (f *stubConn) WaitForStateChange(context.Context, connectivity.State) bool {
if f.stopWaitForChange {
return false
}
return f.idx < len(f.states)
}

View File

@ -1,23 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// grpclog provides a logging utilities for gRPC.
package grpclog
import (
"context"
"google.golang.org/grpc/peer"
)
// PeerAddrFromContext returns a peer's address from context, or "unknown" if not found.
func PeerAddrFromContext(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return "unknown"
}
return p.Addr.String()
}