mirror of
https://github.com/edgelesssys/constellation.git
synced 2025-05-02 22:34:56 -04:00
constellation-lib: refactor init RPC to be shared (#2665)
* constellation-lib: refactor init RPC Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com> * constellation-lib: pass io.Writer for collecting logs Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com> * constellation-lib: add init test Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com> * constellation-lib: bin dialer to struct Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com> * constellation-lib: set service CIDR on init Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com> --------- Signed-off-by: Moritz Sanft <58110325+msanft@users.noreply.github.com>
This commit is contained in:
parent
db49093da7
commit
17aecaaf5f
12 changed files with 758 additions and 342 deletions
|
@ -8,11 +8,9 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -22,13 +20,10 @@ import (
|
|||
clientcodec "k8s.io/client-go/tools/clientcmd/api/latest"
|
||||
"sigs.k8s.io/yaml"
|
||||
|
||||
"github.com/edgelesssys/constellation/v2/bootstrapper/initproto"
|
||||
"github.com/edgelesssys/constellation/v2/internal/attestation/variant"
|
||||
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
|
||||
"github.com/edgelesssys/constellation/v2/internal/config"
|
||||
"github.com/edgelesssys/constellation/v2/internal/constants"
|
||||
"github.com/edgelesssys/constellation/v2/internal/file"
|
||||
"github.com/edgelesssys/constellation/v2/internal/grpc/grpclog"
|
||||
"github.com/edgelesssys/constellation/v2/internal/helm"
|
||||
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
|
||||
"github.com/edgelesssys/constellation/v2/internal/semver"
|
||||
|
@ -61,142 +56,6 @@ func NewInitCmd() *cobra.Command {
|
|||
return cmd
|
||||
}
|
||||
|
||||
type initDoer struct {
|
||||
dialer grpcDialer
|
||||
endpoint string
|
||||
req *initproto.InitRequest
|
||||
resp *initproto.InitSuccessResponse
|
||||
log debugLog
|
||||
spinner spinnerInterf
|
||||
connectedOnce bool
|
||||
fh file.Handler
|
||||
}
|
||||
|
||||
func (d *initDoer) Do(ctx context.Context) error {
|
||||
// connectedOnce is set in handleGRPCStateChanges when a connection was established in one retry attempt.
|
||||
// This should cancel any other retry attempts when the connection is lost since the bootstrapper likely won't accept any new attempts anymore.
|
||||
if d.connectedOnce {
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"),
|
||||
err: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"),
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := d.dialer.Dial(ctx, d.endpoint)
|
||||
if err != nil {
|
||||
d.log.Debugf("Dialing init server failed: %s. Retrying...", err)
|
||||
return fmt.Errorf("dialing init server: %w", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
grpcStateLogCtx, grpcStateLogCancel := context.WithCancel(ctx)
|
||||
defer grpcStateLogCancel()
|
||||
d.handleGRPCStateChanges(grpcStateLogCtx, &wg, conn)
|
||||
|
||||
protoClient := initproto.NewAPIClient(conn)
|
||||
d.log.Debugf("Created protoClient")
|
||||
resp, err := protoClient.Init(ctx, d.req)
|
||||
if err != nil {
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: errors.New("rpc failed before first response was received - no logs available"),
|
||||
err: fmt.Errorf("init call: %w", err),
|
||||
}
|
||||
}
|
||||
|
||||
res, err := resp.Recv() // get first response, either success or failure
|
||||
if err != nil {
|
||||
if e := d.getLogs(resp); e != nil {
|
||||
d.log.Debugf("Failed to collect logs: %s", e)
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: e,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
return &nonRetriableError{err: err}
|
||||
}
|
||||
|
||||
switch res.Kind.(type) {
|
||||
case *initproto.InitResponse_InitFailure:
|
||||
if e := d.getLogs(resp); e != nil {
|
||||
d.log.Debugf("Failed to get logs from cluster: %s", e)
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: e,
|
||||
err: errors.New(res.GetInitFailure().GetError()),
|
||||
}
|
||||
}
|
||||
return &nonRetriableError{err: errors.New(res.GetInitFailure().GetError())}
|
||||
case *initproto.InitResponse_InitSuccess:
|
||||
d.resp = res.GetInitSuccess()
|
||||
case nil:
|
||||
d.log.Debugf("Cluster returned nil response type")
|
||||
err = errors.New("empty response from cluster")
|
||||
if e := d.getLogs(resp); e != nil {
|
||||
d.log.Debugf("Failed to collect logs: %s", e)
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: e,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
return &nonRetriableError{err: err}
|
||||
default:
|
||||
d.log.Debugf("Cluster returned unknown response type")
|
||||
err = errors.New("unknown response from cluster")
|
||||
if e := d.getLogs(resp); e != nil {
|
||||
d.log.Debugf("Failed to collect logs: %s", e)
|
||||
return &nonRetriableError{
|
||||
logCollectionErr: e,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
return &nonRetriableError{err: err}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *initDoer) getLogs(resp initproto.API_InitClient) error {
|
||||
d.log.Debugf("Attempting to collect cluster logs")
|
||||
for {
|
||||
res, err := resp.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch res.Kind.(type) {
|
||||
case *initproto.InitResponse_InitFailure:
|
||||
return errors.New("trying to collect logs: received init failure response, expected log response")
|
||||
case *initproto.InitResponse_InitSuccess:
|
||||
return errors.New("trying to collect logs: received init success response, expected log response")
|
||||
case nil:
|
||||
return errors.New("trying to collect logs: received nil response, expected log response")
|
||||
}
|
||||
|
||||
log := res.GetLog().GetLog()
|
||||
if log == nil {
|
||||
return errors.New("received empty logs")
|
||||
}
|
||||
|
||||
if err := d.fh.Write(constants.ErrorLog, log, file.OptAppend); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn) {
|
||||
grpclog.LogStateChangesUntilReady(ctx, conn, d.log, wg, func() {
|
||||
d.connectedOnce = true
|
||||
d.spinner.Stop()
|
||||
d.spinner.Start("Initializing cluster ", false)
|
||||
})
|
||||
}
|
||||
|
||||
func writeRow(wr io.Writer, col1 string, col2 string) {
|
||||
fmt.Fprint(wr, col1, "\t", col2, "\n")
|
||||
}
|
||||
|
@ -257,22 +116,6 @@ func (c *kubeconfigMerger) kubeconfigEnvVar() string {
|
|||
type grpcDialer interface {
|
||||
Dial(ctx context.Context, target string) (*grpc.ClientConn, error)
|
||||
}
|
||||
|
||||
type nonRetriableError struct {
|
||||
logCollectionErr error
|
||||
err error
|
||||
}
|
||||
|
||||
// Error returns the error message.
|
||||
func (e *nonRetriableError) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
// Unwrap returns the wrapped error.
|
||||
func (e *nonRetriableError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
type helmApplier interface {
|
||||
PrepareApply(
|
||||
csp cloudprovider.Provider, attestationVariant variant.Variant, k8sVersion versions.ValidK8sVersion, microserviceVersion semver.Semver, stateFile *state.State,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue