constellation-lib: move kubecmd package usage (#2673)

* Reduce external dependencies of kubecmd package
* Add kubecmd wrapper to constellation-lib
* Update CLI code to use constellation-lib
* Move kubecmd package to subpackage of constellation-lib
* Initialise helm and kubecmd clients when kubeConfig is set

---------

Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
Daniel Weiße 2023-12-05 16:23:31 +01:00 committed by GitHub
parent c07c333d3d
commit 3691defce7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 751 additions and 714 deletions

View file

@ -22,6 +22,7 @@ import (
"github.com/edgelesssys/constellation/v2/bootstrapper/initproto"
"github.com/edgelesssys/constellation/v2/cli/internal/cloudcmd"
"github.com/edgelesssys/constellation/v2/internal/api/attestationconfigapi"
"github.com/edgelesssys/constellation/v2/internal/api/versionsapi"
"github.com/edgelesssys/constellation/v2/internal/atls"
"github.com/edgelesssys/constellation/v2/internal/attestation/variant"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
@ -29,16 +30,19 @@ import (
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/constellation"
"github.com/edgelesssys/constellation/v2/internal/constellation/kubecmd"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/grpc/dialer"
"github.com/edgelesssys/constellation/v2/internal/helm"
"github.com/edgelesssys/constellation/v2/internal/imagefetcher"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/kubecmd"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/state"
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)
@ -225,13 +229,7 @@ func runApply(cmd *cobra.Command, _ []string) error {
newDialer := func(validator atls.Validator) *dialer.Dialer {
return dialer.New(nil, validator, &net.Dialer{})
}
newKubeUpgrader := func(w io.Writer, kubeConfigPath string, log debugLog) (kubernetesUpgrader, error) {
kubeConfig, err := fileHandler.Read(kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("reading kubeconfig: %w", err)
}
return kubecmd.New(w, kubeConfig, fileHandler, log)
}
newHelmClient := func(kubeConfigPath string, log debugLog) (helmApplier, error) {
kubeConfig, err := fileHandler.Read(kubeConfigPath)
if err != nil {
@ -264,9 +262,8 @@ func runApply(cmd *cobra.Command, _ []string) error {
spinner: spinner,
merger: &kubeconfigMerger{log: log},
newHelmClient: newHelmClient,
newDialer: newDialer,
newKubeUpgrader: newKubeUpgrader,
newInfraApplier: newInfraApplier,
imageFetcher: imagefetcher.New(),
applier: applier,
}
@ -287,15 +284,15 @@ type applyCmd struct {
merger configMerger
applier applier
imageFetcher imageFetcher
applier applier
newHelmClient func(kubeConfigPath string, log debugLog) (helmApplier, error)
newDialer func(validator atls.Validator) *dialer.Dialer
newKubeUpgrader func(out io.Writer, kubeConfigPath string, log debugLog) (kubernetesUpgrader, error)
newInfraApplier func(context.Context) (cloudApplier, func(), error)
}
type applier interface {
SetKubeConfig(kubeConfig []byte) error
CheckLicense(ctx context.Context, csp cloudprovider.Provider, licenseID string) (int, error)
GenerateMasterSecret() (uri.MasterSecret, error)
GenerateMeasurementSalt() ([]byte, error)
@ -309,6 +306,13 @@ type applier interface {
*initproto.InitSuccessResponse,
error,
)
ExtendClusterConfigCertSANs(ctx context.Context, clusterEndpoint, customEndpoint string, additionalAPIServerCertSANs []string) error
GetClusterAttestationConfig(ctx context.Context, variant variant.Variant) (config.AttestationCfg, error)
ApplyJoinConfig(ctx context.Context, newAttestConfig config.AttestationCfg, measurementSalt []byte) error
UpgradeNodeImage(ctx context.Context, imageVersion semver.Semver, imageReference string, force bool) error
UpgradeKubernetesVersion(ctx context.Context, kubernetesVersion versions.ValidK8sVersion, force bool) error
BackupCRDs(ctx context.Context, fileHandler file.Handler, upgradeDir string) ([]apiextensionsv1.CustomResourceDefinition, error)
BackupCRs(ctx context.Context, fileHandler file.Handler, crds []apiextensionsv1.CustomResourceDefinition, upgradeDir string) error
}
type warnLog interface {
@ -415,42 +419,57 @@ func (a *applyCmd) apply(
}
}
if a.flags.skipPhases.contains(skipAttestationConfigPhase, skipCertSANsPhase, skipHelmPhase, skipK8sPhase, skipImagePhase) {
cmd.Print(bufferedOutput.String())
return nil
}
// From now on we can assume a valid Kubernetes admin config file exists
var kubeUpgrader kubernetesUpgrader
if !a.flags.skipPhases.contains(skipAttestationConfigPhase, skipCertSANsPhase, skipHelmPhase, skipK8sPhase, skipImagePhase) {
a.log.Debugf("Creating Kubernetes client using %s", a.flags.pathPrefixer.PrefixPrintablePath(constants.AdminConfFilename))
kubeUpgrader, err = a.newKubeUpgrader(cmd.OutOrStdout(), constants.AdminConfFilename, a.log)
if err != nil {
return err
}
kubeConfig, err := a.fileHandler.Read(constants.AdminConfFilename)
if err != nil {
return fmt.Errorf("reading kubeconfig: %w", err)
}
if err := a.applier.SetKubeConfig(kubeConfig); err != nil {
return err
}
// Apply Attestation Config
if !a.flags.skipPhases.contains(skipAttestationConfigPhase) {
a.log.Debugf("Applying new attestation config to cluster")
if err := a.applyJoinConfig(cmd, kubeUpgrader, conf.GetAttestationConfig(), stateFile.ClusterValues.MeasurementSalt); err != nil {
if err := a.applyJoinConfig(cmd, conf.GetAttestationConfig(), stateFile.ClusterValues.MeasurementSalt); err != nil {
return fmt.Errorf("applying attestation config: %w", err)
}
}
// Extend API Server Cert SANs
if !a.flags.skipPhases.contains(skipCertSANsPhase) {
sans := append([]string{stateFile.Infrastructure.ClusterEndpoint, conf.CustomEndpoint}, stateFile.Infrastructure.APIServerCertSANs...)
if err := kubeUpgrader.ExtendClusterConfigCertSANs(cmd.Context(), sans); err != nil {
if err := a.applier.ExtendClusterConfigCertSANs(
cmd.Context(),
stateFile.Infrastructure.ClusterEndpoint,
conf.CustomEndpoint,
stateFile.Infrastructure.APIServerCertSANs,
); err != nil {
return fmt.Errorf("extending cert SANs: %w", err)
}
}
// Apply Helm Charts
if !a.flags.skipPhases.contains(skipHelmPhase) {
if err := a.runHelmApply(cmd, conf, stateFile, kubeUpgrader, upgradeDir); err != nil {
if err := a.runHelmApply(cmd, conf, stateFile, upgradeDir); err != nil {
return err
}
}
// Upgrade NodeVersion object
if !(a.flags.skipPhases.contains(skipK8sPhase, skipImagePhase)) {
if err := a.runK8sUpgrade(cmd, conf, kubeUpgrader); err != nil {
// Upgrade node image
if !a.flags.skipPhases.contains(skipImagePhase) {
if err := a.runNodeImageUpgrade(cmd, conf); err != nil {
return err
}
}
// Upgrade Kubernetes version
if !a.flags.skipPhases.contains(skipK8sPhase) {
if err := a.runK8sVersionUpgrade(cmd, conf); err != nil {
return err
}
}
@ -598,15 +617,14 @@ func (a *applyCmd) validateInputs(cmd *cobra.Command, configFetcher attestationc
// applyJoinConfig creates or updates the cluster's join config.
// If the config already exists, and is different from the new config, the user is asked to confirm the upgrade.
func (a *applyCmd) applyJoinConfig(
cmd *cobra.Command, kubeUpgrader kubernetesUpgrader, newConfig config.AttestationCfg, measurementSalt []byte,
func (a *applyCmd) applyJoinConfig(cmd *cobra.Command, newConfig config.AttestationCfg, measurementSalt []byte,
) error {
clusterAttestationConfig, err := kubeUpgrader.GetClusterAttestationConfig(cmd.Context(), newConfig.GetVariant())
clusterAttestationConfig, err := a.applier.GetClusterAttestationConfig(cmd.Context(), newConfig.GetVariant())
if err != nil {
a.log.Debugf("Getting cluster attestation config failed: %s", err)
if k8serrors.IsNotFound(err) {
a.log.Debugf("Creating new join config")
return kubeUpgrader.ApplyJoinConfig(cmd.Context(), newConfig, measurementSalt)
return a.applier.ApplyJoinConfig(cmd.Context(), newConfig, measurementSalt)
}
return fmt.Errorf("getting cluster attestation config: %w", err)
}
@ -638,7 +656,7 @@ func (a *applyCmd) applyJoinConfig(
}
}
if err := kubeUpgrader.ApplyJoinConfig(cmd.Context(), newConfig, measurementSalt); err != nil {
if err := a.applier.ApplyJoinConfig(cmd.Context(), newConfig, measurementSalt); err != nil {
return fmt.Errorf("updating attestation config: %w", err)
}
cmd.Println("Successfully updated the cluster's attestation config")
@ -646,19 +664,29 @@ func (a *applyCmd) applyJoinConfig(
return nil
}
// runK8sUpgrade upgrades image and Kubernetes version of the Constellation cluster.
func (a *applyCmd) runK8sUpgrade(cmd *cobra.Command, conf *config.Config, kubeUpgrader kubernetesUpgrader,
) error {
err := kubeUpgrader.UpgradeNodeVersion(
cmd.Context(), conf, a.flags.force,
a.flags.skipPhases.contains(skipImagePhase),
a.flags.skipPhases.contains(skipK8sPhase),
)
func (a *applyCmd) runNodeImageUpgrade(cmd *cobra.Command, conf *config.Config) error {
provider := conf.GetProvider()
attestationVariant := conf.GetAttestationConfig().GetVariant()
region := conf.GetRegion()
imageReference, err := a.imageFetcher.FetchReference(cmd.Context(), provider, attestationVariant, conf.Image, region)
if err != nil {
return fmt.Errorf("fetching image reference: %w", err)
}
imageVersionInfo, err := versionsapi.NewVersionFromShortPath(conf.Image, versionsapi.VersionKindImage)
if err != nil {
return fmt.Errorf("parsing version from image short path: %w", err)
}
imageVersion, err := semver.New(imageVersionInfo.Version())
if err != nil {
return fmt.Errorf("parsing image version: %w", err)
}
err = a.applier.UpgradeNodeImage(cmd.Context(), imageVersion, imageReference, a.flags.force)
var upgradeErr *compatibility.InvalidUpgradeError
switch {
case errors.Is(err, kubecmd.ErrInProgress):
cmd.PrintErrln("Skipping image and Kubernetes upgrades. Another upgrade is in progress.")
cmd.PrintErrln("Skipping image upgrade: Another upgrade is already in progress.")
case errors.As(err, &upgradeErr):
cmd.PrintErrln(err)
case err != nil:
@ -668,6 +696,19 @@ func (a *applyCmd) runK8sUpgrade(cmd *cobra.Command, conf *config.Config, kubeUp
return nil
}
func (a *applyCmd) runK8sVersionUpgrade(cmd *cobra.Command, conf *config.Config) error {
err := a.applier.UpgradeKubernetesVersion(cmd.Context(), conf.KubernetesVersion, a.flags.force)
var upgradeErr *compatibility.InvalidUpgradeError
switch {
case errors.As(err, &upgradeErr):
cmd.PrintErrln(err)
case err != nil:
return fmt.Errorf("upgrading Kubernetes version: %w", err)
}
return nil
}
// checkCreateFilesClean ensures that the workspace is clean before creating a new cluster.
func (a *applyCmd) checkCreateFilesClean() error {
if err := a.checkInitFilesClean(); err != nil {
@ -803,3 +844,11 @@ func (wl warnLogger) Infof(fmtStr string, args ...any) {
func (wl warnLogger) Warnf(fmtStr string, args ...any) {
wl.cmd.PrintErrf("Warning: %s\n", fmt.Sprintf(fmtStr, args...))
}
// imageFetcher gets an image reference from the versionsapi.
type imageFetcher interface {
FetchReference(ctx context.Context,
provider cloudprovider.Provider, attestationVariant variant.Variant,
image, region string,
) (string, error)
}