cli: helm install and upgrade unification (#2244)

This commit is contained in:
Adrian Stobbe 2023-08-24 16:40:47 +02:00 committed by GitHub
parent 9e79e2e0a1
commit a03325466c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1140 additions and 1054 deletions

View File

@ -68,7 +68,6 @@ go_library(
"//internal/grpc/grpclog",
"//internal/grpc/retry",
"//internal/kms/uri",
"//internal/kubernetes/kubectl",
"//internal/license",
"//internal/logger",
"//internal/retry",
@ -88,6 +87,7 @@ go_library(
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
"@in_gopkg_yaml_v3//:yaml_v3",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/runtime",
"@io_k8s_client_go//tools/clientcmd",
"@io_k8s_client_go//tools/clientcmd/api/latest",
@ -165,6 +165,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_k8s_api//core/v1:core",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",

View File

@ -76,17 +76,12 @@ type initCmd struct {
merger configMerger
spinner spinnerInterf
fileHandler file.Handler
helmInstaller initializer
clusterShower clusterShower
pf pathprefix.PathPrefixer
}
type clusterShower interface {
ShowCluster(ctx context.Context, provider cloudprovider.Provider) (terraform.ApplyOutput, error)
}
func newInitCmd(
clusterShower clusterShower, helmInstaller initializer, fileHandler file.Handler,
clusterShower clusterShower, fileHandler file.Handler,
spinner spinnerInterf, merger configMerger, log debugLog,
) *initCmd {
return &initCmd{
@ -94,7 +89,6 @@ func newInitCmd(
merger: merger,
spinner: spinner,
fileHandler: fileHandler,
helmInstaller: helmInstaller,
clusterShower: clusterShower,
}
}
@ -125,18 +119,17 @@ func runInitialize(cmd *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("creating Terraform client: %w", err)
}
helmInstaller, err := helm.NewInitializer(log, constants.AdminConfFilename)
if err != nil {
return fmt.Errorf("creating Helm installer: %w", err)
}
i := newInitCmd(tfClient, helmInstaller, fileHandler, spinner, &kubeconfigMerger{log: log}, log)
i := newInitCmd(tfClient, fileHandler, spinner, &kubeconfigMerger{log: log}, log)
fetcher := attestationconfigapi.NewFetcher()
newAttestationApplier := func(w io.Writer, kubeConfig string, log debugLog) (attestationConfigApplier, error) {
return kubecmd.New(w, kubeConfig, log)
return kubecmd.New(w, kubeConfig, fileHandler, log)
}
newHelmClient := func(kubeConfigPath string, log debugLog) (helmApplier, error) {
return helm.NewClient(kubeConfigPath, log)
} // need to defer helm client instantiation until kubeconfig is available
return i.initialize(cmd, newDialer, license.NewClient(), fetcher, newAttestationApplier)
return i.initialize(cmd, newDialer, license.NewClient(), fetcher, newAttestationApplier, newHelmClient)
}
// initialize initializes a Constellation.
@ -144,6 +137,7 @@ func (i *initCmd) initialize(
cmd *cobra.Command, newDialer func(validator atls.Validator) *dialer.Dialer,
quotaChecker license.QuotaChecker, configFetcher attestationconfigapi.Fetcher,
newAttestationApplier func(io.Writer, string, debugLog) (attestationConfigApplier, error),
newHelmClient func(kubeConfigPath string, log debugLog) (helmApplier, error),
) error {
flags, err := i.evalFlagArgs(cmd)
if err != nil {
@ -265,27 +259,35 @@ func (i *initCmd) initialize(
return fmt.Errorf("applying attestation config: %w", err)
}
helmLoader := helm.NewLoader(provider, k8sVersion, clusterName)
i.log.Debugf("Created new Helm loader")
output, err := i.clusterShower.ShowCluster(cmd.Context(), conf.GetProvider())
if err != nil {
return fmt.Errorf("getting Terraform output: %w", err)
}
i.log.Debugf("Loading Helm deployments")
i.spinner.Start("Installing Kubernetes components ", false)
releases, err := helmLoader.LoadReleases(conf, flags.conformance, flags.helmWaitMode, masterSecret, serviceAccURI, idFile, output)
if err != nil {
return fmt.Errorf("loading Helm charts: %w", err)
options := helm.Options{
Force: flags.force,
Conformance: flags.conformance,
HelmWaitMode: flags.helmWaitMode,
AllowDestructive: helm.DenyDestructive,
}
i.log.Debugf("Installing Helm deployments")
if err := i.helmInstaller.Install(cmd.Context(), releases); err != nil {
return fmt.Errorf("installing Helm charts: %w", err)
helmApplier, err := newHelmClient(constants.AdminConfFilename, i.log)
if err != nil {
return fmt.Errorf("creating Helm client: %w", err)
}
executor, includesUpgrades, err := helmApplier.PrepareApply(conf, k8sVersion, idFile, options, output,
serviceAccURI, masterSecret)
if err != nil {
return fmt.Errorf("getting Helm chart executor: %w", err)
}
if includesUpgrades {
return errors.New("init: helm tried to upgrade charts instead of installing them")
}
if err := executor.Apply(cmd.Context()); err != nil {
return fmt.Errorf("applying Helm charts: %w", err)
}
i.spinner.Stop()
i.log.Debugf("Helm deployment installation succeeded")
cmd.Println(bufferedOutput.String())
return nil
}
@ -622,10 +624,14 @@ func (e *nonRetriableError) Unwrap() error {
return e.err
}
type initializer interface {
Install(ctx context.Context, releases *helm.Releases) error
}
type attestationConfigApplier interface {
ApplyJoinConfig(ctx context.Context, newAttestConfig config.AttestationCfg, measurementSalt []byte) error
}
type helmApplier interface {
PrepareApply(conf *config.Config, validK8sversion versions.ValidK8sVersion, idFile clusterid.File, flags helm.Options, tfOutput terraform.ApplyOutput, serviceAccURI string, masterSecret uri.MasterSecret) (helm.Applier, bool, error)
}
type clusterShower interface {
ShowCluster(ctx context.Context, provider cloudprovider.Provider) (terraform.ApplyOutput, error)
}

View File

@ -186,16 +186,13 @@ func TestInitialize(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
cmd.SetContext(ctx)
i := newInitCmd(&stubShowCluster{}, &stubHelmInstaller{}, fileHandler, &nopSpinner{}, nil, logger.NewTest(t))
err := i.initialize(
cmd,
newDialer,
&stubLicenseClient{},
stubAttestationFetcher{},
i := newInitCmd(&stubShowCluster{}, fileHandler, &nopSpinner{}, nil, logger.NewTest(t))
err := i.initialize(cmd, newDialer, &stubLicenseClient{}, stubAttestationFetcher{},
func(io.Writer, string, debugLog) (attestationConfigApplier, error) {
return &stubAttestationApplier{}, nil
},
)
}, func(_ string, _ debugLog) (helmApplier, error) {
return &stubApplier{}, nil
})
if tc.wantErr {
assert.Error(err)
@ -221,6 +218,20 @@ func TestInitialize(t *testing.T) {
}
}
type stubApplier struct {
err error
}
func (s stubApplier) PrepareApply(_ *config.Config, _ versions.ValidK8sVersion, _ clusterid.File, _ helm.Options, _ terraform.ApplyOutput, _ string, _ uri.MasterSecret) (helm.Applier, bool, error) {
return stubRunner{}, false, s.err
}
type stubRunner struct{}
func (s stubRunner) Apply(_ context.Context) error {
return nil
}
func TestGetLogs(t *testing.T) {
someErr := errors.New("failed")
@ -310,7 +321,7 @@ func TestWriteOutput(t *testing.T) {
UID: "test-uid",
IP: "cluster-ip",
}
i := newInitCmd(nil, nil, fileHandler, nil, &stubMerger{}, logger.NewTest(t))
i := newInitCmd(nil, fileHandler, &nopSpinner{}, &stubMerger{}, logger.NewTest(t))
err := i.writeOutput(idFile, resp.GetInitSuccess(), false, &out)
require.NoError(err)
// assert.Contains(out.String(), ownerID)
@ -403,7 +414,7 @@ func TestGenerateMasterSecret(t *testing.T) {
require.NoError(tc.createFileFunc(fileHandler))
var out bytes.Buffer
i := newInitCmd(nil, nil, fileHandler, nil, nil, logger.NewTest(t))
i := newInitCmd(nil, fileHandler, nil, nil, logger.NewTest(t))
secret, err := i.generateMasterSecret(&out)
if tc.wantErr {
@ -493,16 +504,13 @@ func TestAttestation(t *testing.T) {
defer cancel()
cmd.SetContext(ctx)
i := newInitCmd(nil, nil, fileHandler, &nopSpinner{}, nil, logger.NewTest(t))
err := i.initialize(
cmd,
newDialer,
&stubLicenseClient{},
stubAttestationFetcher{},
i := newInitCmd(nil, fileHandler, &nopSpinner{}, nil, logger.NewTest(t))
err := i.initialize(cmd, newDialer, &stubLicenseClient{}, stubAttestationFetcher{},
func(io.Writer, string, debugLog) (attestationConfigApplier, error) {
return &stubAttestationApplier{}, nil
},
)
}, func(_ string, _ debugLog) (helmApplier, error) {
return &stubApplier{}, nil
})
assert.Error(err)
// make sure the error is actually a TLS handshake error
assert.Contains(err.Error(), "transport: authentication handshake failed")
@ -664,12 +672,6 @@ func (c stubInitClient) Recv() (*initproto.InitResponse, error) {
return res, err
}
type stubHelmInstaller struct{}
func (i *stubHelmInstaller) Install(_ context.Context, _ *helm.Releases) error {
return nil
}
type stubShowCluster struct{}
func (s *stubShowCluster) ShowCluster(_ context.Context, csp cloudprovider.Provider) (terraform.ApplyOutput, error) {

View File

@ -208,21 +208,20 @@ func (m *miniUpCmd) initializeMiniCluster(cmd *cobra.Command, fileHandler file.H
m.log.Debugf("Created new logger")
defer log.Sync()
helmInstaller, err := helm.NewInitializer(log, constants.AdminConfFilename)
if err != nil {
return fmt.Errorf("creating Helm installer: %w", err)
}
tfClient, err := terraform.New(cmd.Context(), constants.TerraformWorkingDir)
if err != nil {
return fmt.Errorf("creating Terraform client: %w", err)
}
newAttestationApplier := func(w io.Writer, kubeConfig string, log debugLog) (attestationConfigApplier, error) {
return kubecmd.New(w, kubeConfig, log)
return kubecmd.New(w, kubeConfig, fileHandler, log)
}
i := newInitCmd(tfClient, helmInstaller, fileHandler, spinner, &kubeconfigMerger{log: log}, log)
if err := i.initialize(cmd, newDialer, license.NewClient(), m.configFetcher, newAttestationApplier); err != nil {
newHelmClient := func(kubeConfigPath string, log debugLog) (helmApplier, error) {
return helm.NewClient(kubeConfigPath, log)
} // need to defer helm client instantiation until kubeconfig is available
i := newInitCmd(tfClient, fileHandler, spinner, &kubeconfigMerger{log: log}, log)
if err := i.initialize(cmd, newDialer, license.NewClient(), m.configFetcher,
newAttestationApplier, newHelmClient); err != nil {
return err
}
m.log.Debugf("Initialized mini cluster")

View File

@ -19,7 +19,6 @@ import (
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
@ -53,8 +52,7 @@ func runStatus(cmd *cobra.Command, _ []string) error {
fileHandler := file.NewHandler(afero.NewOsFs())
// set up helm client to fetch service versions
helmClient, err := helm.NewUpgradeClient(kubectl.NewUninitialized(), constants.AdminConfFilename, constants.HelmNamespace, log)
helmClient, err := helm.NewReleaseVersionClient(constants.AdminConfFilename, log)
if err != nil {
return fmt.Errorf("setting up helm client: %w", err)
}
@ -70,7 +68,7 @@ func runStatus(cmd *cobra.Command, _ []string) error {
}
variant := conf.GetAttestationConfig().GetVariant()
kubeClient, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, log)
kubeClient, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, fileHandler, log)
if err != nil {
return fmt.Errorf("setting up kubernetes client: %w", err)
}

View File

@ -28,13 +28,13 @@ import (
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/rogpeppe/go-internal/diff"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
)
func newUpgradeApplyCmd() *cobra.Command {
@ -75,16 +75,11 @@ func runUpgradeApply(cmd *cobra.Command, _ []string) error {
fileHandler := file.NewHandler(afero.NewOsFs())
upgradeID := generateUpgradeID(upgradeCmdKindApply)
kubeUpgrader, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, log)
kubeUpgrader, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, fileHandler, log)
if err != nil {
return err
}
helmUpgrader, err := helm.NewUpgradeClient(kubectl.NewUninitialized(), constants.AdminConfFilename, constants.HelmNamespace, log)
if err != nil {
return fmt.Errorf("setting up helm client: %w", err)
}
configFetcher := attestationconfigapi.NewFetcher()
// Set up terraform upgrader
@ -105,10 +100,14 @@ func runUpgradeApply(cmd *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("setting up terraform client: %w", err)
}
helmClient, err := helm.NewClient(constants.AdminConfFilename, log)
if err != nil {
return fmt.Errorf("creating Helm client: %w", err)
}
applyCmd := upgradeApplyCmd{
helmUpgrader: helmUpgrader,
kubeUpgrader: kubeUpgrader,
helmApplier: helmClient,
clusterUpgrader: clusterUpgrader,
configFetcher: configFetcher,
clusterShower: tfShower,
@ -119,7 +118,7 @@ func runUpgradeApply(cmd *cobra.Command, _ []string) error {
}
type upgradeApplyCmd struct {
helmUpgrader helmUpgrader
helmApplier helmApplier
kubeUpgrader kubernetesUpgrader
clusterUpgrader clusterUpgrader
configFetcher attestationconfigapi.Fetcher
@ -381,12 +380,31 @@ func (u *upgradeApplyCmd) handleServiceUpgrade(
if err != nil {
return fmt.Errorf("getting service account URI: %w", err)
}
err = u.helmUpgrader.Upgrade(
cmd.Context(), conf, idFile,
flags.upgradeTimeout, helm.DenyDestructive, flags.force, upgradeDir,
flags.conformance, flags.helmWaitMode, secret, serviceAccURI, validK8sVersion, tfOutput,
)
if errors.Is(err, helm.ErrConfirmationMissing) {
options := helm.Options{
Force: flags.force,
Conformance: flags.conformance,
HelmWaitMode: flags.helmWaitMode,
}
prepareApply := func(allowDestructive bool) (helm.Applier, bool, error) {
options.AllowDestructive = allowDestructive
executor, includesUpgrades, err := u.helmApplier.PrepareApply(conf, validK8sVersion, idFile, options,
tfOutput, serviceAccURI, secret)
var upgradeErr *compatibility.InvalidUpgradeError
switch {
case errors.As(err, &upgradeErr):
cmd.PrintErrln(err)
case err != nil:
return nil, false, fmt.Errorf("getting chart executor: %w", err)
}
return executor, includesUpgrades, nil
}
executor, includesUpgrades, err := prepareApply(helm.DenyDestructive)
if err != nil {
if !errors.Is(err, helm.ErrConfirmationMissing) {
return fmt.Errorf("upgrading charts with deny destructive mode: %w", err)
}
if !flags.yes {
cmd.PrintErrln("WARNING: Upgrading cert-manager will destroy all custom resources you have manually created that are based on the current version of cert-manager.")
ok, askErr := askToConfirm(cmd, "Do you want to upgrade cert-manager anyway?")
@ -398,14 +416,27 @@ func (u *upgradeApplyCmd) handleServiceUpgrade(
return nil
}
}
err = u.helmUpgrader.Upgrade(
cmd.Context(), conf, idFile,
flags.upgradeTimeout, helm.AllowDestructive, flags.force, upgradeDir,
flags.conformance, flags.helmWaitMode, secret, serviceAccURI, validK8sVersion, tfOutput,
)
executor, includesUpgrades, err = prepareApply(helm.AllowDestructive)
if err != nil {
return fmt.Errorf("upgrading charts with allow destructive mode: %w", err)
}
}
return err
if includesUpgrades {
u.log.Debugf("Creating backup of CRDs and CRs")
crds, err := u.kubeUpgrader.BackupCRDs(cmd.Context(), upgradeDir)
if err != nil {
return fmt.Errorf("creating CRD backup: %w", err)
}
if err := u.kubeUpgrader.BackupCRs(cmd.Context(), crds, upgradeDir); err != nil {
return fmt.Errorf("creating CR backup: %w", err)
}
}
if err := executor.Apply(cmd.Context()); err != nil {
return fmt.Errorf("applying Helm charts: %w", err)
}
return nil
}
// migrateFrom2_10 applies migrations necessary for upgrading from v2.10 to v2.11
@ -527,20 +558,14 @@ type kubernetesUpgrader interface {
ExtendClusterConfigCertSANs(ctx context.Context, alternativeNames []string) error
GetClusterAttestationConfig(ctx context.Context, variant variant.Variant) (config.AttestationCfg, error)
ApplyJoinConfig(ctx context.Context, newAttestConfig config.AttestationCfg, measurementSalt []byte) error
BackupCRs(ctx context.Context, crds []apiextensionsv1.CustomResourceDefinition, upgradeDir string) error
BackupCRDs(ctx context.Context, upgradeDir string) ([]apiextensionsv1.CustomResourceDefinition, error)
// TODO(v2.11): Remove this function after v2.11 is released.
RemoveAttestationConfigHelmManagement(ctx context.Context) error
// TODO(v2.12): Remove this function after v2.12 is released.
RemoveHelmKeepAnnotation(ctx context.Context) error
}
type helmUpgrader interface {
Upgrade(
ctx context.Context, config *config.Config, idFile clusterid.File, timeout time.Duration,
allowDestructive, force bool, upgradeDir string, conformance bool, helmWaitMode helm.WaitMode,
masterSecret uri.MasterSecret, serviceAccURI string, validK8sVersion versions.ValidK8sVersion, tfOutput terraform.ApplyOutput,
) error
}
type clusterUpgrader interface {
PlanClusterUpgrade(ctx context.Context, outWriter io.Writer, vars terraform.Variables, csp cloudprovider.Provider) (bool, error)
ApplyClusterUpgrade(ctx context.Context, csp cloudprovider.Provider) (terraform.ApplyOutput, error)

View File

@ -11,10 +11,8 @@ import (
"context"
"io"
"testing"
"time"
"github.com/edgelesssys/constellation/v2/cli/internal/clusterid"
"github.com/edgelesssys/constellation/v2/cli/internal/helm"
"github.com/edgelesssys/constellation/v2/cli/internal/kubecmd"
"github.com/edgelesssys/constellation/v2/cli/internal/terraform"
"github.com/edgelesssys/constellation/v2/internal/attestation/variant"
@ -24,24 +22,24 @@ import (
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
)
func TestUpgradeApply(t *testing.T) {
testCases := map[string]struct {
helmUpgrader *stubHelmUpgrader
helmUpgrader stubApplier
kubeUpgrader *stubKubernetesUpgrader
terraformUpgrader *stubTerraformUpgrader
flags upgradeApplyFlags
wantErr bool
flags upgradeApplyFlags
stdin string
}{
"success": {
kubeUpgrader: &stubKubernetesUpgrader{currentConfig: config.DefaultForAzureSEVSNP()},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{},
flags: upgradeApplyFlags{yes: true},
},
@ -50,7 +48,7 @@ func TestUpgradeApply(t *testing.T) {
currentConfig: config.DefaultForAzureSEVSNP(),
nodeVersionErr: assert.AnError,
},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{},
wantErr: true,
flags: upgradeApplyFlags{yes: true},
@ -60,7 +58,7 @@ func TestUpgradeApply(t *testing.T) {
currentConfig: config.DefaultForAzureSEVSNP(),
nodeVersionErr: kubecmd.ErrInProgress,
},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{},
flags: upgradeApplyFlags{yes: true},
},
@ -68,7 +66,7 @@ func TestUpgradeApply(t *testing.T) {
kubeUpgrader: &stubKubernetesUpgrader{
currentConfig: config.DefaultForAzureSEVSNP(),
},
helmUpgrader: &stubHelmUpgrader{err: assert.AnError},
helmUpgrader: stubApplier{err: assert.AnError},
terraformUpgrader: &stubTerraformUpgrader{},
wantErr: true,
flags: upgradeApplyFlags{yes: true},
@ -77,7 +75,7 @@ func TestUpgradeApply(t *testing.T) {
kubeUpgrader: &stubKubernetesUpgrader{
currentConfig: config.DefaultForAzureSEVSNP(),
},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{terraformDiff: true},
wantErr: true,
stdin: "no\n",
@ -86,7 +84,7 @@ func TestUpgradeApply(t *testing.T) {
kubeUpgrader: &stubKubernetesUpgrader{
currentConfig: config.DefaultForAzureSEVSNP(),
},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{planTerraformErr: assert.AnError},
wantErr: true,
flags: upgradeApplyFlags{yes: true},
@ -95,7 +93,7 @@ func TestUpgradeApply(t *testing.T) {
kubeUpgrader: &stubKubernetesUpgrader{
currentConfig: config.DefaultForAzureSEVSNP(),
},
helmUpgrader: &stubHelmUpgrader{},
helmUpgrader: stubApplier{},
terraformUpgrader: &stubTerraformUpgrader{
applyTerraformErr: assert.AnError,
terraformDiff: true,
@ -117,12 +115,12 @@ func TestUpgradeApply(t *testing.T) {
cfg := defaultConfigWithExpectedMeasurements(t, config.Default(), cloudprovider.Azure)
require.NoError(handler.WriteYAML(constants.ConfigFilename, cfg))
require.NoError(handler.WriteJSON(constants.ClusterIDsFilename, clusterid.File{}))
require.NoError(handler.WriteJSON(constants.ClusterIDsFilename, clusterid.File{MeasurementSalt: []byte("measurementSalt")}))
require.NoError(handler.WriteJSON(constants.MasterSecretFilename, uri.MasterSecret{}))
upgrader := upgradeApplyCmd{
kubeUpgrader: tc.kubeUpgrader,
helmUpgrader: tc.helmUpgrader,
helmApplier: tc.helmUpgrader,
clusterUpgrader: tc.terraformUpgrader,
log: logger.NewTest(t),
configFetcher: stubAttestationFetcher{},
@ -140,22 +138,19 @@ func TestUpgradeApply(t *testing.T) {
}
}
type stubHelmUpgrader struct {
err error
}
func (u stubHelmUpgrader) Upgrade(
_ context.Context, _ *config.Config, _ clusterid.File, _ time.Duration, _, _ bool, _ string, _ bool,
_ helm.WaitMode, _ uri.MasterSecret, _ string, _ versions.ValidK8sVersion, _ terraform.ApplyOutput,
) error {
return u.err
}
type stubKubernetesUpgrader struct {
nodeVersionErr error
currentConfig config.AttestationCfg
}
func (u stubKubernetesUpgrader) BackupCRDs(_ context.Context, _ string) ([]apiextensionsv1.CustomResourceDefinition, error) {
return []apiextensionsv1.CustomResourceDefinition{}, nil
}
func (u stubKubernetesUpgrader) BackupCRs(_ context.Context, _ []apiextensionsv1.CustomResourceDefinition, _ string) error {
return nil
}
func (u stubKubernetesUpgrader) UpgradeNodeVersion(_ context.Context, _ *config.Config, _ bool) error {
return u.nodeVersionErr
}

View File

@ -31,7 +31,6 @@ import (
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl"
consemver "github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/sigstore"
"github.com/edgelesssys/constellation/v2/internal/sigstore/keyselect"
@ -85,7 +84,7 @@ func runUpgradeCheck(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("setting up Terraform upgrader: %w", err)
}
kubeChecker, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, log)
kubeChecker, err := kubecmd.New(cmd.OutOrStdout(), constants.AdminConfFilename, fileHandler, log)
if err != nil {
return fmt.Errorf("setting up Kubernetes upgrader: %w", err)
}
@ -371,7 +370,7 @@ type currentVersionInfo struct {
}
func (v *versionCollector) currentVersions(ctx context.Context) (currentVersionInfo, error) {
helmClient, err := helm.NewUpgradeClient(kubectl.NewUninitialized(), constants.AdminConfFilename, constants.HelmNamespace, v.log)
helmClient, err := helm.NewReleaseVersionClient(constants.AdminConfFilename, v.log)
if err != nil {
return currentVersionInfo{}, fmt.Errorf("setting up helm client: %w", err)
}

View File

@ -4,17 +4,17 @@ load("//bazel/go:go_test.bzl", "go_test")
go_library(
name = "helm",
srcs = [
"backup.go",
"action.go",
"actionfactory.go",
"ciliumhelper.go",
"helm.go",
"init.go",
"install.go",
"loader.go",
"overrides.go",
"release.go",
"retryaction.go",
"serviceversion.go",
"upgrade.go",
"values.go",
"versionlister.go",
],
embedsrcs = [
"charts/cert-manager/Chart.yaml",
@ -429,22 +429,17 @@ go_library(
"//internal/compatibility",
"//internal/config",
"//internal/constants",
"//internal/file",
"//internal/kms/uri",
"//internal/kubernetes/kubectl",
"//internal/retry",
"//internal/semver",
"//internal/versions",
"@com_github_pkg_errors//:errors",
"@com_github_spf13_afero//:afero",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
"@io_k8s_apimachinery//pkg/runtime/schema",
"@io_k8s_apimachinery//pkg/util/wait",
"@io_k8s_client_go//kubernetes",
"@io_k8s_client_go//tools/clientcmd",
"@io_k8s_sigs_yaml//:yaml",
"@io_k8s_client_go//util/retry",
"@sh_helm_helm//pkg/ignore",
"@sh_helm_helm_v3//pkg/action",
"@sh_helm_helm_v3//pkg/chart",
@ -457,10 +452,8 @@ go_library(
go_test(
name = "helm_test",
srcs = [
"backup_test.go",
"helm_test.go",
"loader_test.go",
"upgrade_test.go",
],
data = glob(["testdata/**"]),
embed = [":helm"],
@ -473,22 +466,16 @@ go_test(
"//internal/cloud/gcpshared",
"//internal/compatibility",
"//internal/config",
"//internal/file",
"//internal/kms/uri",
"//internal/logger",
"//internal/semver",
"//internal/versions",
"@com_github_pkg_errors//:errors",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
"@io_k8s_apimachinery//pkg/runtime/schema",
"@io_k8s_sigs_yaml//:yaml",
"@sh_helm_helm_v3//pkg/chart",
"@sh_helm_helm_v3//pkg/action",
"@sh_helm_helm_v3//pkg/chartutil",
"@sh_helm_helm_v3//pkg/engine",
"@sh_helm_helm_v3//pkg/release",
],
)

161
cli/internal/helm/action.go Normal file
View File

@ -0,0 +1,161 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"fmt"
"time"
"github.com/edgelesssys/constellation/v2/internal/constants"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/cli"
)
const (
// timeout is the maximum time given per helm action.
timeout = 10 * time.Minute
)
type applyAction interface {
Apply(context.Context) error
ReleaseName() string
IsAtomic() bool
}
// newActionConfig creates a new action configuration for helm actions.
func newActionConfig(kubeconfig string, logger debugLog) (*action.Configuration, error) {
settings := cli.New()
settings.KubeConfig = kubeconfig
actionConfig := &action.Configuration{}
if err := actionConfig.Init(settings.RESTClientGetter(), constants.HelmNamespace,
"secret", logger.Debugf); err != nil {
return nil, err
}
return actionConfig, nil
}
func newHelmInstallAction(config *action.Configuration, release Release) *action.Install {
action := action.NewInstall(config)
action.Namespace = constants.HelmNamespace
action.Timeout = timeout
action.ReleaseName = release.ReleaseName
setWaitMode(action, release.WaitMode)
return action
}
func setWaitMode(a *action.Install, waitMode WaitMode) {
switch waitMode {
case WaitModeNone:
a.Wait = false
a.Atomic = false
case WaitModeWait:
a.Wait = true
a.Atomic = false
case WaitModeAtomic:
a.Wait = true
a.Atomic = true
default:
panic(fmt.Errorf("unknown wait mode %q", waitMode))
}
}
// installAction is an action that installs a helm chart.
type installAction struct {
preInstall func(context.Context) error
release Release
helmAction *action.Install
postInstall func(context.Context) error
log debugLog
}
// Apply installs the chart.
func (a *installAction) Apply(ctx context.Context) error {
if a.preInstall != nil {
if err := a.preInstall(ctx); err != nil {
return err
}
}
if err := retryApply(ctx, a, a.log); err != nil {
return err
}
if a.postInstall != nil {
if err := a.postInstall(ctx); err != nil {
return err
}
}
return nil
}
func (a *installAction) apply(ctx context.Context) error {
_, err := a.helmAction.RunWithContext(ctx, a.release.Chart, a.release.Values)
return err
}
// ReleaseName returns the release name.
func (a *installAction) ReleaseName() string {
return a.release.ReleaseName
}
// IsAtomic returns true if the action is atomic.
func (a *installAction) IsAtomic() bool {
return a.helmAction.Atomic
}
func newHelmUpgradeAction(config *action.Configuration) *action.Upgrade {
action := action.NewUpgrade(config)
action.Namespace = constants.HelmNamespace
action.Timeout = timeout
action.ReuseValues = false
action.Atomic = true
return action
}
// upgradeAction is an action that upgrades a helm chart.
type upgradeAction struct {
preUpgrade func(context.Context) error
postUpgrade func(context.Context) error
release Release
helmAction *action.Upgrade
log debugLog
}
// Apply installs the chart.
func (a *upgradeAction) Apply(ctx context.Context) error {
if a.preUpgrade != nil {
if err := a.preUpgrade(ctx); err != nil {
return err
}
}
if err := retryApply(ctx, a, a.log); err != nil {
return err
}
if a.postUpgrade != nil {
if err := a.postUpgrade(ctx); err != nil {
return err
}
}
return nil
}
func (a *upgradeAction) apply(ctx context.Context) error {
_, err := a.helmAction.RunWithContext(ctx, a.release.ReleaseName, a.release.Chart, a.release.Values)
return err
}
// ReleaseName returns the release name.
func (a *upgradeAction) ReleaseName() string {
return a.release.ReleaseName
}
// IsAtomic returns true if the action is atomic.
func (a *upgradeAction) IsAtomic() bool {
return a.helmAction.Atomic
}

View File

@ -0,0 +1,186 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/edgelesssys/constellation/v2/internal/compatibility"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/semver"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
)
// ErrConfirmationMissing signals that an action requires user confirmation.
var ErrConfirmationMissing = errors.New("action requires user confirmation")
var errReleaseNotFound = errors.New("release not found")
type actionFactory struct {
versionLister releaseVersionLister
cfg *action.Configuration
kubeClient crdClient
cliVersion semver.Semver
log debugLog
}
type crdClient interface {
ApplyCRD(ctx context.Context, rawCRD []byte) error
}
// newActionFactory creates a new action factory for managing helm releases.
func newActionFactory(kubeClient crdClient, lister releaseVersionLister, actionConfig *action.Configuration, cliVersion semver.Semver, log debugLog) *actionFactory {
return &actionFactory{
cliVersion: cliVersion,
versionLister: lister,
cfg: actionConfig,
kubeClient: kubeClient,
log: log,
}
}
// GetActions returns a list of actions to apply the given releases.
func (a actionFactory) GetActions(releases []Release, force, allowDestructive bool) (actions []applyAction, includesUpgrade bool, err error) {
upgradeErrs := []error{}
for _, release := range releases {
err := a.appendNewAction(release, force, allowDestructive, &actions)
var invalidUpgrade *compatibility.InvalidUpgradeError
if errors.As(err, &invalidUpgrade) {
upgradeErrs = append(upgradeErrs, err)
continue
}
if err != nil {
return actions, includesUpgrade, fmt.Errorf("creating action for %s: %w", release.ReleaseName, err)
}
}
for _, action := range actions {
if _, ok := action.(*upgradeAction); ok {
includesUpgrade = true
break
}
}
return actions, includesUpgrade, errors.Join(upgradeErrs...)
}
func (a actionFactory) appendNewAction(release Release, force, allowDestructive bool, actions *[]applyAction) error {
newVersion, err := semver.New(release.Chart.Metadata.Version)
if err != nil {
return fmt.Errorf("parsing chart version: %w", err)
}
currentVersion, err := a.versionLister.currentVersion(release.ReleaseName)
if errors.Is(err, errReleaseNotFound) {
a.log.Debugf("Release %s not found, adding to new releases...", release.ReleaseName)
*actions = append(*actions, a.newInstall(release))
return nil
}
if err != nil {
return fmt.Errorf("getting version for %s: %w", release.ReleaseName, err)
}
a.log.Debugf("Current %s version: %s", release.ReleaseName, currentVersion)
a.log.Debugf("New %s version: %s", release.ReleaseName, newVersion)
// This may break for cert-manager or cilium if we decide to upgrade more than one minor version at a time.
// Leaving it as is since it is not clear to me what kind of sanity check we could do.
if !force {
if err := newVersion.IsUpgradeTo(currentVersion); err != nil {
return fmt.Errorf("invalid upgrade for %s: %w", release.ReleaseName, err)
}
}
// at this point we conclude that the release should be upgraded. check that this CLI supports the upgrade.
if isCLIVersionedRelease(release.ReleaseName) && a.cliVersion.Compare(newVersion) != 0 {
return fmt.Errorf("this CLI only supports microservice version %s for upgrading", a.cliVersion.String())
}
if !allowDestructive &&
release.ReleaseName == certManagerInfo.releaseName {
return ErrConfirmationMissing
}
a.log.Debugf("Upgrading %s from %s to %s", release.ReleaseName, currentVersion, newVersion)
*actions = append(*actions, a.newUpgrade(release))
return nil
}
func (a actionFactory) newInstall(release Release) *installAction {
action := &installAction{helmAction: newHelmInstallAction(a.cfg, release), release: release, log: a.log}
if action.ReleaseName() == ciliumInfo.releaseName {
action.postInstall = func(ctx context.Context) error {
return ciliumPostInstall(ctx, a.log)
}
}
return action
}
func ciliumPostInstall(ctx context.Context, log debugLog) error {
log.Debugf("Waiting for Cilium to become ready")
helper, err := newK8sCiliumHelper(constants.AdminConfFilename)
if err != nil {
return fmt.Errorf("creating Kubernetes client: %w", err)
}
timeToStartWaiting := time.Now()
// TODO(3u13r): Reduce the timeout when we switched the package repository - this is only this high because we once
// saw polling times of ~16 minutes when hitting a slow PoP from Fastly (GitHub's / ghcr.io CDN).
if err := helper.WaitForDS(ctx, "kube-system", "cilium", log); err != nil {
return fmt.Errorf("waiting for Cilium to become healthy: %w", err)
}
timeUntilFinishedWaiting := time.Since(timeToStartWaiting)
log.Debugf("Cilium became healthy after %s", timeUntilFinishedWaiting.String())
log.Debugf("Fix Cilium through restart")
if err := helper.RestartDS("kube-system", "cilium"); err != nil {
return fmt.Errorf("restarting Cilium: %w", err)
}
return nil
}
func (a actionFactory) newUpgrade(release Release) *upgradeAction {
action := &upgradeAction{helmAction: newHelmUpgradeAction(a.cfg), release: release, log: a.log}
if release.ReleaseName == constellationOperatorsInfo.releaseName {
action.preUpgrade = func(ctx context.Context) error {
if err := a.updateCRDs(ctx, release.Chart); err != nil {
return fmt.Errorf("updating operator CRDs: %w", err)
}
return nil
}
}
return action
}
// updateCRDs walks through the dependencies of the given chart and applies
// the files in the dependencie's 'crds' folder.
// This function is NOT recursive!
func (a actionFactory) updateCRDs(ctx context.Context, chart *chart.Chart) error {
for _, dep := range chart.Dependencies() {
for _, crdFile := range dep.Files {
if strings.HasPrefix(crdFile.Name, "crds/") {
a.log.Debugf("Updating crd: %s", crdFile.Name)
err := a.kubeClient.ApplyCRD(ctx, crdFile.Data)
if err != nil {
return err
}
}
}
}
return nil
}
// isCLIVersionedRelease checks if the given release is versioned by the CLI,
// meaning that the version of the Helm release is equal to the version of the CLI that installed it.
func isCLIVersionedRelease(releaseName string) bool {
return releaseName == constellationOperatorsInfo.releaseName ||
releaseName == constellationServicesInfo.releaseName ||
releaseName == csiInfo.releaseName
}
// releaseVersionLister can list the versions of a helm release.
type releaseVersionLister interface {
currentVersion(release string) (semver.Semver, error)
}

View File

@ -28,6 +28,104 @@ As such, the number of exported functions should be kept minimal.
*/
package helm
import (
"context"
"fmt"
"github.com/edgelesssys/constellation/v2/cli/internal/clusterid"
"github.com/edgelesssys/constellation/v2/cli/internal/terraform"
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/versions"
)
const (
// AllowDestructive is a named bool to signal that destructive actions have been confirmed by the user.
AllowDestructive = true
// DenyDestructive is a named bool to signal that destructive actions have not been confirmed by the user yet.
DenyDestructive = false
)
type debugLog interface {
Debugf(format string, args ...any)
Sync()
}
// Client is a Helm client to apply charts.
type Client struct {
factory *actionFactory
cliVersion semver.Semver
log debugLog
}
// NewClient returns a new Helm client.
func NewClient(kubeConfigPath string, log debugLog) (*Client, error) {
kubeClient, err := kubectl.NewFromConfig(kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("initializing kubectl: %w", err)
}
actionConfig, err := newActionConfig(kubeConfigPath, log)
if err != nil {
return nil, fmt.Errorf("creating action config: %w", err)
}
lister := ReleaseVersionClient{actionConfig}
cliVersion := constants.BinaryVersion()
factory := newActionFactory(kubeClient, lister, actionConfig, cliVersion, log)
return &Client{factory, cliVersion, log}, nil
}
// Options are options for loading charts.
type Options struct {
Conformance bool
HelmWaitMode WaitMode
AllowDestructive bool
Force bool
}
// PrepareApply loads the charts and returns the executor to apply them.
// TODO(elchead): remove validK8sVersion by putting ValidK8sVersion into config.Config, see AB#3374.
func (h Client) PrepareApply(conf *config.Config, validK8sversion versions.ValidK8sVersion, idFile clusterid.File, flags Options, tfOutput terraform.ApplyOutput, serviceAccURI string, masterSecret uri.MasterSecret) (Applier, bool, error) {
releases, err := h.loadReleases(conf, masterSecret, validK8sversion, idFile, flags, tfOutput, serviceAccURI)
if err != nil {
return nil, false, fmt.Errorf("loading Helm releases: %w", err)
}
h.log.Debugf("Loaded Helm releases")
actions, includesUpgrades, err := h.factory.GetActions(releases, flags.Force, flags.AllowDestructive)
return &ChartApplyExecutor{actions: actions, log: h.log}, includesUpgrades, err
}
func (h Client) loadReleases(conf *config.Config, secret uri.MasterSecret, validK8sVersion versions.ValidK8sVersion, idFile clusterid.File, flags Options, tfOutput terraform.ApplyOutput, serviceAccURI string) ([]Release, error) {
helmLoader := newLoader(conf, idFile, validK8sVersion, h.cliVersion)
h.log.Debugf("Created new Helm loader")
return helmLoader.loadReleases(flags.Conformance, flags.HelmWaitMode, secret,
serviceAccURI, tfOutput)
}
// Applier runs the Helm actions.
type Applier interface {
Apply(ctx context.Context) error
}
// ChartApplyExecutor is a Helm action executor that applies all actions.
type ChartApplyExecutor struct {
actions []applyAction
log debugLog
}
// Apply applies the charts in order.
func (c ChartApplyExecutor) Apply(ctx context.Context) error {
for _, action := range c.actions {
c.log.Debugf("Applying %q", action.ReleaseName())
if err := action.Apply(ctx); err != nil {
return fmt.Errorf("applying %s: %w", action.ReleaseName(), err)
}
}
return nil
}
// mergeMaps returns a new map that is the merger of it's inputs.
// Key collisions are resolved by taking the value of the second argument (map b).
// Taken from: https://github.com/helm/helm/blob/dbc6d8e20fe1d58d50e6ed30f09a04a77e4c68db/pkg/cli/values/options.go#L91-L108.

View File

@ -7,9 +7,21 @@ SPDX-License-Identifier: AGPL-3.0-only
package helm
import (
"errors"
"testing"
"github.com/edgelesssys/constellation/v2/cli/internal/clusterid"
"github.com/edgelesssys/constellation/v2/cli/internal/terraform"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
"github.com/edgelesssys/constellation/v2/internal/compatibility"
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"helm.sh/helm/v3/pkg/action"
)
func TestMergeMaps(t *testing.T) {
@ -107,3 +119,145 @@ func TestMergeMaps(t *testing.T) {
})
}
}
func TestHelmApply(t *testing.T) {
cliVersion := semver.NewFromInt(1, 99, 0, "")
csp := cloudprovider.AWS // using AWS since it has an additional chart: aws-load-balancer-controller
microserviceCharts := []string{
"constellation-services",
"constellation-operators",
"constellation-csi",
}
testCases := map[string]struct {
clusterMicroServiceVersion string
expectedActions []string
expectUpgrade bool
clusterCertManagerVersion *string
clusterAWSLBVersion *string
allowDestructive bool
expectError bool
}{
"CLI microservices are 1 minor version newer than cluster ones": {
clusterMicroServiceVersion: "v1.98.1",
expectedActions: microserviceCharts,
expectUpgrade: true,
},
"CLI microservices are 2 minor versions newer than cluster ones": {
clusterMicroServiceVersion: "v1.97.0",
expectedActions: []string{},
},
"cluster microservices are newer than CLI": {
clusterMicroServiceVersion: "v1.100.0",
},
"cluster and CLI microservices have the same version": {
clusterMicroServiceVersion: "v1.99.0",
expectedActions: []string{},
},
"cert-manager upgrade is ignored when denying destructive upgrades": {
clusterMicroServiceVersion: "v1.99.0",
clusterCertManagerVersion: toPtr("v1.9.0"),
allowDestructive: false,
expectError: true,
},
"both microservices and cert-manager are upgraded in destructive mode": {
clusterMicroServiceVersion: "v1.98.1",
clusterCertManagerVersion: toPtr("v1.9.0"),
expectedActions: append(microserviceCharts, "cert-manager"),
expectUpgrade: true,
allowDestructive: true,
},
"only missing aws-load-balancer-controller is installed": {
clusterMicroServiceVersion: "v1.99.0",
clusterAWSLBVersion: toPtr(""),
expectedActions: []string{"aws-load-balancer-controller"},
},
}
cfg := config.Default()
cfg.RemoveProviderAndAttestationExcept(csp)
log := logger.NewTest(t)
options := Options{
Conformance: false,
HelmWaitMode: WaitModeWait,
AllowDestructive: true,
Force: false,
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
lister := &ReleaseVersionStub{}
sut := Client{
factory: newActionFactory(nil, lister, &action.Configuration{}, cliVersion, log),
log: log,
cliVersion: cliVersion,
}
awsLbVersion := "v1.5.4" // current version
if tc.clusterAWSLBVersion != nil {
awsLbVersion = *tc.clusterAWSLBVersion
}
certManagerVersion := "v1.10.0" // current version
if tc.clusterCertManagerVersion != nil {
certManagerVersion = *tc.clusterCertManagerVersion
}
helmListVersion(lister, "cilium", "v1.12.1")
helmListVersion(lister, "cert-manager", certManagerVersion)
helmListVersion(lister, "constellation-services", tc.clusterMicroServiceVersion)
helmListVersion(lister, "constellation-operators", tc.clusterMicroServiceVersion)
helmListVersion(lister, "constellation-csi", tc.clusterMicroServiceVersion)
helmListVersion(lister, "aws-load-balancer-controller", awsLbVersion)
options.AllowDestructive = tc.allowDestructive
ex, includesUpgrade, err := sut.PrepareApply(cfg, versions.ValidK8sVersion("v1.27.4"),
clusterid.File{UID: "testuid", MeasurementSalt: []byte("measurementSalt")}, options,
fakeTerraformOutput(csp), fakeServiceAccURI(csp),
uri.MasterSecret{Key: []byte("secret"), Salt: []byte("masterSalt")})
var upgradeErr *compatibility.InvalidUpgradeError
if tc.expectError {
assert.Error(t, err)
} else {
assert.True(t, err == nil || errors.As(err, &upgradeErr))
}
assert.Equal(t, tc.expectUpgrade, includesUpgrade)
chartExecutor, ok := ex.(*ChartApplyExecutor)
assert.True(t, ok)
assert.ElementsMatch(t, tc.expectedActions, getActionReleaseNames(chartExecutor.actions))
})
}
}
func fakeTerraformOutput(csp cloudprovider.Provider) terraform.ApplyOutput {
switch csp {
case cloudprovider.AWS:
return terraform.ApplyOutput{}
case cloudprovider.GCP:
return terraform.ApplyOutput{GCP: &terraform.GCPApplyOutput{}}
default:
panic("invalid csp")
}
}
func getActionReleaseNames(actions []applyAction) []string {
releaseActionNames := []string{}
for _, action := range actions {
releaseActionNames = append(releaseActionNames, action.ReleaseName())
}
return releaseActionNames
}
func helmListVersion(l *ReleaseVersionStub, releaseName string, installedVersion string) {
if installedVersion == "" {
l.On("currentVersion", releaseName).Return(semver.Semver{}, errReleaseNotFound)
return
}
v, _ := semver.New(installedVersion)
l.On("currentVersion", releaseName).Return(v, nil)
}
type ReleaseVersionStub struct {
mock.Mock
}
func (s *ReleaseVersionStub) currentVersion(release string) (semver.Semver, error) {
args := s.Called(release)
return args.Get(0).(semver.Semver), args.Error(1)
}

View File

@ -1,110 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"fmt"
"time"
"github.com/edgelesssys/constellation/v2/internal/constants"
)
// InitializationClient installs all Helm charts required for a Constellation cluster.
type InitializationClient struct {
log debugLog
installer installer
}
// NewInitializer creates a new client to install all Helm charts required for a constellation cluster.
func NewInitializer(log debugLog, adminConfPath string) (*InitializationClient, error) {
installer, err := NewInstaller(adminConfPath, log)
if err != nil {
return nil, fmt.Errorf("creating Helm installer: %w", err)
}
return &InitializationClient{log: log, installer: installer}, nil
}
// Install installs all Helm charts required for a constellation cluster.
func (i InitializationClient) Install(ctx context.Context, releases *Releases) error {
if err := i.installer.InstallChart(ctx, releases.Cilium); err != nil {
return fmt.Errorf("installing Cilium: %w", err)
}
i.log.Debugf("Waiting for Cilium to become ready")
helper, err := newK8sCiliumHelper(constants.AdminConfFilename)
if err != nil {
return fmt.Errorf("creating Kubernetes client: %w", err)
}
timeToStartWaiting := time.Now()
// TODO(3u13r): Reduce the timeout when we switched the package repository - this is only this high because we once
// saw polling times of ~16 minutes when hitting a slow PoP from Fastly (GitHub's / ghcr.io CDN).
if err := helper.WaitForDS(ctx, "kube-system", "cilium", i.log); err != nil {
return fmt.Errorf("waiting for Cilium to become healthy: %w", err)
}
timeUntilFinishedWaiting := time.Since(timeToStartWaiting)
i.log.Debugf("Cilium became healthy after %s", timeUntilFinishedWaiting.String())
i.log.Debugf("Fix Cilium through restart")
if err := helper.RestartDS("kube-system", "cilium"); err != nil {
return fmt.Errorf("restarting Cilium: %w", err)
}
i.log.Debugf("Installing microservices")
if err := i.installer.InstallChart(ctx, releases.ConstellationServices); err != nil {
return fmt.Errorf("installing microservices: %w", err)
}
i.log.Debugf("Installing cert-manager")
if err := i.installer.InstallChart(ctx, releases.CertManager); err != nil {
return fmt.Errorf("installing cert-manager: %w", err)
}
if releases.CSI != nil {
i.log.Debugf("Installing CSI deployments")
if err := i.installer.InstallChart(ctx, *releases.CSI); err != nil {
return fmt.Errorf("installing CSI snapshot CRDs: %w", err)
}
}
if releases.AWSLoadBalancerController != nil {
i.log.Debugf("Installing AWS Load Balancer Controller")
if err := i.installer.InstallChart(ctx, *releases.AWSLoadBalancerController); err != nil {
return fmt.Errorf("installing AWS Load Balancer Controller: %w", err)
}
}
i.log.Debugf("Installing constellation operators")
if err := i.installer.InstallChart(ctx, releases.ConstellationOperators); err != nil {
return fmt.Errorf("installing constellation operators: %w", err)
}
return nil
}
// installer is the interface for installing a single Helm chart.
type installer interface {
InstallChart(context.Context, Release) error
InstallChartWithValues(ctx context.Context, release Release, extraValues map[string]any) error
}
type cloudConfig struct {
Cloud string `json:"cloud,omitempty"`
TenantID string `json:"tenantId,omitempty"`
SubscriptionID string `json:"subscriptionId,omitempty"`
ResourceGroup string `json:"resourceGroup,omitempty"`
Location string `json:"location,omitempty"`
SubnetName string `json:"subnetName,omitempty"`
SecurityGroupName string `json:"securityGroupName,omitempty"`
SecurityGroupResourceGroup string `json:"securityGroupResourceGroup,omitempty"`
LoadBalancerName string `json:"loadBalancerName,omitempty"`
LoadBalancerSku string `json:"loadBalancerSku,omitempty"`
VNetName string `json:"vnetName,omitempty"`
VNetResourceGroup string `json:"vnetResourceGroup,omitempty"`
CloudProviderBackoff bool `json:"cloudProviderBackoff,omitempty"`
UseInstanceMetadata bool `json:"useInstanceMetadata,omitempty"`
VMType string `json:"vmType,omitempty"`
UseManagedIdentityExtension bool `json:"useManagedIdentityExtension,omitempty"`
UserAssignedIdentityID string `json:"userAssignedIdentityID,omitempty"`
}

View File

@ -1,151 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"fmt"
"strings"
"time"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/retry"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/cli"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
// timeout is the maximum time given to the helm Installer.
timeout = 10 * time.Minute
// maximumRetryAttempts is the maximum number of attempts to retry a helm install.
maximumRetryAttempts = 3
)
type debugLog interface {
Debugf(format string, args ...any)
Sync()
}
// Installer is a wrapper for a helm install action.
type Installer struct {
*action.Install
log debugLog
}
// NewInstaller creates a new Installer with the given logger.
func NewInstaller(kubeconfig string, logger debugLog) (*Installer, error) {
settings := cli.New()
settings.KubeConfig = kubeconfig
actionConfig := &action.Configuration{}
if err := actionConfig.Init(settings.RESTClientGetter(), constants.HelmNamespace,
"secret", logger.Debugf); err != nil {
return nil, err
}
action := action.NewInstall(actionConfig)
action.Namespace = constants.HelmNamespace
action.Timeout = timeout
return &Installer{
Install: action,
log: logger,
}, nil
}
// InstallChart is the generic install function for helm charts.
func (h *Installer) InstallChart(ctx context.Context, release Release) error {
return h.InstallChartWithValues(ctx, release, nil)
}
// InstallChartWithValues is the generic install function for helm charts with custom values.
func (h *Installer) InstallChartWithValues(ctx context.Context, release Release, extraValues map[string]any) error {
mergedVals := mergeMaps(release.Values, extraValues)
h.ReleaseName = release.ReleaseName
if err := h.SetWaitMode(release.WaitMode); err != nil {
return err
}
return h.install(ctx, release.Chart, mergedVals)
}
// install tries to install the given chart and aborts after ~5 tries.
// The function will wait 30 seconds before retrying a failed installation attempt.
// After 3 tries, the retrier will be canceled and the function returns with an error.
func (h *Installer) install(ctx context.Context, chart *chart.Chart, values map[string]any) error {
var retries int
retriable := func(err error) bool {
// abort after maximumRetryAttempts tries.
if retries >= maximumRetryAttempts {
return false
}
retries++
// only retry if atomic is set
// otherwise helm doesn't uninstall
// the release on failure
if !h.Atomic {
return false
}
// check if error is retriable
return wait.Interrupted(err) ||
strings.Contains(err.Error(), "connection refused")
}
doer := installDoer{
h,
chart,
values,
h.log,
}
retrier := retry.NewIntervalRetrier(doer, 30*time.Second, retriable)
retryLoopStartTime := time.Now()
if err := retrier.Do(ctx); err != nil {
return fmt.Errorf("helm install: %w", err)
}
retryLoopFinishDuration := time.Since(retryLoopStartTime)
h.log.Debugf("Helm chart %q installation finished after %s", chart.Name(), retryLoopFinishDuration)
return nil
}
// SetWaitMode sets the wait mode of the installer.
func (h *Installer) SetWaitMode(waitMode WaitMode) error {
switch waitMode {
case WaitModeNone:
h.Wait = false
h.Atomic = false
case WaitModeWait:
h.Wait = true
h.Atomic = false
case WaitModeAtomic:
h.Wait = true
h.Atomic = true
default:
return fmt.Errorf("unknown wait mode %q", waitMode)
}
return nil
}
// installDoer is a help struct to enable retrying helm's install action.
type installDoer struct {
Installer *Installer
chart *chart.Chart
values map[string]any
log debugLog
}
// Do logs which chart is installed and tries to install it.
func (i installDoer) Do(ctx context.Context) error {
i.log.Debugf("Trying to install Helm chart %s", i.chart.Name())
if _, err := i.Installer.RunWithContext(ctx, i.chart, i.values); err != nil {
i.log.Debugf("Helm chart installation %s failed: %v", i.chart.Name(), err)
return err
}
return nil
}

View File

@ -57,9 +57,10 @@ var (
csiInfo = chartInfo{releaseName: "constellation-csi", chartName: "constellation-csi", path: "charts/edgeless/csi"}
)
// ChartLoader loads embedded helm charts.
type ChartLoader struct {
// chartLoader loads embedded helm charts.
type chartLoader struct {
csp cloudprovider.Provider
config *config.Config
joinServiceImage string
keyServiceImage string
ccmImage string // cloud controller manager image
@ -71,11 +72,16 @@ type ChartLoader struct {
constellationOperatorImage string
nodeMaintenanceOperatorImage string
clusterName string
idFile clusterid.File
cliVersion semver.Semver
}
// NewLoader creates a new ChartLoader.
func NewLoader(csp cloudprovider.Provider, k8sVersion versions.ValidK8sVersion, clusterName string) *ChartLoader {
// newLoader creates a new ChartLoader.
func newLoader(config *config.Config, idFile clusterid.File, k8sVersion versions.ValidK8sVersion, cliVersion semver.Semver) *chartLoader {
// TODO(malt3): Allow overriding container image registry + prefix for all images
// (e.g. for air-gapped environments).
var ccmImage, cnmImage string
csp := config.GetProvider()
switch csp {
case cloudprovider.AWS:
ccmImage = versions.VersionConfigs[k8sVersion].CloudControllerManagerImageAWS
@ -87,35 +93,39 @@ func NewLoader(csp cloudprovider.Provider, k8sVersion versions.ValidK8sVersion,
case cloudprovider.OpenStack:
ccmImage = versions.VersionConfigs[k8sVersion].CloudControllerManagerImageOpenStack
}
// TODO(malt3): Allow overriding container image registry + prefix for all images
// (e.g. for air-gapped environments).
return &ChartLoader{
return &chartLoader{
cliVersion: cliVersion,
csp: csp,
joinServiceImage: imageversion.JoinService("", ""),
keyServiceImage: imageversion.KeyService("", ""),
idFile: idFile,
ccmImage: ccmImage,
azureCNMImage: cnmImage,
config: config,
joinServiceImage: imageversion.JoinService("", ""),
keyServiceImage: imageversion.KeyService("", ""),
autoscalerImage: versions.VersionConfigs[k8sVersion].ClusterAutoscalerImage,
verificationServiceImage: imageversion.VerificationService("", ""),
gcpGuestAgentImage: versions.GcpGuestImage,
konnectivityImage: versions.KonnectivityAgentImage,
constellationOperatorImage: imageversion.ConstellationNodeOperator("", ""),
nodeMaintenanceOperatorImage: versions.NodeMaintenanceOperatorImage,
clusterName: clusterName,
}
}
// LoadReleases loads the embedded helm charts and returns them as a HelmReleases object.
func (i *ChartLoader) LoadReleases(
config *config.Config, conformanceMode bool, helmWaitMode WaitMode, masterSecret uri.MasterSecret,
serviceAccURI string, idFile clusterid.File, output terraform.ApplyOutput,
) (*Releases, error) {
// releaseApplyOrder is a list of releases in the order they should be applied.
// makes sure if a release was removed as a dependency from one chart,
// and then added as a new standalone chart (or as a dependency of another chart),
// that the new release is installed after the existing one to avoid name conflicts.
type releaseApplyOrder []Release
// loadReleases loads the embedded helm charts and returns them as a HelmReleases object.
func (i *chartLoader) loadReleases(conformanceMode bool, helmWaitMode WaitMode, masterSecret uri.MasterSecret,
serviceAccURI string, output terraform.ApplyOutput,
) (releaseApplyOrder, error) {
ciliumRelease, err := i.loadRelease(ciliumInfo, helmWaitMode)
if err != nil {
return nil, fmt.Errorf("loading cilium: %w", err)
}
ciliumVals := extraCiliumValues(config.GetProvider(), conformanceMode, output)
ciliumVals := extraCiliumValues(i.config.GetProvider(), conformanceMode, output)
ciliumRelease.Values = mergeMaps(ciliumRelease.Values, ciliumVals)
certManagerRelease, err := i.loadRelease(certManagerInfo, helmWaitMode)
@ -127,46 +137,47 @@ func (i *ChartLoader) LoadReleases(
if err != nil {
return nil, fmt.Errorf("loading operators: %w", err)
}
operatorRelease.Values = mergeMaps(operatorRelease.Values, extraOperatorValues(idFile.UID))
operatorRelease.Values = mergeMaps(operatorRelease.Values, extraOperatorValues(i.idFile.UID))
conServicesRelease, err := i.loadRelease(constellationServicesInfo, helmWaitMode)
if err != nil {
return nil, fmt.Errorf("loading constellation-services: %w", err)
}
svcVals, err := extraConstellationServicesValues(config, masterSecret, idFile.UID, serviceAccURI, output)
svcVals, err := extraConstellationServicesValues(i.config, masterSecret, i.idFile.UID, serviceAccURI, output)
if err != nil {
return nil, fmt.Errorf("extending constellation-services values: %w", err)
}
conServicesRelease.Values = mergeMaps(conServicesRelease.Values, svcVals)
releases := Releases{Cilium: ciliumRelease, CertManager: certManagerRelease, ConstellationOperators: operatorRelease, ConstellationServices: conServicesRelease}
if config.HasProvider(cloudprovider.AWS) {
awsRelease, err := i.loadRelease(awsLBControllerInfo, helmWaitMode)
if err != nil {
return nil, fmt.Errorf("loading aws-services: %w", err)
}
releases.AWSLoadBalancerController = &awsRelease
}
if config.DeployCSIDriver() {
releases := releaseApplyOrder{ciliumRelease, conServicesRelease, certManagerRelease}
if i.config.DeployCSIDriver() {
csiRelease, err := i.loadRelease(csiInfo, helmWaitMode)
if err != nil {
return nil, fmt.Errorf("loading snapshot CRDs: %w", err)
}
extraCSIvals, err := extraCSIValues(config.GetProvider(), serviceAccURI)
extraCSIvals, err := extraCSIValues(i.config.GetProvider(), serviceAccURI)
if err != nil {
return nil, fmt.Errorf("extending CSI values: %w", err)
}
csiRelease.Values = mergeMaps(csiRelease.Values, extraCSIvals)
releases.CSI = &csiRelease
releases = append(releases, csiRelease)
}
return &releases, nil
if i.config.HasProvider(cloudprovider.AWS) {
awsRelease, err := i.loadRelease(awsLBControllerInfo, helmWaitMode)
if err != nil {
return nil, fmt.Errorf("loading aws-services: %w", err)
}
releases = append(releases, awsRelease)
}
releases = append(releases, operatorRelease)
return releases, nil
}
// loadRelease loads the embedded chart and values depending on the given info argument.
// IMPORTANT: .helmignore rules specifying files in subdirectories are not applied (e.g. crds/kustomization.yaml).
func (i *ChartLoader) loadRelease(info chartInfo, helmWaitMode WaitMode) (Release, error) {
func (i *chartLoader) loadRelease(info chartInfo, helmWaitMode WaitMode) (Release, error) {
chart, err := loadChartsDir(helmFS, info.path)
if err != nil {
return Release{}, fmt.Errorf("loading %s chart: %w", info.releaseName, err)
@ -184,23 +195,23 @@ func (i *ChartLoader) loadRelease(info chartInfo, helmWaitMode WaitMode) (Releas
case certManagerInfo.releaseName:
values = i.loadCertManagerValues()
case constellationOperatorsInfo.releaseName:
updateVersions(chart, constants.BinaryVersion())
updateVersions(chart, i.cliVersion)
values = i.loadOperatorsValues()
case constellationServicesInfo.releaseName:
updateVersions(chart, constants.BinaryVersion())
updateVersions(chart, i.cliVersion)
values = i.loadConstellationServicesValues()
case awsLBControllerInfo.releaseName:
values = i.loadAWSLBControllerValues()
case csiInfo.releaseName:
updateVersions(chart, constants.BinaryVersion())
updateVersions(chart, i.cliVersion)
values = i.loadCSIValues()
}
return Release{Chart: chart, Values: values, ReleaseName: info.releaseName, WaitMode: helmWaitMode}, nil
}
func (i *ChartLoader) loadAWSLBControllerValues() map[string]any {
func (i *chartLoader) loadAWSLBControllerValues() map[string]any {
return map[string]any{
"clusterName": i.clusterName,
"clusterName": clusterid.GetClusterName(i.config, i.idFile),
"tolerations": controlPlaneTolerations,
"nodeSelector": controlPlaneNodeSelector,
}
@ -208,7 +219,7 @@ func (i *ChartLoader) loadAWSLBControllerValues() map[string]any {
// loadCertManagerHelper is used to separate the marshalling step from the loading step.
// This reduces the time unit tests take to execute.
func (i *ChartLoader) loadCertManagerValues() map[string]any {
func (i *chartLoader) loadCertManagerValues() map[string]any {
return map[string]any{
"installCRDs": true,
"prometheus": map[string]any{
@ -233,7 +244,7 @@ func (i *ChartLoader) loadCertManagerValues() map[string]any {
// loadOperatorsHelper is used to separate the marshalling step from the loading step.
// This reduces the time unit tests take to execute.
func (i *ChartLoader) loadOperatorsValues() map[string]any {
func (i *chartLoader) loadOperatorsValues() map[string]any {
return map[string]any{
"constellation-operator": map[string]any{
"controllerManager": map[string]any{
@ -256,7 +267,7 @@ func (i *ChartLoader) loadOperatorsValues() map[string]any {
// loadConstellationServicesHelper is used to separate the marshalling step from the loading step.
// This reduces the time unit tests take to execute.
func (i *ChartLoader) loadConstellationServicesValues() map[string]any {
func (i *chartLoader) loadConstellationServicesValues() map[string]any {
return map[string]any{
"global": map[string]any{
"keyServicePort": constants.KeyServicePort,
@ -299,13 +310,13 @@ func (i *ChartLoader) loadConstellationServicesValues() map[string]any {
}
}
func (i *ChartLoader) loadCSIValues() map[string]any {
func (i *chartLoader) loadCSIValues() map[string]any {
return map[string]any{
"tags": i.cspTags(),
}
}
func (i *ChartLoader) cspTags() map[string]any {
func (i *chartLoader) cspTags() map[string]any {
return map[string]any{
i.csp.String(): true,
}

View File

@ -30,6 +30,8 @@ import (
"github.com/edgelesssys/constellation/v2/internal/cloud/gcpshared"
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/versions"
)
func fakeServiceAccURI(provider cloudprovider.Provider) string {
@ -65,26 +67,34 @@ func TestLoadReleases(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
config := &config.Config{Provider: config.ProviderConfig{GCP: &config.GCPConfig{}}}
chartLoader := ChartLoader{csp: config.GetProvider()}
helmReleases, err := chartLoader.LoadReleases(
config, true, WaitModeAtomic,
k8sVersion := versions.ValidK8sVersion("v1.27.4")
chartLoader := newLoader(config, clusterid.File{UID: "testuid", MeasurementSalt: []byte("measurementSalt")},
k8sVersion, semver.NewFromInt(2, 10, 0, ""))
helmReleases, err := chartLoader.loadReleases(
true, WaitModeAtomic,
uri.MasterSecret{Key: []byte("secret"), Salt: []byte("masterSalt")},
fakeServiceAccURI(cloudprovider.GCP), clusterid.File{UID: "testuid"}, terraform.ApplyOutput{GCP: &terraform.GCPApplyOutput{}},
fakeServiceAccURI(cloudprovider.GCP), terraform.ApplyOutput{GCP: &terraform.GCPApplyOutput{}},
)
require.NoError(err)
chart := helmReleases.ConstellationServices.Chart
assert.NotNil(chart.Dependencies())
for _, release := range helmReleases {
if release.ReleaseName == constellationServicesInfo.releaseName {
assert.NotNil(release.Chart.Dependencies())
}
}
}
func TestLoadAWSLoadBalancerValues(t *testing.T) {
sut := ChartLoader{
sut := chartLoader{
config: &config.Config{Name: "testCluster"},
clusterName: "testCluster",
idFile: clusterid.File{UID: "testuid"},
}
val := sut.loadAWSLBControllerValues()
assert.Equal(t, "testCluster", val["clusterName"])
assert.Equal(t, "testCluster-testuid", val["clusterName"])
// needs to run on control-plane
assert.Contains(t, val["nodeSelector"].(map[string]any), "node-role.kubernetes.io/control-plane")
assert.Contains(t, val["tolerations"].([]map[string]any), map[string]any{"key": "node-role.kubernetes.io/control-plane", "operator": "Exists", "effect": "NoSchedule"})
assert.Contains(t, val["tolerations"].([]map[string]any),
map[string]any{"key": "node-role.kubernetes.io/control-plane", "operator": "Exists", "effect": "NoSchedule"})
}
// TestConstellationServices checks if the rendered constellation-services chart produces the expected yaml files.
@ -146,7 +156,7 @@ func TestConstellationServices(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
chartLoader := ChartLoader{
chartLoader := chartLoader{
csp: tc.config.GetProvider(),
joinServiceImage: "joinServiceImage",
keyServiceImage: "keyServiceImage",
@ -239,7 +249,7 @@ func TestOperators(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
chartLoader := ChartLoader{
chartLoader := chartLoader{
csp: tc.csp,
joinServiceImage: "joinServiceImage",
keyServiceImage: "keyServiceImage",

View File

@ -125,6 +125,27 @@ func extraConstellationServicesValues(
return extraVals, nil
}
// cloudConfig is used to marshal the cloud config for the Kubernetes Cloud Controller Manager on Azure.
type cloudConfig struct {
Cloud string `json:"cloud,omitempty"`
TenantID string `json:"tenantId,omitempty"`
SubscriptionID string `json:"subscriptionId,omitempty"`
ResourceGroup string `json:"resourceGroup,omitempty"`
Location string `json:"location,omitempty"`
SubnetName string `json:"subnetName,omitempty"`
SecurityGroupName string `json:"securityGroupName,omitempty"`
SecurityGroupResourceGroup string `json:"securityGroupResourceGroup,omitempty"`
LoadBalancerName string `json:"loadBalancerName,omitempty"`
LoadBalancerSku string `json:"loadBalancerSku,omitempty"`
VNetName string `json:"vnetName,omitempty"`
VNetResourceGroup string `json:"vnetResourceGroup,omitempty"`
CloudProviderBackoff bool `json:"cloudProviderBackoff,omitempty"`
UseInstanceMetadata bool `json:"useInstanceMetadata,omitempty"`
VMType string `json:"vmType,omitempty"`
UseManagedIdentityExtension bool `json:"useManagedIdentityExtension,omitempty"`
UserAssignedIdentityID string `json:"userAssignedIdentityID,omitempty"`
}
// getCCMConfig returns the configuration needed for the Kubernetes Cloud Controller Manager on Azure.
func getCCMConfig(tfOutput terraform.AzureApplyOutput, serviceAccURI string) ([]byte, error) {
creds, err := azureshared.ApplicationCredentialsFromURI(serviceAccURI)

View File

@ -17,16 +17,6 @@ type Release struct {
WaitMode WaitMode
}
// Releases bundles all helm releases to be deployed to Constellation.
type Releases struct {
AWSLoadBalancerController *Release
CSI *Release
Cilium Release
CertManager Release
ConstellationOperators Release
ConstellationServices Release
}
// WaitMode specifies the wait mode for a helm release.
type WaitMode string

View File

@ -0,0 +1,79 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"fmt"
"strings"
"time"
"github.com/edgelesssys/constellation/v2/internal/retry"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
// maximumRetryAttempts is the maximum number of attempts to retry a helm install.
maximumRetryAttempts = 3
)
type retrieableApplier interface {
apply(context.Context) error
ReleaseName() string
IsAtomic() bool
}
// retryApply retries the given retriable action.
func retryApply(ctx context.Context, action retrieableApplier, log debugLog) error {
var retries int
retriable := func(err error) bool {
// abort after maximumRetryAttempts tries.
if retries >= maximumRetryAttempts {
return false
}
retries++
// only retry if atomic is set
// otherwise helm doesn't uninstall
// the release on failure
if !action.IsAtomic() {
return false
}
// check if error is retriable
return wait.Interrupted(err) ||
strings.Contains(err.Error(), "connection refused")
}
doer := applyDoer{
action,
log,
}
retrier := retry.NewIntervalRetrier(doer, 30*time.Second, retriable)
retryLoopStartTime := time.Now()
if err := retrier.Do(ctx); err != nil {
return fmt.Errorf("helm install: %w", err)
}
retryLoopFinishDuration := time.Since(retryLoopStartTime)
log.Debugf("Helm chart %q installation finished after %s", action.ReleaseName(), retryLoopFinishDuration)
return nil
}
// applyDoer is a helper struct to enable retrying helm actions.
type applyDoer struct {
Applier retrieableApplier
log debugLog
}
// Do tries to apply the action.
func (i applyDoer) Do(ctx context.Context) error {
i.log.Debugf("Trying to apply Helm chart %s", i.Applier.ReleaseName())
if err := i.Applier.apply(ctx); err != nil {
i.log.Debugf("Helm chart installation %s failed: %v", i.Applier.ReleaseName(), err)
return err
}
return nil
}

View File

@ -1,424 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/edgelesssys/constellation/v2/cli/internal/clusterid"
"github.com/edgelesssys/constellation/v2/cli/internal/terraform"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
"github.com/edgelesssys/constellation/v2/internal/compatibility"
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/kms/uri"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/spf13/afero"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/cli"
"helm.sh/helm/v3/pkg/release"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const (
// AllowDestructive is a named bool to signal that destructive actions have been confirmed by the user.
AllowDestructive = true
// DenyDestructive is a named bool to signal that destructive actions have not been confirmed by the user yet.
DenyDestructive = false
)
// ErrConfirmationMissing signals that an action requires user confirmation.
var ErrConfirmationMissing = errors.New("action requires user confirmation")
var errReleaseNotFound = errors.New("release not found")
// UpgradeClient handles interaction with helm and the cluster.
type UpgradeClient struct {
config *action.Configuration
kubectl crdClient
fs file.Handler
actions actionWrapper
log debugLog
}
// NewUpgradeClient returns a newly initialized UpgradeClient for the given namespace.
func NewUpgradeClient(client crdClient, kubeConfigPath, helmNamespace string, log debugLog) (*UpgradeClient, error) {
settings := cli.New()
settings.KubeConfig = kubeConfigPath
actionConfig := &action.Configuration{}
if err := actionConfig.Init(settings.RESTClientGetter(), helmNamespace, "secret", log.Debugf); err != nil {
return nil, fmt.Errorf("initializing config: %w", err)
}
fileHandler := file.NewHandler(afero.NewOsFs())
kubeconfig, err := fileHandler.Read(kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("reading gce config: %w", err)
}
if err := client.Initialize(kubeconfig); err != nil {
return nil, fmt.Errorf("initializing kubectl: %w", err)
}
return &UpgradeClient{
kubectl: client,
fs: fileHandler,
actions: actions{config: actionConfig},
log: log,
}, nil
}
func (c *UpgradeClient) shouldUpgrade(releaseName string, newVersion semver.Semver, force bool) error {
currentVersion, err := c.currentVersion(releaseName)
if err != nil {
return fmt.Errorf("getting version for %s: %w", releaseName, err)
}
c.log.Debugf("Current %s version: %s", releaseName, currentVersion)
c.log.Debugf("New %s version: %s", releaseName, newVersion)
// This may break for cert-manager or cilium if we decide to upgrade more than one minor version at a time.
// Leaving it as is since it is not clear to me what kind of sanity check we could do.
if !force {
if err := newVersion.IsUpgradeTo(currentVersion); err != nil {
return err
}
}
// at this point we conclude that the release should be upgraded. check that this CLI supports the upgrade.
cliVersion := constants.BinaryVersion()
if isCLIVersionedRelease(releaseName) && cliVersion.Compare(newVersion) != 0 {
return fmt.Errorf("this CLI only supports microservice version %s for upgrading", cliVersion.String())
}
c.log.Debugf("Upgrading %s from %s to %s", releaseName, currentVersion, newVersion)
return nil
}
// Upgrade runs a helm-upgrade on all deployments that are managed via Helm.
// If the CLI receives an interrupt signal it will cancel the context.
// Canceling the context will prompt helm to abort and roll back the ongoing upgrade.
func (c *UpgradeClient) Upgrade(ctx context.Context, config *config.Config, idFile clusterid.File, timeout time.Duration,
allowDestructive, force bool, upgradeDir string, conformance bool, helmWaitMode WaitMode, masterSecret uri.MasterSecret,
serviceAccURI string, validK8sVersion versions.ValidK8sVersion, output terraform.ApplyOutput,
) error {
upgradeErrs := []error{}
upgradeReleases := []Release{}
newReleases := []Release{}
clusterName := clusterid.GetClusterName(config, idFile)
helmLoader := NewLoader(config.GetProvider(), validK8sVersion, clusterName)
c.log.Debugf("Created new Helm loader")
releases, err := helmLoader.LoadReleases(config, conformance, helmWaitMode, masterSecret, serviceAccURI, idFile, output)
if err != nil {
return fmt.Errorf("loading releases: %w", err)
}
for _, release := range getManagedReleases(config, releases) {
var invalidUpgrade *compatibility.InvalidUpgradeError
// Get version of the chart embedded in the CLI
// This is the version we are upgrading to
// Since our bundled charts are embedded with version 0.0.0,
// we need to update them to the same version as the CLI
var upgradeVersion semver.Semver
if isCLIVersionedRelease(release.ReleaseName) {
updateVersions(release.Chart, constants.BinaryVersion())
upgradeVersion = config.MicroserviceVersion
} else {
chartVersion, err := semver.New(release.Chart.Metadata.Version)
if err != nil {
return fmt.Errorf("parsing chart version: %w", err)
}
upgradeVersion = chartVersion
}
err = c.shouldUpgrade(release.ReleaseName, upgradeVersion, force)
switch {
case errors.Is(err, errReleaseNotFound):
// if the release is not found, we need to install it
c.log.Debugf("Release %s not found, adding to new releases...", release.ReleaseName)
newReleases = append(newReleases, release)
case errors.As(err, &invalidUpgrade):
c.log.Debugf("Appending to %s upgrade: %s", release.ReleaseName, err)
upgradeErrs = append(upgradeErrs, fmt.Errorf("skipping %s upgrade: %w", release.ReleaseName, err))
case err != nil:
return fmt.Errorf("should upgrade %s: %w", release.ReleaseName, err)
case err == nil:
c.log.Debugf("Adding %s to upgrade releases...", release.ReleaseName)
upgradeReleases = append(upgradeReleases, release)
// Check if installing/upgrading the chart could be destructive
// If so, we don't want to perform any actions,
// unless the user confirms it to be OK.
if !allowDestructive &&
release.ReleaseName == certManagerInfo.releaseName {
return ErrConfirmationMissing
}
}
}
// Backup CRDs and CRs if we are upgrading anything.
if len(upgradeReleases) != 0 {
c.log.Debugf("Creating backup of CRDs and CRs")
crds, err := c.backupCRDs(ctx, upgradeDir)
if err != nil {
return fmt.Errorf("creating CRD backup: %w", err)
}
if err := c.backupCRs(ctx, crds, upgradeDir); err != nil {
return fmt.Errorf("creating CR backup: %w", err)
}
}
for _, release := range upgradeReleases {
c.log.Debugf("Upgrading release %s", release.Chart.Metadata.Name)
if release.ReleaseName == constellationOperatorsInfo.releaseName {
if err := c.updateCRDs(ctx, release.Chart); err != nil {
return fmt.Errorf("updating operator CRDs: %w", err)
}
}
if err := c.upgradeRelease(ctx, timeout, release); err != nil {
return fmt.Errorf("upgrading %s: %w", release.Chart.Metadata.Name, err)
}
}
// Install new releases after upgrading existing ones.
// This makes sure if a release was removed as a dependency from one chart,
// and then added as a new standalone chart (or as a dependency of another chart),
// that the new release is installed without creating naming conflicts.
// If in the future, we require to install a new release before upgrading existing ones,
// it should be done in a separate loop, instead of moving this one up.
for _, release := range newReleases {
c.log.Debugf("Installing new release %s", release.Chart.Metadata.Name)
if err := c.installNewRelease(ctx, timeout, release); err != nil {
return fmt.Errorf("upgrading %s: %w", release.Chart.Metadata.Name, err)
}
}
return errors.Join(upgradeErrs...)
}
func getManagedReleases(config *config.Config, releases *Releases) []Release {
res := []Release{releases.Cilium, releases.CertManager, releases.ConstellationOperators, releases.ConstellationServices}
if config.GetProvider() == cloudprovider.AWS {
res = append(res, *releases.AWSLoadBalancerController)
}
if config.DeployCSIDriver() {
res = append(res, *releases.CSI)
}
return res
}
// Versions queries the cluster for running versions and returns a map of releaseName -> version.
func (c *UpgradeClient) Versions() (ServiceVersions, error) {
ciliumVersion, err := c.currentVersion(ciliumInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", ciliumInfo.releaseName, err)
}
certManagerVersion, err := c.currentVersion(certManagerInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", certManagerInfo.releaseName, err)
}
operatorsVersion, err := c.currentVersion(constellationOperatorsInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", constellationOperatorsInfo.releaseName, err)
}
servicesVersion, err := c.currentVersion(constellationServicesInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", constellationServicesInfo.releaseName, err)
}
csiVersions, err := c.csiVersions()
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting CSI versions: %w", err)
}
serviceVersions := ServiceVersions{
cilium: ciliumVersion,
certManager: certManagerVersion,
constellationOperators: operatorsVersion,
constellationServices: servicesVersion,
csiVersions: csiVersions,
}
if awsLBVersion, err := c.currentVersion(awsLBControllerInfo.releaseName); err == nil {
serviceVersions.awsLBController = awsLBVersion
} else if !errors.Is(err, errReleaseNotFound) {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", awsLBControllerInfo.releaseName, err)
}
return serviceVersions, nil
}
// currentVersion returns the version of the currently installed helm release.
func (c *UpgradeClient) currentVersion(release string) (semver.Semver, error) {
rel, err := c.actions.listAction(release)
if err != nil {
return semver.Semver{}, err
}
if len(rel) == 0 {
return semver.Semver{}, errReleaseNotFound
}
if len(rel) > 1 {
return semver.Semver{}, fmt.Errorf("multiple releases found for %s", release)
}
if rel[0] == nil || rel[0].Chart == nil || rel[0].Chart.Metadata == nil {
return semver.Semver{}, fmt.Errorf("received invalid release %s", release)
}
return semver.New(rel[0].Chart.Metadata.Version)
}
func (c *UpgradeClient) csiVersions() (map[string]semver.Semver, error) {
packedChartRelease, err := c.actions.listAction(csiInfo.releaseName)
if err != nil {
return nil, fmt.Errorf("listing %s: %w", csiInfo.releaseName, err)
}
csiVersions := make(map[string]semver.Semver)
// No CSI driver installed
if len(packedChartRelease) == 0 {
return csiVersions, nil
}
if len(packedChartRelease) > 1 {
return nil, fmt.Errorf("multiple releases found for %s", csiInfo.releaseName)
}
if packedChartRelease[0] == nil || packedChartRelease[0].Chart == nil {
return nil, fmt.Errorf("received invalid release %s", csiInfo.releaseName)
}
dependencies := packedChartRelease[0].Chart.Metadata.Dependencies
for _, dep := range dependencies {
var err error
csiVersions[dep.Name], err = semver.New(dep.Version)
if err != nil {
return nil, fmt.Errorf("parsing CSI version %q: %w", dep.Name, err)
}
}
return csiVersions, nil
}
// installNewRelease installs a previously not installed release on the cluster.
func (c *UpgradeClient) installNewRelease(
ctx context.Context, timeout time.Duration, release Release,
) error {
return c.actions.installAction(ctx, release.ReleaseName, release.Chart, release.Values, timeout)
}
// upgradeRelease upgrades a release running on the cluster.
func (c *UpgradeClient) upgradeRelease(
ctx context.Context, timeout time.Duration, release Release,
) error {
return c.actions.upgradeAction(ctx, release.ReleaseName, release.Chart, release.Values, timeout)
}
// GetValues queries the cluster for the values of the given release.
func (c *UpgradeClient) GetValues(release string) (map[string]any, error) {
client := action.NewGetValues(c.config)
// Version corresponds to the releases revision. Specifying a Version <= 0 yields the latest release.
client.Version = 0
values, err := client.Run(release)
if err != nil {
return nil, fmt.Errorf("getting values for %s: %w", release, err)
}
return values, nil
}
// updateCRDs walks through the dependencies of the given chart and applies
// the files in the dependencie's 'crds' folder.
// This function is NOT recursive!
func (c *UpgradeClient) updateCRDs(ctx context.Context, chart *chart.Chart) error {
for _, dep := range chart.Dependencies() {
for _, crdFile := range dep.Files {
if strings.HasPrefix(crdFile.Name, "crds/") {
c.log.Debugf("Updating crd: %s", crdFile.Name)
err := c.kubectl.ApplyCRD(ctx, crdFile.Data)
if err != nil {
return err
}
}
}
}
return nil
}
type crdClient interface {
Initialize(kubeconfig []byte) error
ApplyCRD(ctx context.Context, rawCRD []byte) error
ListCRDs(ctx context.Context) ([]apiextensionsv1.CustomResourceDefinition, error)
ListCRs(ctx context.Context, gvr schema.GroupVersionResource) ([]unstructured.Unstructured, error)
}
type actionWrapper interface {
listAction(release string) ([]*release.Release, error)
getValues(release string) (map[string]any, error)
installAction(ctx context.Context, releaseName string, chart *chart.Chart, values map[string]any, timeout time.Duration) error
upgradeAction(ctx context.Context, releaseName string, chart *chart.Chart, values map[string]any, timeout time.Duration) error
}
type actions struct {
config *action.Configuration
}
// listAction execute a List action by wrapping helm's action package.
// It creates the action, runs it at returns results and errors.
func (a actions) listAction(release string) ([]*release.Release, error) {
action := action.NewList(a.config)
action.Filter = release
return action.Run()
}
func (a actions) getValues(release string) (map[string]any, error) {
client := action.NewGetValues(a.config)
// Version corresponds to the releases revision. Specifying a Version <= 0 yields the latest release.
client.Version = 0
return client.Run(release)
}
func (a actions) upgradeAction(ctx context.Context, releaseName string, chart *chart.Chart, values map[string]any, timeout time.Duration) error {
action := action.NewUpgrade(a.config)
action.Atomic = true
action.Namespace = constants.HelmNamespace
action.ReuseValues = false
action.Timeout = timeout
if _, err := action.RunWithContext(ctx, releaseName, chart, values); err != nil {
return fmt.Errorf("upgrading %s: %w", releaseName, err)
}
return nil
}
func (a actions) installAction(ctx context.Context, releaseName string, chart *chart.Chart, values map[string]any, timeout time.Duration) error {
action := action.NewInstall(a.config)
action.Atomic = true
action.Namespace = constants.HelmNamespace
action.ReleaseName = releaseName
action.Timeout = timeout
if _, err := action.RunWithContext(ctx, chart, values); err != nil {
return fmt.Errorf("installing previously not installed chart %s: %w", chart.Name(), err)
}
return nil
}
// isCLIVersionedRelease checks if the given release is versioned by the CLI,
// meaning that the version of the Helm release is equal to the version of the CLI that installed it.
func isCLIVersionedRelease(releaseName string) bool {
return releaseName == constellationOperatorsInfo.releaseName ||
releaseName == constellationServicesInfo.releaseName ||
releaseName == csiInfo.releaseName
}

View File

@ -1,112 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"context"
"testing"
"time"
"github.com/edgelesssys/constellation/v2/internal/compatibility"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/semver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/release"
)
func TestShouldUpgrade(t *testing.T) {
testCases := map[string]struct {
version string
assertCorrectError func(t *testing.T, err error) bool
wantError bool
}{
"valid upgrade": {
version: "1.9.0",
},
"not a valid upgrade": {
version: "1.0.0",
assertCorrectError: func(t *testing.T, err error) bool {
var target *compatibility.InvalidUpgradeError
return assert.ErrorAs(t, err, &target)
},
wantError: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := UpgradeClient{kubectl: nil, actions: &stubActionWrapper{version: tc.version}, log: logger.NewTest(t)}
chart, err := loadChartsDir(helmFS, certManagerInfo.path)
require.NoError(err)
chartVersion, err := semver.New(chart.Metadata.Version)
require.NoError(err)
err = client.shouldUpgrade(certManagerInfo.releaseName, chartVersion, false)
if tc.wantError {
tc.assertCorrectError(t, err)
return
}
assert.NoError(err)
})
}
}
func TestUpgradeRelease(t *testing.T) {
testCases := map[string]struct {
version string
wantError bool
}{
"allow": {
version: "1.9.0",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := UpgradeClient{kubectl: nil, actions: &stubActionWrapper{version: tc.version}, log: logger.NewTest(t)}
chart, err := loadChartsDir(helmFS, certManagerInfo.path)
require.NoError(err)
err = client.upgradeRelease(context.Background(), 0, Release{Chart: chart})
if tc.wantError {
assert.Error(err)
return
}
assert.NoError(err)
})
}
}
type stubActionWrapper struct {
version string
}
// listAction returns a list of len 1 with a release that has only it's version set.
func (a *stubActionWrapper) listAction(_ string) ([]*release.Release, error) {
return []*release.Release{{Chart: &chart.Chart{Metadata: &chart.Metadata{Version: a.version}}}}, nil
}
func (a *stubActionWrapper) getValues(_ string) (map[string]any, error) {
return nil, nil
}
func (a *stubActionWrapper) installAction(_ context.Context, _ string, _ *chart.Chart, _ map[string]any, _ time.Duration) error {
return nil
}
func (a *stubActionWrapper) upgradeAction(_ context.Context, _ string, _ *chart.Chart, _ map[string]any, _ time.Duration) error {
return nil
}

View File

@ -0,0 +1,142 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
import (
"errors"
"fmt"
"github.com/edgelesssys/constellation/v2/internal/semver"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/release"
"k8s.io/client-go/util/retry"
)
// ReleaseVersionClient is a client that can retrieve the version of a helm release.
type ReleaseVersionClient struct {
config *action.Configuration
}
// NewReleaseVersionClient creates a new ReleaseVersionClient.
func NewReleaseVersionClient(kubeConfigPath string, log debugLog) (*ReleaseVersionClient, error) {
config, err := newActionConfig(kubeConfigPath, log)
if err != nil {
return nil, err
}
return &ReleaseVersionClient{
config: config,
}, nil
}
// listAction execute a List action by wrapping helm's action package.
// It creates the action, runs it at returns results and errors.
func (c ReleaseVersionClient) listAction(release string) (res []*release.Release, err error) {
action := action.NewList(c.config)
action.Filter = release
// during init, the kube API might not yet be reachable, so we retry
err = retry.OnError(retry.DefaultBackoff, func(err error) bool {
return err != nil
}, func() error {
res, err = action.Run()
return err
})
return
}
// Versions queries the cluster for running versions and returns a map of releaseName -> version.
func (c ReleaseVersionClient) Versions() (ServiceVersions, error) {
ciliumVersion, err := c.currentVersion(ciliumInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", ciliumInfo.releaseName, err)
}
certManagerVersion, err := c.currentVersion(certManagerInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", certManagerInfo.releaseName, err)
}
operatorsVersion, err := c.currentVersion(constellationOperatorsInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", constellationOperatorsInfo.releaseName, err)
}
servicesVersion, err := c.currentVersion(constellationServicesInfo.releaseName)
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", constellationServicesInfo.releaseName, err)
}
csiVersions, err := c.csiVersions()
if err != nil {
return ServiceVersions{}, fmt.Errorf("getting CSI versions: %w", err)
}
serviceVersions := ServiceVersions{
cilium: ciliumVersion,
certManager: certManagerVersion,
constellationOperators: operatorsVersion,
constellationServices: servicesVersion,
csiVersions: csiVersions,
}
if awsLBVersion, err := c.currentVersion(awsLBControllerInfo.releaseName); err == nil {
serviceVersions.awsLBController = awsLBVersion
} else if !errors.Is(err, errReleaseNotFound) {
return ServiceVersions{}, fmt.Errorf("getting %s version: %w", awsLBControllerInfo.releaseName, err)
}
return serviceVersions, nil
}
// currentVersion returns the version of the currently installed helm release.
// If the CSI chart is not installed, no error is returned because the user can configure if the chart should be installed.
func (c ReleaseVersionClient) currentVersion(release string) (semver.Semver, error) {
rel, err := c.listAction(release)
if err != nil {
return semver.Semver{}, err
}
if len(rel) == 0 {
return semver.Semver{}, errReleaseNotFound
}
if len(rel) > 1 {
return semver.Semver{}, fmt.Errorf("multiple releases found for %s", release)
}
if rel[0] == nil || rel[0].Chart == nil || rel[0].Chart.Metadata == nil {
return semver.Semver{}, fmt.Errorf("received invalid release %s", release)
}
return semver.New(rel[0].Chart.Metadata.Version)
}
// csi versions needs special handling because all versions of its subcharts should be gathered.
func (c ReleaseVersionClient) csiVersions() (map[string]semver.Semver, error) {
packedChartRelease, err := c.listAction(csiInfo.releaseName)
if err != nil {
return nil, fmt.Errorf("listing %s: %w", csiInfo.releaseName, err)
}
csiVersions := make(map[string]semver.Semver)
// No CSI driver installed
if len(packedChartRelease) == 0 {
return csiVersions, nil
}
if len(packedChartRelease) > 1 {
return nil, fmt.Errorf("multiple releases found for %s", csiInfo.releaseName)
}
if packedChartRelease[0] == nil || packedChartRelease[0].Chart == nil {
return nil, fmt.Errorf("received invalid release %s", csiInfo.releaseName)
}
dependencies := packedChartRelease[0].Chart.Metadata.Dependencies
for _, dep := range dependencies {
var err error
csiVersions[dep.Name], err = semver.New(dep.Version)
if err != nil {
return nil, fmt.Errorf("parsing CSI version %q: %w", dep.Name, err)
}
}
return csiVersions, nil
}

View File

@ -4,6 +4,7 @@ load("//bazel/go:go_test.bzl", "go_test")
go_library(
name = "kubecmd",
srcs = [
"backup.go",
"kubecmd.go",
"status.go",
],
@ -16,6 +17,7 @@ go_library(
"//internal/compatibility",
"//internal/config",
"//internal/constants",
"//internal/file",
"//internal/imagefetcher",
"//internal/kubernetes",
"//internal/kubernetes/kubectl",
@ -23,6 +25,7 @@ go_library(
"//internal/versions/components",
"//operators/constellation-node-operator/api/v1alpha1",
"@io_k8s_api//core/v1:core",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
@ -37,7 +40,10 @@ go_library(
go_test(
name = "kubecmd_test",
srcs = ["kubecmd_test.go"],
srcs = [
"backup_test.go",
"kubecmd_test.go",
],
embed = [":kubecmd"],
deps = [
"//internal/attestation/measurements",
@ -46,18 +52,23 @@ go_test(
"//internal/compatibility",
"//internal/config",
"//internal/constants",
"//internal/file",
"//internal/logger",
"//internal/versions",
"//internal/versions/components",
"//operators/constellation-node-operator/api/v1alpha1",
"@com_github_pkg_errors//:errors",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@io_k8s_api//core/v1:core",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1:apiextensions",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
"@io_k8s_apimachinery//pkg/runtime",
"@io_k8s_apimachinery//pkg/runtime/schema",
"@io_k8s_sigs_yaml//:yaml",
],
)

View File

@ -4,7 +4,7 @@ Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
package kubecmd
import (
"context"
@ -13,25 +13,32 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/yaml"
)
func (c *UpgradeClient) backupCRDs(ctx context.Context, upgradeDir string) ([]apiextensionsv1.CustomResourceDefinition, error) {
c.log.Debugf("Starting CRD backup")
crds, err := c.kubectl.ListCRDs(ctx)
type crdLister interface {
ListCRDs(ctx context.Context) ([]apiextensionsv1.CustomResourceDefinition, error)
ListCRs(ctx context.Context, gvr schema.GroupVersionResource) ([]unstructured.Unstructured, error)
}
// BackupCRDs backs up all CRDs to the upgrade workspace.
func (k *KubeCmd) BackupCRDs(ctx context.Context, upgradeDir string) ([]apiextensionsv1.CustomResourceDefinition, error) {
k.log.Debugf("Starting CRD backup")
crds, err := k.kubectl.ListCRDs(ctx)
if err != nil {
return nil, fmt.Errorf("getting CRDs: %w", err)
}
crdBackupFolder := c.crdBackupFolder(upgradeDir)
if err := c.fs.MkdirAll(crdBackupFolder); err != nil {
crdBackupFolder := k.crdBackupFolder(upgradeDir)
if err := k.fileHandler.MkdirAll(crdBackupFolder); err != nil {
return nil, fmt.Errorf("creating backup dir: %w", err)
}
for i := range crds {
path := filepath.Join(crdBackupFolder, crds[i].Name+".yaml")
c.log.Debugf("Creating CRD backup: %s", path)
k.log.Debugf("Creating CRD backup: %s", path)
// We have to manually set kind/apiversion because of a long-standing limitation of the API:
// https://github.com/kubernetes/kubernetes/issues/3030#issuecomment-67543738
@ -44,18 +51,19 @@ func (c *UpgradeClient) backupCRDs(ctx context.Context, upgradeDir string) ([]ap
if err != nil {
return nil, err
}
if err := c.fs.Write(path, yamlBytes); err != nil {
if err := k.fileHandler.Write(path, yamlBytes); err != nil {
return nil, err
}
}
c.log.Debugf("CRD backup complete")
k.log.Debugf("CRD backup complete")
return crds, nil
}
func (c *UpgradeClient) backupCRs(ctx context.Context, crds []apiextensionsv1.CustomResourceDefinition, upgradeID string) error {
c.log.Debugf("Starting CR backup")
// BackupCRs backs up all CRs to the upgrade workspace.
func (k *KubeCmd) BackupCRs(ctx context.Context, crds []apiextensionsv1.CustomResourceDefinition, upgradeDir string) error {
k.log.Debugf("Starting CR backup")
for _, crd := range crds {
c.log.Debugf("Creating backup for resource type: %s", crd.Name)
k.log.Debugf("Creating backup for resource type: %s", crd.Name)
// Iterate over all versions of the CRD
// TODO: Consider iterating over crd.Status.StoredVersions instead
@ -63,22 +71,22 @@ func (c *UpgradeClient) backupCRs(ctx context.Context, crds []apiextensionsv1.Cu
// a version that is not installed in the cluster.
// With the StoredVersions field, we could only iterate over the installed versions.
for _, version := range crd.Spec.Versions {
c.log.Debugf("Creating backup of CRs for %q at version %q", crd.Name, version.Name)
k.log.Debugf("Creating backup of CRs for %q at version %q", crd.Name, version.Name)
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version.Name, Resource: crd.Spec.Names.Plural}
crs, err := c.kubectl.ListCRs(ctx, gvr)
crs, err := k.kubectl.ListCRs(ctx, gvr)
if err != nil {
if !k8serrors.IsNotFound(err) {
return fmt.Errorf("retrieving CR %s: %w", crd.Name, err)
}
c.log.Debugf("No CRs found for %q at version %q, skipping...", crd.Name, version.Name)
k.log.Debugf("No CRs found for %q at version %q, skipping...", crd.Name, version.Name)
continue
}
backupFolder := c.backupFolder(upgradeID)
backupFolder := k.backupFolder(upgradeDir)
for _, cr := range crs {
targetFolder := filepath.Join(backupFolder, gvr.Group, gvr.Version, cr.GetNamespace(), cr.GetKind())
if err := c.fs.MkdirAll(targetFolder); err != nil {
if err := k.fileHandler.MkdirAll(targetFolder); err != nil {
return fmt.Errorf("creating resource dir: %w", err)
}
path := filepath.Join(targetFolder, cr.GetName()+".yaml")
@ -86,22 +94,22 @@ func (c *UpgradeClient) backupCRs(ctx context.Context, crds []apiextensionsv1.Cu
if err != nil {
return err
}
if err := c.fs.Write(path, yamlBytes); err != nil {
if err := k.fileHandler.Write(path, yamlBytes); err != nil {
return err
}
}
}
c.log.Debugf("Backup for resource type %q complete", crd.Name)
k.log.Debugf("Backup for resource type %q complete", crd.Name)
}
c.log.Debugf("CR backup complete")
k.log.Debugf("CR backup complete")
return nil
}
func (c *UpgradeClient) backupFolder(upgradeDir string) string {
func (k *KubeCmd) backupFolder(upgradeDir string) string {
return filepath.Join(upgradeDir, "backups")
}
func (c *UpgradeClient) crdBackupFolder(upgradeDir string) string {
return filepath.Join(c.backupFolder(upgradeDir), "crds")
func (k *KubeCmd) crdBackupFolder(upgradeDir string) string {
return filepath.Join(k.backupFolder(upgradeDir), "crds")
}

View File

@ -4,7 +4,7 @@ Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package helm
package kubecmd
import (
"context"
@ -52,14 +52,13 @@ func TestBackupCRDs(t *testing.T) {
crd := apiextensionsv1.CustomResourceDefinition{}
err := yaml.Unmarshal([]byte(tc.crd), &crd)
require.NoError(err)
client := UpgradeClient{
config: nil,
kubectl: stubCrdClient{crds: []apiextensionsv1.CustomResourceDefinition{crd}, getCRDsError: tc.getCRDsError},
fs: file.NewHandler(memFs),
log: stubLog{},
client := KubeCmd{
kubectl: &stubKubectl{crds: []apiextensionsv1.CustomResourceDefinition{crd}, getCRDsError: tc.getCRDsError},
fileHandler: file.NewHandler(memFs),
log: stubLog{},
}
_, err = client.backupCRDs(context.Background(), tc.upgradeID)
_, err = client.BackupCRDs(context.Background(), tc.upgradeID)
if tc.wantError {
assert.Error(err)
return
@ -143,14 +142,13 @@ func TestBackupCRs(t *testing.T) {
require := require.New(t)
memFs := afero.NewMemMapFs()
client := UpgradeClient{
config: nil,
kubectl: stubCrdClient{crs: []unstructured.Unstructured{tc.resource}, getCRsError: tc.getCRsError},
fs: file.NewHandler(memFs),
log: stubLog{},
client := KubeCmd{
kubectl: &stubKubectl{crs: []unstructured.Unstructured{tc.resource}, getCRsError: tc.getCRsError},
fileHandler: file.NewHandler(memFs),
log: stubLog{},
}
err := client.backupCRs(context.Background(), []apiextensionsv1.CustomResourceDefinition{tc.crd}, tc.upgradeID)
err := client.BackupCRs(context.Background(), []apiextensionsv1.CustomResourceDefinition{tc.crd}, tc.upgradeID)
if tc.wantError {
assert.Error(err)
return
@ -173,22 +171,14 @@ type stubLog struct{}
func (s stubLog) Debugf(_ string, _ ...any) {}
func (s stubLog) Sync() {}
type stubCrdClient struct {
crds []apiextensionsv1.CustomResourceDefinition
getCRDsError error
crs []unstructured.Unstructured
getCRsError error
crdClient
}
func (c stubCrdClient) ListCRDs(_ context.Context) ([]apiextensionsv1.CustomResourceDefinition, error) {
func (c stubKubectl) ListCRDs(_ context.Context) ([]apiextensionsv1.CustomResourceDefinition, error) {
if c.getCRDsError != nil {
return nil, c.getCRDsError
}
return c.crds, nil
}
func (c stubCrdClient) ListCRs(_ context.Context, _ schema.GroupVersionResource) ([]unstructured.Unstructured, error) {
func (c stubKubectl) ListCRs(_ context.Context, _ schema.GroupVersionResource) ([]unstructured.Unstructured, error) {
if c.getCRsError != nil {
return nil, c.getCRsError
}

View File

@ -31,6 +31,7 @@ import (
"github.com/edgelesssys/constellation/v2/internal/compatibility"
"github.com/edgelesssys/constellation/v2/internal/config"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/imagefetcher"
internalk8s "github.com/edgelesssys/constellation/v2/internal/kubernetes"
"github.com/edgelesssys/constellation/v2/internal/kubernetes/kubectl"
@ -68,11 +69,12 @@ type KubeCmd struct {
kubectl kubectlInterface
imageFetcher imageFetcher
outWriter io.Writer
fileHandler file.Handler
log debugLog
}
// New returns a new KubeCmd.
func New(outWriter io.Writer, kubeConfigPath string, log debugLog) (*KubeCmd, error) {
func New(outWriter io.Writer, kubeConfigPath string, fileHandler file.Handler, log debugLog) (*KubeCmd, error) {
client, err := kubectl.NewFromConfig(kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("creating kubectl client: %w", err)
@ -80,6 +82,7 @@ func New(outWriter io.Writer, kubeConfigPath string, log debugLog) (*KubeCmd, er
return &KubeCmd{
kubectl: client,
fileHandler: fileHandler,
imageFetcher: imagefetcher.New(),
outWriter: outWriter,
log: log,
@ -506,6 +509,7 @@ type kubectlInterface interface {
KubernetesVersion() (string, error)
GetCR(ctx context.Context, gvr schema.GroupVersionResource, name string) (*unstructured.Unstructured, error)
UpdateCR(ctx context.Context, gvr schema.GroupVersionResource, obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
crdLister
}
type debugLog interface {

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -616,6 +617,10 @@ type stubKubectl struct {
k8sErr error
nodes []corev1.Node
nodesErr error
crds []apiextensionsv1.CustomResourceDefinition
getCRDsError error
crs []unstructured.Unstructured
getCRsError error
}
func (s *stubKubectl) GetConfigMap(_ context.Context, _, name string) (*corev1.ConfigMap, error) {

View File

@ -350,6 +350,7 @@ func runUpgradeApply(require *require.Assertions, cli string) {
require.NoError(err, "Stdout: %s\nStderr: %s", string(stdout), string(stderr))
require.NoError(containsUnexepectedMsg(string(stdout)))
log.Println(string(stdout))
log.Println(string(stderr)) // also print debug logs.
}
// containsUnexepectedMsg checks if the given input contains any unexpected messages.