Deploy operator-lifecycle-manager (OLM), node-maintenance-operator (NMO) and constellation-node-operator

Signed-off-by: Malte Poll <mp@edgeless.systems>
This commit is contained in:
Malte Poll 2022-08-04 16:15:52 +02:00 committed by Malte Poll
parent 18a89d2881
commit 2c7129987a
23 changed files with 8756 additions and 32 deletions

View file

@ -7,10 +7,13 @@ import (
"github.com/edgelesssys/constellation/bootstrapper/internal/kubernetes/k8sapi/resources"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@ -21,8 +24,9 @@ const fieldManager = "constellation-bootstrapper"
// Client implements k8sapi.Client interface and talks to the Kubernetes API.
type Client struct {
clientset kubernetes.Interface
builder *resource.Builder
clientset kubernetes.Interface
apiextensionClient apiextensionsclientv1.ApiextensionsV1Interface
builder *resource.Builder
}
// New creates a new Client, talking to the real k8s API.
@ -36,13 +40,18 @@ func New(config []byte) (*Client, error) {
return nil, fmt.Errorf("creating k8s client from kubeconfig: %w", err)
}
apiextensionClient, err := apiextensionsclientv1.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("creating api extension client from kubeconfig: %w", err)
}
restClientGetter, err := newRESTClientGetter(config)
if err != nil {
return nil, fmt.Errorf("creating k8s RESTClientGetter from kubeconfig: %w", err)
}
builder := resource.NewBuilder(restClientGetter).Unstructured()
return &Client{clientset: clientset, builder: builder}, nil
return &Client{clientset: clientset, apiextensionClient: apiextensionClient, builder: builder}, nil
}
// ApplyOneObject uses server-side apply to send unstructured JSON blobs to the server and let it handle the core logic.
@ -147,3 +156,37 @@ func (c *Client) AddNodeSelectorsToDeployment(ctx context.Context, selectors map
}
return nil
}
// WaitForCRD waits for the given CRD to be established.
func (c *Client) WaitForCRD(ctx context.Context, crd string) error {
watcher, err := c.apiextensionClient.CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", crd),
})
if err != nil {
return err
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified:
crd := event.Object.(*apiextensionsv1.CustomResourceDefinition)
if crdHasCondition(crd.Status.Conditions, apiextensionsv1.Established) {
return nil
}
case watch.Deleted:
return fmt.Errorf("crd %q deleted", crd)
case watch.Error:
return fmt.Errorf("crd %q error: %v", crd, event.Object)
}
}
return fmt.Errorf("crd %q not established", crd)
}
func crdHasCondition(conditions []apiextensionsv1.CustomResourceDefinitionCondition, conditionType apiextensionsv1.CustomResourceDefinitionConditionType) bool {
for _, condition := range conditions {
if condition.Type == conditionType && condition.Status == apiextensionsv1.ConditionTrue {
return true
}
}
return false
}

View file

@ -16,12 +16,15 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8s "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
@ -376,3 +379,104 @@ func TestAddNodeSelectorsToDeployment(t *testing.T) {
})
}
}
func TestWaitForCRD(t *testing.T) {
testCases := map[string]struct {
crd string
events []watch.Event
watchErr error
wantErr bool
}{
"Success": {
crd: "test-crd",
events: []watch.Event{
{
Type: watch.Added,
Object: &apiextensionsv1.CustomResourceDefinition{
Status: apiextensionsv1.CustomResourceDefinitionStatus{
Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{
{
Type: apiextensionsv1.Established,
Status: apiextensionsv1.ConditionTrue,
},
},
},
},
},
},
},
"watch error": {
crd: "test-crd",
watchErr: errors.New("watch error"),
wantErr: true,
},
"crd deleted": {
crd: "test-crd",
events: []watch.Event{{Type: watch.Deleted}},
wantErr: true,
},
"other error": {
crd: "test-crd",
events: []watch.Event{{Type: watch.Error}},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := Client{
apiextensionClient: &stubCRDWatcher{events: tc.events, watchErr: tc.watchErr},
}
err := client.WaitForCRD(context.Background(), tc.crd)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
})
}
}
type stubCRDWatcher struct {
events []watch.Event
watchErr error
apiextensionsclientv1.ApiextensionsV1Interface
}
func (w *stubCRDWatcher) CustomResourceDefinitions() apiextensionsclientv1.CustomResourceDefinitionInterface {
return &stubCustomResourceDefinitions{
events: w.events,
watchErr: w.watchErr,
}
}
type stubCustomResourceDefinitions struct {
events []watch.Event
watchErr error
apiextensionsclientv1.CustomResourceDefinitionInterface
}
func (c *stubCustomResourceDefinitions) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
eventChan := make(chan watch.Event, len(c.events))
for _, event := range c.events {
eventChan <- event
}
return &stubCRDWatch{events: eventChan}, c.watchErr
}
type stubCRDWatch struct {
events chan watch.Event
}
func (w *stubCRDWatch) Stop() {
close(w.events)
}
func (w *stubCRDWatch) ResultChan() <-chan watch.Event {
return w.events
}

View file

@ -22,6 +22,8 @@ type Client interface {
CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error
AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
// WaitForCRD waits for the given CRD to be established.
WaitForCRD(ctx context.Context, crd string) error
}
// clientGenerator can generate new clients from a kubeconfig.
@ -111,3 +113,19 @@ func (k *Kubectl) AddNodeSelectorsToDeployment(ctx context.Context, selectors ma
return nil
}
// WaitForCRD waits for a list of CRDs to be established.
func (k *Kubectl) WaitForCRDs(ctx context.Context, crds []string) error {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
if err != nil {
return err
}
for _, crd := range crds {
err = client.WaitForCRD(ctx, crd)
if err != nil {
return err
}
}
return nil
}

View file

@ -23,6 +23,7 @@ type stubClient struct {
createConfigMapErr error
addTolerationsToDeploymentErr error
addNodeSelectorToDeploymentErr error
waitForCRDErr error
}
func (s *stubClient) ApplyOneObject(info *resource.Info, forceConflicts bool) error {
@ -53,16 +54,22 @@ type stubClientGenerator struct {
createConfigMapErr error
addTolerationsToDeploymentErr error
addNodeSelectorToDeploymentErr error
waitForCRDErr error
}
func (s *stubClient) WaitForCRD(ctx context.Context, crd string) error {
return s.waitForCRDErr
}
func (s *stubClientGenerator) NewClient(kubeconfig []byte) (Client, error) {
return &stubClient{
s.applyOneObjectErr,
s.getObjectsInfos,
s.getObjectsErr,
s.createConfigMapErr,
s.addTolerationsToDeploymentErr,
s.addNodeSelectorToDeploymentErr,
applyOneObjectErr: s.applyOneObjectErr,
getObjectsInfos: s.getObjectsInfos,
getObjectsErr: s.getObjectsErr,
createConfigMapErr: s.createConfigMapErr,
addTolerationsToDeploymentErr: s.addTolerationsToDeploymentErr,
addNodeSelectorToDeploymentErr: s.addNodeSelectorToDeploymentErr,
waitForCRDErr: s.waitForCRDErr,
}, s.newClientErr
}