Wait for kube api during init (#440)

* kubernetes: wait for KubeAPI to be reachable
This commit is contained in:
3u13r 2022-11-04 12:36:26 +01:00 committed by GitHub
parent b89fae8062
commit 9ad377284d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 190 additions and 10 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi/kubectl"
kubewaiter "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubeWaiter"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/logging"
"github.com/edgelesssys/constellation/v2/internal/atls"
"github.com/edgelesssys/constellation/v2/internal/attestation/aws"
@ -107,7 +108,7 @@ func main() {
cloudControllerManager := &awscloud.CloudControllerManager{}
clusterInitJoiner = kubernetes.New(
"aws", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.New(), cloudControllerManager,
metadata, pcrsJSON, helmClient,
metadata, pcrsJSON, helmClient, &kubewaiter.CloudKubeAPIWaiter{},
)
openTPM = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -144,7 +145,7 @@ func main() {
}
clusterInitJoiner = kubernetes.New(
"gcp", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.New(), cloudControllerManager,
metadata, pcrsJSON, helmClient,
metadata, pcrsJSON, helmClient, &kubewaiter.CloudKubeAPIWaiter{},
)
openTPM = vtpm.OpenVTPM
fs = afero.NewOsFs()
@ -178,7 +179,7 @@ func main() {
}
clusterInitJoiner = kubernetes.New(
"azure", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.New(), azurecloud.NewCloudControllerManager(metadata),
metadata, pcrsJSON, helmClient,
metadata, pcrsJSON, helmClient, &kubewaiter.CloudKubeAPIWaiter{},
)
openTPM = vtpm.OpenVTPM
@ -200,7 +201,7 @@ func main() {
}
clusterInitJoiner = kubernetes.New(
"qemu", k8sapi.NewKubernetesUtil(), &k8sapi.KubdeadmConfiguration{}, kubectl.New(), &qemucloud.CloudControllerManager{},
metadata, pcrsJSON, helmClient,
metadata, pcrsJSON, helmClient, &kubewaiter.CloudKubeAPIWaiter{},
)
metadataAPI = metadata

View File

@ -55,6 +55,7 @@ type Client interface {
AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
WaitForCRDs(ctx context.Context, crds []string) error
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
}
type installer interface {

View File

@ -116,6 +116,11 @@ func (c *Client) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap
return nil
}
// ListAllNamespaces returns a list of all namespaces.
func (c *Client) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return c.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
}
func (c *Client) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {
deployments := c.clientset.AppsV1().Deployments(namespace)

View File

@ -30,6 +30,7 @@ type Client interface {
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
// WaitForCRD waits for the given CRD to be established.
WaitForCRD(ctx context.Context, crd string) error
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
}
// clientGenerator can generate new clients from a kubeconfig.
@ -86,12 +87,17 @@ func (k *Kubectl) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMa
return err
}
err = client.CreateConfigMap(ctx, configMap)
return client.CreateConfigMap(ctx, configMap)
}
// ListAllNamespaces returns all namespaces in the cluster.
func (k *Kubectl) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
if err != nil {
return err
return nil, err
}
return nil
return client.ListAllNamespaces(ctx)
}
func (k *Kubectl) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {

View File

@ -30,6 +30,8 @@ type stubClient struct {
addTolerationsToDeploymentErr error
addNodeSelectorToDeploymentErr error
waitForCRDErr error
listAllNamespacesResp *corev1.NamespaceList
listAllNamespacesErr error
}
func (s *stubClient) ApplyOneObject(info *resource.Info, forceConflicts bool) error {
@ -52,6 +54,10 @@ func (s *stubClient) AddNodeSelectorsToDeployment(ctx context.Context, selectors
return s.addNodeSelectorToDeploymentErr
}
func (s *stubClient) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return s.listAllNamespacesResp, s.listAllNamespacesErr
}
type stubClientGenerator struct {
applyOneObjectErr error
getObjectsInfos []*resource.Info

View File

@ -0,0 +1,47 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package kubewaiter
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
"github.com/edgelesssys/constellation/v2/internal/retry"
)
// KubernetesClient is an interface for the Kubernetes client.
// It is used to check if the Kubernetes API is available.
type KubernetesClient interface {
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
}
// CloudKubeAPIWaiter waits for the Kubernetes API to be available.
type CloudKubeAPIWaiter struct{}
// Wait waits for the Kubernetes API to be available.
// Note that the kubernetesClient must have the kubeconfig already set.
func (w *CloudKubeAPIWaiter) Wait(ctx context.Context, kubernetesClient KubernetesClient) error {
funcAlwaysRetriable := func(err error) bool { return true }
doer := &kubeDoer{kubeClient: kubernetesClient}
retrier := retry.NewIntervalRetrier(doer, 5*time.Second, funcAlwaysRetriable)
if err := retrier.Do(ctx); err != nil {
return doer.Do(context.Background())
}
return nil
}
type kubeDoer struct {
kubeClient KubernetesClient
}
func (d *kubeDoer) Do(ctx context.Context) error {
_, err := d.kubeClient.ListAllNamespaces(ctx)
return err
}

View File

@ -0,0 +1,60 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package kubewaiter
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestCloudKubeAPIWaiter(t *testing.T) {
testCases := map[string]struct {
kubeClient KubernetesClient
wantErr bool
}{
"success": {
kubeClient: &stubKubernetesClient{},
},
"error": {
kubeClient: &stubKubernetesClient{listAllNamespacesErr: errors.New("error")},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
require := require.New(t)
waiter := &CloudKubeAPIWaiter{}
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
err := waiter.Wait(ctx, tc.kubeClient)
if tc.wantErr {
require.Error(err)
} else {
require.NoError(err)
}
})
}
}
type stubKubernetesClient struct {
listAllNamespacesErr error
}
func (c *stubKubernetesClient) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return nil, c.listAllNamespacesErr
}

View File

@ -16,9 +16,11 @@ import (
"net"
"strconv"
"strings"
"time"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi/resources"
kubewaiter "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubeWaiter"
"github.com/edgelesssys/constellation/v2/internal/azureshared"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
@ -46,11 +48,16 @@ type configurationProvider interface {
JoinConfiguration(externalCloudProvider bool) k8sapi.KubeadmJoinYAML
}
type kubeAPIWaiter interface {
Wait(ctx context.Context, kubernetesClient kubewaiter.KubernetesClient) error
}
// KubeWrapper implements Cluster interface.
type KubeWrapper struct {
cloudProvider string
clusterUtil clusterUtil
helmClient helmClient
kubeAPIWaiter kubeAPIWaiter
configProvider configurationProvider
client k8sapi.Client
kubeconfigReader configReader
@ -62,12 +69,13 @@ type KubeWrapper struct {
// New creates a new KubeWrapper with real values.
func New(cloudProvider string, clusterUtil clusterUtil, configProvider configurationProvider, client k8sapi.Client, cloudControllerManager CloudControllerManager,
providerMetadata ProviderMetadata, initialMeasurementsJSON []byte, helmClient helmClient,
providerMetadata ProviderMetadata, initialMeasurementsJSON []byte, helmClient helmClient, kubeAPIWaiter kubeAPIWaiter,
) *KubeWrapper {
return &KubeWrapper{
cloudProvider: cloudProvider,
clusterUtil: clusterUtil,
helmClient: helmClient,
kubeAPIWaiter: kubeAPIWaiter,
configProvider: configProvider,
client: client,
kubeconfigReader: &KubeconfigReader{fs: afero.Afero{Fs: afero.NewOsFs()}},
@ -158,6 +166,12 @@ func (k *KubeWrapper) InitCluster(
}
k.client.SetKubeconfig(kubeConfig)
waitCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := k.kubeAPIWaiter.Wait(waitCtx, k.client); err != nil {
return nil, fmt.Errorf("waiting for Kubernetes API to be available: %w", err)
}
// Step 3: configure & start kubernetes controllers
log.Infof("Starting Kubernetes controllers and deployments")
setupPodNetworkInput := k8sapi.SetupPodNetworkInput{

View File

@ -15,6 +15,7 @@ import (
"testing"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi"
kubewaiter "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/kubeWaiter"
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/deploy/helm"
@ -47,6 +48,7 @@ func TestInitCluster(t *testing.T) {
clusterUtil stubClusterUtil
helmClient stubHelmClient
kubectl stubKubectl
kubeAPIWaiter stubKubeAPIWaiter
providerMetadata ProviderMetadata
CloudControllerManager CloudControllerManager
ClusterAutoscaler ClusterAutoscaler
@ -60,6 +62,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{SupportedResp: false},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -82,6 +85,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{
SupportedResp: true,
SelfResp: metadata.InstanceMetadata{
@ -120,6 +124,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{
SelfErr: someErr,
SupportedResp: true,
@ -149,6 +154,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -173,6 +179,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{SupportedResp: true},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -185,6 +192,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -197,6 +205,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{SupportedResp: true},
@ -208,6 +217,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
ReadErr: someErr,
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -219,6 +229,7 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{SupportedResp: false},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -230,17 +241,31 @@ func TestInitCluster(t *testing.T) {
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{SupportedResp: false},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
wantErr: true,
k8sVersion: versions.Default,
},
"kubeadm init fails when waiting for kubeAPI server": {
clusterUtil: stubClusterUtil{},
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{waitErr: someErr},
providerMetadata: &stubProviderMetadata{SupportedResp: false},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
k8sVersion: versions.Default,
wantErr: true,
},
"unsupported k8sVersion fails cluster creation": {
clusterUtil: stubClusterUtil{},
kubeconfigReader: &stubKubeconfigReader{
Kubeconfig: []byte("someKubeconfig"),
},
kubeAPIWaiter: stubKubeAPIWaiter{},
providerMetadata: &stubProviderMetadata{},
CloudControllerManager: &stubCloudControllerManager{},
ClusterAutoscaler: &stubClusterAutoscaler{},
@ -258,6 +283,7 @@ func TestInitCluster(t *testing.T) {
clusterUtil: &tc.clusterUtil,
helmClient: &tc.helmClient,
providerMetadata: tc.providerMetadata,
kubeAPIWaiter: &tc.kubeAPIWaiter,
cloudControllerManager: tc.CloudControllerManager,
configProvider: &stubConfigProvider{InitConfig: k8sapi.KubeadmInitYAML{}},
client: &tc.kubectl,
@ -570,9 +596,11 @@ type stubKubectl struct {
AddTolerationsToDeploymentErr error
AddTNodeSelectorsToDeploymentErr error
waitForCRDsErr error
listAllNamespacesErr error
resources []kubernetes.Marshaler
kubeconfigs [][]byte
listAllNamespacesResp *corev1.NamespaceList
resources []kubernetes.Marshaler
kubeconfigs [][]byte
}
func (s *stubKubectl) Apply(resources kubernetes.Marshaler, forceConflicts bool) error {
@ -600,6 +628,10 @@ func (s *stubKubectl) WaitForCRDs(ctx context.Context, crds []string) error {
return s.waitForCRDsErr
}
func (s *stubKubectl) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return s.listAllNamespacesResp, s.listAllNamespacesErr
}
type stubKubeconfigReader struct {
Kubeconfig []byte
ReadErr error
@ -621,3 +653,11 @@ func (s *stubHelmClient) InstallCilium(ctx context.Context, kubectl k8sapi.Clien
func (s *stubHelmClient) InstallConstellationServices(ctx context.Context, release helm.Release, extraVals map[string]any) error {
return s.servicesError
}
type stubKubeAPIWaiter struct {
waitErr error
}
func (s *stubKubeAPIWaiter) Wait(_ context.Context, _ kubewaiter.KubernetesClient) error {
return s.waitErr
}