Remove client pkg from kubectl pkg (#638)

The nested client pkg was necessary to implement a generator pattern.
The generator was necessary as the Kubewrapper type
expects a k8sapi.Client object during instantiation.
However, the required kubeconfig is not ready during Kubewrapper creation.
This patch relies on an Initialize function to set the Kubeconfig
and hands over an empty struct during Kubewrapper creation.
This allows us to remove the extra Client abstraction.
This commit is contained in:
Otto Bittner 2022-11-25 11:19:22 +01:00 committed by GitHub
parent 1968dfe70c
commit 6af54142f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 93 additions and 1197 deletions

View File

@ -70,27 +70,6 @@ func (h *Client) InstallConstellationServices(ctx context.Context, release helm.
return nil
}
// mergeMaps returns a new map that is the merger of it's inputs.
// Taken from: https://github.com/helm/helm/blob/dbc6d8e20fe1d58d50e6ed30f09a04a77e4c68db/pkg/cli/values/options.go#L91-L108.
func mergeMaps(a, b map[string]any) map[string]any {
out := make(map[string]any, len(a))
for k, v := range a {
out[k] = v
}
for k, v := range b {
if v, ok := v.(map[string]any); ok {
if bv, ok := out[k]; ok {
if bv, ok := bv.(map[string]any); ok {
out[k] = mergeMaps(bv, v)
continue
}
}
}
out[k] = v
}
return out
}
// InstallCertManager installs the cert-manager chart.
func (h *Client) InstallCertManager(ctx context.Context, release helm.Release) error {
h.ReleaseName = release.ReleaseName
@ -211,3 +190,24 @@ func (h *Client) install(ctx context.Context, chartRaw []byte, values map[string
}
return nil
}
// mergeMaps returns a new map that is the merger of it's inputs.
// Taken from: https://github.com/helm/helm/blob/dbc6d8e20fe1d58d50e6ed30f09a04a77e4c68db/pkg/cli/values/options.go#L91-L108.
func mergeMaps(a, b map[string]any) map[string]any {
out := make(map[string]any, len(a))
for k, v := range a {
out[k] = v
}
for k, v := range b {
if v, ok := v.(map[string]any); ok {
if bv, ok := out[k]; ok {
if bv, ok := bv.(map[string]any); ok {
out[k] = mergeMaps(bv, v)
continue
}
}
}
out[k] = v
}
return out
}

View File

@ -25,8 +25,8 @@ import (
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubelet"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi/resources"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/kubernetes"
"github.com/edgelesssys/constellation/v2/internal/role"
corev1 "k8s.io/api/core/v1"
kubeconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"github.com/edgelesssys/constellation/v2/internal/crypto"
@ -35,7 +35,6 @@ import (
"github.com/edgelesssys/constellation/v2/internal/versions"
"github.com/spf13/afero"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
)
const (
@ -49,12 +48,10 @@ const (
// Client provides the functions to talk to the k8s API.
type Client interface {
Apply(resources kubernetes.Marshaler, forceConflicts bool) error
SetKubeconfig(kubeconfig []byte)
Initialize(kubeconfig []byte) error
CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error
AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
WaitForCRDs(ctx context.Context, crds []string) error
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
}
@ -234,11 +231,6 @@ func (k *KubernetesUtil) prepareControlPlaneForKonnectivity(ctx context.Context,
return nil
}
// SetupKonnectivity uses kubectl client to apply the provided konnectivity daemon set.
func (k *KubernetesUtil) SetupKonnectivity(kubectl Client, konnectivityAgentsDaemonSet kubernetes.Marshaler) error {
return kubectl.Apply(konnectivityAgentsDaemonSet, true)
}
// SetupPodNetworkInput holds all configuration options to setup the pod network.
type SetupPodNetworkInput struct {
CloudProvider string
@ -309,26 +301,6 @@ func (k *KubernetesUtil) FixCilium(log *logger.Logger) {
}
}
// SetupGCPGuestAgent deploys the GCP guest agent daemon set.
func (k *KubernetesUtil) SetupGCPGuestAgent(kubectl Client, guestAgentDaemonset kubernetes.Marshaler) error {
return kubectl.Apply(guestAgentDaemonset, true)
}
// SetupVerificationService deploys the verification service.
func (k *KubernetesUtil) SetupVerificationService(kubectl Client, verificationServiceConfiguration kubernetes.Marshaler) error {
return kubectl.Apply(verificationServiceConfiguration, true)
}
// SetupNodeMaintenanceOperator deploys node maintenance operator.
func (k *KubernetesUtil) SetupNodeMaintenanceOperator(kubectl Client, nodeMaintenanceOperatorConfiguration kubernetes.Marshaler) error {
return kubectl.Apply(nodeMaintenanceOperatorConfiguration, true)
}
// SetupNodeOperator deploys node operator.
func (k *KubernetesUtil) SetupNodeOperator(ctx context.Context, kubectl Client, nodeOperatorConfiguration kubernetes.Marshaler) error {
return kubectl.Apply(nodeOperatorConfiguration, true)
}
// JoinCluster joins existing Kubernetes cluster using kubeadm join.
func (k *KubernetesUtil) JoinCluster(ctx context.Context, joinConfig []byte, peerRole role.Role, controlPlaneEndpoint string, log *logger.Logger) error {
// TODO: audit policy should be user input

View File

@ -1,211 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package client
import (
"bytes"
"context"
"fmt"
kubernetesshared "github.com/edgelesssys/constellation/v2/internal/kubernetes"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
const fieldManager = "constellation-bootstrapper"
// Client implements k8sapi.Client interface and talks to the Kubernetes API.
type Client struct {
clientset kubernetes.Interface
apiextensionClient apiextensionsclientv1.ApiextensionsV1Interface
builder *resource.Builder
}
// New creates a new Client, talking to the real k8s API.
func New(config []byte) (*Client, error) {
clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config)
if err != nil {
return nil, fmt.Errorf("creating k8s client config from kubeconfig: %w", err)
}
clientset, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("creating k8s client from kubeconfig: %w", err)
}
apiextensionClient, err := apiextensionsclientv1.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("creating api extension client from kubeconfig: %w", err)
}
restClientGetter, err := newRESTClientGetter(config)
if err != nil {
return nil, fmt.Errorf("creating k8s RESTClientGetter from kubeconfig: %w", err)
}
builder := resource.NewBuilder(restClientGetter).Unstructured()
return &Client{clientset: clientset, apiextensionClient: apiextensionClient, builder: builder}, nil
}
// ApplyOneObject uses server-side apply to send unstructured JSON blobs to the server and let it handle the core logic.
func (c *Client) ApplyOneObject(info *resource.Info, forceConflicts bool) error {
// helper can be used to patch k8s resources using server-side-apply.
helper := resource.NewHelper(info.Client, info.Mapping).
WithFieldManager(fieldManager)
// server-side-apply uses unstructured JSON instead of strict typing on the client side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object)
if err != nil {
return fmt.Errorf("preparing resource for server-side apply: encoding of resource: %w", err)
}
options := metav1.PatchOptions{
Force: &forceConflicts,
}
obj, err := helper.Patch(
info.Namespace,
info.Name,
types.ApplyPatchType,
data,
&options,
)
if err != nil {
return fmt.Errorf("applying object %v using server-side apply: %w", info, err)
}
return info.Refresh(obj, true)
}
// GetObjects tries to marshal the resources into []*resource.Info using a resource.Builder.
func (c *Client) GetObjects(resources kubernetesshared.Marshaler) ([]*resource.Info, error) {
// convert our resource struct into YAML
data, err := resources.Marshal()
if err != nil {
return nil, fmt.Errorf("converting resources to YAML: %w", err)
}
// read into resource.Info using builder
reader := bytes.NewReader(data)
result := c.builder.
ContinueOnError().
NamespaceParam("default").
DefaultNamespace().
Stream(reader, "yaml").
Flatten().
Do()
return result.Infos()
}
// CreateConfigMap creates the given ConfigMap.
func (c *Client) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error {
_, err := c.clientset.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Create(ctx, &configMap, metav1.CreateOptions{})
if err != nil {
return err
}
return nil
}
// ListAllNamespaces returns a list of all namespaces.
func (c *Client) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return c.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
}
// AddTolerationsToDeployment adds [K8s tolerations] to the deployment, identified
// by name and namespace.
//
// [K8s tolerations]: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
func (c *Client) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {
deployments := c.clientset.AppsV1().Deployments(namespace)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add toleration: %v", err)
}
result.Spec.Template.Spec.Tolerations = append(result.Spec.Template.Spec.Tolerations, tolerations...)
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
// AddNodeSelectorsToDeployment adds [K8s selectors] to the deployment, identified
// by name and namespace.
//
// [K8s selectors]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
func (c *Client) AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error {
deployments := c.clientset.AppsV1().Deployments(namespace)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add node selector: %v", err)
}
for k, v := range selectors {
result.Spec.Template.Spec.NodeSelector[k] = v
}
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
// WaitForCRD waits for the given CRD to be established.
func (c *Client) WaitForCRD(ctx context.Context, crd string) error {
watcher, err := c.apiextensionClient.CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", crd),
})
if err != nil {
return err
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified:
crd := event.Object.(*apiextensionsv1.CustomResourceDefinition)
if crdHasCondition(crd.Status.Conditions, apiextensionsv1.Established) {
return nil
}
case watch.Deleted:
return fmt.Errorf("crd %q deleted", crd)
case watch.Error:
return fmt.Errorf("crd %q error: %v", crd, event.Object)
}
}
return fmt.Errorf("crd %q not established", crd)
}
func crdHasCondition(conditions []apiextensionsv1.CustomResourceDefinitionCondition, conditionType apiextensionsv1.CustomResourceDefinitionConditionType) bool {
for _, condition := range conditions {
if condition.Type == conditionType && condition.Status == apiextensionsv1.ConditionTrue {
return true
}
}
return false
}

View File

@ -1,478 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package client
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"testing"
"github.com/edgelesssys/constellation/v2/internal/kubernetes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/proto"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8s "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
restfake "k8s.io/client-go/rest/fake"
"k8s.io/client-go/restmapper"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
var (
corev1GV = schema.GroupVersion{Version: "v1"}
nginxDeployment = &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "nginx",
},
Name: "my-nginx",
},
Spec: appsv1.DeploymentSpec{
Replicas: proto.Int32(3),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "nginx",
},
},
Template: k8s.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "nginx",
},
},
Spec: k8s.PodSpec{
Containers: []k8s.Container{
{
Name: "nginx",
Image: "nginx:1.14.2",
Ports: []k8s.ContainerPort{
{
ContainerPort: 80,
},
},
},
},
},
},
},
}
tolerationsDeployment = appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-deployment",
},
}
selectorsDeployment = appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-deployment",
},
Spec: appsv1.DeploymentSpec{
Template: k8s.PodTemplateSpec{
Spec: k8s.PodSpec{
NodeSelector: map[string]string{},
},
},
},
}
nginxDeplJSON, _ = marshalJSON(nginxDeployment)
nginxDeplYAML, _ = marshalYAML(nginxDeployment)
)
type unmarshableResource struct{}
func (*unmarshableResource) Marshal() ([]byte, error) {
return nil, errors.New("someErr")
}
func stringBody(body string) io.ReadCloser {
return io.NopCloser(bytes.NewReader([]byte(body)))
}
func fakeClientWith(t *testing.T, testName string, data map[string]string) resource.FakeClientFunc {
return func(version schema.GroupVersion) (resource.RESTClient, error) {
return &restfake.RESTClient{
GroupVersion: corev1GV,
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
Client: restfake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
p := req.URL.Path
q := req.URL.RawQuery
if len(q) != 0 {
p = p + "?" + q
}
body, ok := data[p]
if !ok {
t.Fatalf("%s: unexpected request: %s (%s)\n%#v", testName, p, req.URL, req)
}
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
return &http.Response{
StatusCode: http.StatusOK,
Header: header,
Body: stringBody(body),
}, nil
}),
}, nil
}
}
func newClientWithFakes(t *testing.T, data map[string]string, objects ...runtime.Object) Client {
clientset := fake.NewSimpleClientset(objects...)
builder := resource.NewFakeBuilder(
fakeClientWith(t, "", data),
func() (meta.RESTMapper, error) {
return testrestmapper.TestOnlyStaticRESTMapper(scheme.Scheme), nil
},
func() (restmapper.CategoryExpander, error) {
return resource.FakeCategoryExpander, nil
}).
Unstructured()
client := Client{
clientset: clientset,
builder: builder,
}
return client
}
func failingClient() resource.FakeClientFunc {
return func(version schema.GroupVersion) (resource.RESTClient, error) {
return &restfake.RESTClient{
GroupVersion: corev1GV,
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
Resp: &http.Response{StatusCode: 501},
}, nil
}
}
func newFailingClient(objects ...runtime.Object) Client {
clientset := fake.NewSimpleClientset(objects...)
builder := resource.NewFakeBuilder(
failingClient(),
func() (meta.RESTMapper, error) {
return testrestmapper.TestOnlyStaticRESTMapper(scheme.Scheme), nil
},
func() (restmapper.CategoryExpander, error) {
return resource.FakeCategoryExpander, nil
}).
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...)
client := Client{
clientset: clientset,
builder: builder,
}
return client
}
func marshalJSON(obj runtime.Object) ([]byte, error) {
serializer := json.NewSerializer(json.DefaultMetaFactory, nil, nil, false)
var buf bytes.Buffer
if err := serializer.Encode(obj, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func marshalYAML(obj runtime.Object) ([]byte, error) {
serializer := json.NewYAMLSerializer(json.DefaultMetaFactory, nil, nil)
var buf bytes.Buffer
if err := serializer.Encode(obj, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func TestApplyOneObject(t *testing.T) {
testCases := map[string]struct {
httpResponseData map[string]string
wantObj runtime.Object
resourcesYAML string
failingClient bool
wantErr bool
}{
"apply works": {
httpResponseData: map[string]string{
"/deployments/my-nginx?fieldManager=constellation-bootstrapper&force=true": string(nginxDeplJSON),
},
wantObj: nginxDeployment,
resourcesYAML: string(nginxDeplYAML),
wantErr: false,
},
"apply fails": {
httpResponseData: map[string]string{},
wantObj: nginxDeployment,
resourcesYAML: string(nginxDeplYAML),
failingClient: true,
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
var client Client
if tc.failingClient {
client = newFailingClient(tc.wantObj)
} else {
client = newClientWithFakes(t, tc.httpResponseData, tc.wantObj)
}
reader := bytes.NewReader([]byte(tc.resourcesYAML))
res := client.builder.
ContinueOnError().
Stream(reader, "yaml").
Flatten().
Do()
assert.NoError(res.Err())
infos, err := res.Infos()
assert.NoError(err)
require.Len(infos, 1)
err = client.ApplyOneObject(infos[0], true)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
})
}
}
func TestGetObjects(t *testing.T) {
testCases := map[string]struct {
wantResources kubernetes.Marshaler
httpResponseData map[string]string
resourcesYAML string
wantErr bool
}{
"GetObjects Marshal failure detected": {
wantResources: &unmarshableResource{},
resourcesYAML: string(nginxDeplYAML),
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := newClientWithFakes(t, tc.httpResponseData)
infos, err := client.GetObjects(tc.wantResources)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
assert.NotNil(infos)
})
}
}
func TestAddTolerationsToDeployment(t *testing.T) {
testCases := map[string]struct {
namespace string
name string
tolerations []corev1.Toleration
wantErr bool
}{
"Success": {
namespace: "test-ns",
name: "test-deployment",
},
"Specifying non-existent deployment fails": {
namespace: "test-ns",
name: "wrong-name",
wantErr: true,
},
"Wrong namespace": {
name: "test-deployment",
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := newClientWithFakes(t, map[string]string{}, &tolerationsDeployment)
err := client.AddTolerationsToDeployment(context.Background(), tc.tolerations, tc.name, tc.namespace)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
})
}
}
func TestAddNodeSelectorsToDeployment(t *testing.T) {
testCases := map[string]struct {
namespace string
name string
selectors map[string]string
wantErr bool
}{
"Success": {
namespace: "test-ns",
name: "test-deployment",
selectors: map[string]string{"some-key": "some-value"},
},
"Specifying non-existent deployment fails": {
namespace: "test-ns",
name: "wrong-name",
wantErr: true,
},
"Wrong namespace": {
name: "test-deployment",
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := newClientWithFakes(t, map[string]string{}, &selectorsDeployment)
err := client.AddNodeSelectorsToDeployment(context.Background(), tc.selectors, tc.name, tc.namespace)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
})
}
}
func TestWaitForCRD(t *testing.T) {
testCases := map[string]struct {
crd string
events []watch.Event
watchErr error
wantErr bool
}{
"Success": {
crd: "test-crd",
events: []watch.Event{
{
Type: watch.Added,
Object: &apiextensionsv1.CustomResourceDefinition{
Status: apiextensionsv1.CustomResourceDefinitionStatus{
Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{
{
Type: apiextensionsv1.Established,
Status: apiextensionsv1.ConditionTrue,
},
},
},
},
},
},
},
"watch error": {
crd: "test-crd",
watchErr: errors.New("watch error"),
wantErr: true,
},
"crd deleted": {
crd: "test-crd",
events: []watch.Event{{Type: watch.Deleted}},
wantErr: true,
},
"other error": {
crd: "test-crd",
events: []watch.Event{{Type: watch.Error}},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
client := Client{
apiextensionClient: &stubCRDWatcher{events: tc.events, watchErr: tc.watchErr},
}
err := client.WaitForCRD(context.Background(), tc.crd)
if tc.wantErr {
assert.Error(err)
return
}
require.NoError(err)
})
}
}
type stubCRDWatcher struct {
events []watch.Event
watchErr error
apiextensionsclientv1.ApiextensionsV1Interface
}
func (w *stubCRDWatcher) CustomResourceDefinitions() apiextensionsclientv1.CustomResourceDefinitionInterface {
return &stubCustomResourceDefinitions{
events: w.events,
watchErr: w.watchErr,
}
}
type stubCustomResourceDefinitions struct {
events []watch.Event
watchErr error
apiextensionsclientv1.CustomResourceDefinitionInterface
}
func (c *stubCustomResourceDefinitions) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
eventChan := make(chan watch.Event, len(c.events))
for _, event := range c.events {
eventChan <- event
}
return &stubCRDWatch{events: eventChan}, c.watchErr
}
type stubCRDWatch struct {
events chan watch.Event
}
func (w *stubCRDWatch) Stop() {
close(w.events)
}
func (w *stubCRDWatch) ResultChan() <-chan watch.Event {
return w.events
}

View File

@ -1,17 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package kubectl
import "github.com/edgelesssys/constellation/v2/bootstrapper/internal/kubernetes/k8sapi/kubectl/client"
// generator implements clientGenerator interface.
type generator struct{}
// NewClients generates a new client implementing the Client interface.
func (generator) NewClient(kubeconfig []byte) (Client, error) {
return client.New(kubeconfig)
}

View File

@ -8,97 +8,68 @@ package kubectl
import (
"context"
"errors"
"fmt"
"github.com/edgelesssys/constellation/v2/internal/kubernetes"
corev1 "k8s.io/api/core/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
// ErrKubeconfigNotSet is the error value returned by Kubectl.Apply when SetKubeconfig was not called first.
var ErrKubeconfigNotSet = errors.New("kubeconfig not set")
// Client wraps marshable k8s resources into resource.Info fields and applies them in a cluster.
type Client interface {
// ApplyOneObject applies a k8s resource similar to kubectl apply.
ApplyOneObject(info *resource.Info, forceConflicts bool) error
// GetObjects converts resources into prepared info fields for use in ApplyOneObject.
GetObjects(resources kubernetes.Marshaler) ([]*resource.Info, error)
CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error
AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
// WaitForCRD waits for the given CRD to be established.
WaitForCRD(ctx context.Context, crd string) error
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
}
// clientGenerator can generate new clients from a kubeconfig.
type clientGenerator interface {
NewClient(kubeconfig []byte) (Client, error)
}
// Kubectl implements kubernetes.Apply interface and acts like the Kubernetes "kubectl" tool.
// Kubectl implements functionality of the Kubernetes "kubectl" tool.
type Kubectl struct {
clientGenerator
kubeconfig []byte
kubernetes.Interface
apiextensionClient apiextensionsclientv1.ApiextensionsV1Interface
builder *resource.Builder
}
// New creates a new kubectl using the real clientGenerator.
// New returns an empty Kubectl client. Need to call Initialize before usable.
func New() *Kubectl {
return &Kubectl{
clientGenerator: &generator{},
}
return &Kubectl{}
}
// Apply will apply the given resources using server-side-apply.
func (k *Kubectl) Apply(resources kubernetes.Marshaler, forceConflicts bool) error {
if k.kubeconfig == nil {
return ErrKubeconfigNotSet
}
client, err := k.clientGenerator.NewClient(k.kubeconfig)
// Initialize sets sets all required fields so the Kubectl client can be used.
func (k *Kubectl) Initialize(kubeconfig []byte) error {
clientConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
if err != nil {
return err
return fmt.Errorf("creating k8s client config from kubeconfig: %w", err)
}
// convert marshaler object into []*resource.info
infos, err := client.GetObjects(resources)
clientset, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return err
return fmt.Errorf("creating k8s client from kubeconfig: %w", err)
}
k.Interface = clientset
// apply each object, one by one
for i, resource := range infos {
if err := client.ApplyOneObject(resource, forceConflicts); err != nil {
return fmt.Errorf("kubectl apply of object %v/%v: %w", i+1, len(infos), err)
}
apiextensionClient, err := apiextensionsclientv1.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("creating api extension client from kubeconfig: %w", err)
}
k.apiextensionClient = apiextensionClient
restClientGetter, err := newRESTClientGetter(kubeconfig)
if err != nil {
return fmt.Errorf("creating k8s RESTClientGetter from kubeconfig: %w", err)
}
k.builder = resource.NewBuilder(restClientGetter).Unstructured()
return nil
}
// SetKubeconfig will store the kubeconfig to generate Clients using the clientGenerator later.
func (k *Kubectl) SetKubeconfig(kubeconfig []byte) {
k.kubeconfig = kubeconfig
}
// CreateConfigMap creates the provided configmap.
func (k *Kubectl) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
_, err := k.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Create(ctx, &configMap, metav1.CreateOptions{})
if err != nil {
return err
}
return client.CreateConfigMap(ctx, configMap)
return nil
}
// ListAllNamespaces returns all namespaces in the cluster.
func (k *Kubectl) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
if err != nil {
return nil, err
}
return client.ListAllNamespaces(ctx)
return k.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
}
// AddTolerationsToDeployment adds [K8s tolerations] to the deployment, identified
@ -106,15 +77,24 @@ func (k *Kubectl) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList,
//
// [K8s tolerations]: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
func (k *Kubectl) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
deployments := k.AppsV1().Deployments(namespace)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add toleration: %w", err)
}
result.Spec.Template.Spec.Tolerations = append(result.Spec.Template.Spec.Tolerations, tolerations...)
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
if err = client.AddTolerationsToDeployment(ctx, tolerations, name, namespace); err != nil {
return err
}
return nil
}
@ -123,30 +103,26 @@ func (k *Kubectl) AddTolerationsToDeployment(ctx context.Context, tolerations []
//
// [K8s selectors]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
func (k *Kubectl) AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
if err != nil {
return err
}
deployments := k.AppsV1().Deployments(namespace)
if err = client.AddNodeSelectorsToDeployment(ctx, selectors, name, namespace); err != nil {
return err
}
return nil
}
// WaitForCRDs waits for a list of CRDs to be established.
func (k *Kubectl) WaitForCRDs(ctx context.Context, crds []string) error {
client, err := k.clientGenerator.NewClient(k.kubeconfig)
if err != nil {
return err
}
for _, crd := range crds {
err = client.WaitForCRD(ctx, crd)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add node selector: %w", err)
}
for k, v := range selectors {
result.Spec.Template.Spec.NodeSelector[k] = v
}
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}

View File

@ -1,155 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package kubectl
import (
"context"
"errors"
"testing"
"github.com/edgelesssys/constellation/v2/internal/kubernetes"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/resource"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
type stubClient struct {
applyOneObjectErr error
getObjectsInfos []*resource.Info
getObjectsErr error
createConfigMapErr error
addTolerationsToDeploymentErr error
addNodeSelectorToDeploymentErr error
waitForCRDErr error
listAllNamespacesResp *corev1.NamespaceList
listAllNamespacesErr error
}
func (s *stubClient) ApplyOneObject(info *resource.Info, forceConflicts bool) error {
return s.applyOneObjectErr
}
func (s *stubClient) GetObjects(resources kubernetes.Marshaler) ([]*resource.Info, error) {
return s.getObjectsInfos, s.getObjectsErr
}
func (s *stubClient) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error {
return s.createConfigMapErr
}
func (s *stubClient) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {
return s.addTolerationsToDeploymentErr
}
func (s *stubClient) AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error {
return s.addNodeSelectorToDeploymentErr
}
func (s *stubClient) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) {
return s.listAllNamespacesResp, s.listAllNamespacesErr
}
type stubClientGenerator struct {
applyOneObjectErr error
getObjectsInfos []*resource.Info
getObjectsErr error
newClientErr error
createConfigMapErr error
addTolerationsToDeploymentErr error
addNodeSelectorToDeploymentErr error
waitForCRDErr error
}
func (s *stubClient) WaitForCRD(ctx context.Context, crd string) error {
return s.waitForCRDErr
}
func (s *stubClientGenerator) NewClient(kubeconfig []byte) (Client, error) {
return &stubClient{
applyOneObjectErr: s.applyOneObjectErr,
getObjectsInfos: s.getObjectsInfos,
getObjectsErr: s.getObjectsErr,
createConfigMapErr: s.createConfigMapErr,
addTolerationsToDeploymentErr: s.addTolerationsToDeploymentErr,
addNodeSelectorToDeploymentErr: s.addNodeSelectorToDeploymentErr,
waitForCRDErr: s.waitForCRDErr,
}, s.newClientErr
}
type dummyResource struct{}
func (*dummyResource) Marshal() ([]byte, error) {
panic("dummy")
}
func TestApplyWorks(t *testing.T) {
assert := assert.New(t)
kube := Kubectl{
clientGenerator: &stubClientGenerator{
getObjectsInfos: []*resource.Info{
{},
},
},
}
kube.SetKubeconfig([]byte("someConfig"))
assert.NoError(kube.Apply(&dummyResource{}, true))
}
func TestKubeconfigUnset(t *testing.T) {
assert := assert.New(t)
kube := Kubectl{}
assert.ErrorIs(kube.Apply(&dummyResource{}, true), ErrKubeconfigNotSet)
}
func TestClientGeneratorFails(t *testing.T) {
assert := assert.New(t)
err := errors.New("generator failed")
kube := Kubectl{
clientGenerator: &stubClientGenerator{
newClientErr: err,
},
}
kube.SetKubeconfig([]byte("someConfig"))
assert.ErrorIs(kube.Apply(&dummyResource{}, true), err)
}
func TestGetObjectsFails(t *testing.T) {
assert := assert.New(t)
err := errors.New("getObjects failed")
kube := Kubectl{
clientGenerator: &stubClientGenerator{
getObjectsErr: err,
},
}
kube.SetKubeconfig([]byte("someConfig"))
assert.ErrorIs(kube.Apply(&dummyResource{}, true), err)
}
func TestApplyOneObjectFails(t *testing.T) {
assert := assert.New(t)
err := errors.New("applyOneObject failed")
kube := Kubectl{
clientGenerator: &stubClientGenerator{
getObjectsInfos: []*resource.Info{
{},
},
applyOneObjectErr: err,
},
}
kube.SetKubeconfig([]byte("someConfig"))
assert.ErrorIs(kube.Apply(&dummyResource{}, true), err)
}

View File

@ -4,7 +4,7 @@ Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package client
package kubectl
import (
"k8s.io/apimachinery/pkg/api/meta"

View File

@ -4,7 +4,7 @@ Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package client
package kubectl
import (
"errors"

View File

@ -1,184 +0,0 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package resources
import (
"github.com/edgelesssys/constellation/v2/internal/kubernetes"
"github.com/edgelesssys/constellation/v2/internal/versions"
apps "k8s.io/api/apps/v1"
k8s "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// GCPGuestAgentDaemonset is a GCP Guest Agent Daemonset.
type GCPGuestAgentDaemonset struct {
DaemonSet apps.DaemonSet
}
// NewGCPGuestAgentDaemonset creates a new GCP Guest Agent Daemonset.
// It is used automatically to add loadbalancer IPs to the local routing table of GCP instances.
func NewGCPGuestAgentDaemonset() *GCPGuestAgentDaemonset {
return &GCPGuestAgentDaemonset{
DaemonSet: apps.DaemonSet{
TypeMeta: meta.TypeMeta{
APIVersion: "apps/v1",
Kind: "DaemonSet",
},
ObjectMeta: meta.ObjectMeta{
Name: "gcp-guest-agent",
Namespace: "kube-system",
Labels: map[string]string{
"k8s-app": "gcp-guest-agent",
"component": "gcp-guest-agent",
"kubernetes.io/cluster-service": "true",
},
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{
"k8s-app": "gcp-guest-agent",
},
},
Template: k8s.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
"k8s-app": "gcp-guest-agent",
},
},
Spec: k8s.PodSpec{
PriorityClassName: "system-cluster-critical",
Tolerations: []k8s.Toleration{
{
Key: "node-role.kubernetes.io/master",
Operator: k8s.TolerationOpExists,
Effect: k8s.TaintEffectNoSchedule,
},
{
Key: "node-role.kubernetes.io/control-plane",
Operator: k8s.TolerationOpExists,
Effect: k8s.TaintEffectNoSchedule,
},
},
Containers: []k8s.Container{
{
Name: "gcp-guest-agent",
Image: versions.GcpGuestImage,
SecurityContext: &k8s.SecurityContext{
Privileged: func(b bool) *bool { return &b }(true),
Capabilities: &k8s.Capabilities{
Add: []k8s.Capability{"NET_ADMIN"},
},
},
VolumeMounts: []k8s.VolumeMount{
{
Name: "etcssl",
ReadOnly: true,
MountPath: "/etc/ssl",
},
{
Name: "etcpki",
ReadOnly: true,
MountPath: "/etc/pki",
},
{
Name: "bin",
ReadOnly: true,
MountPath: "/bin",
},
{
Name: "usrbin",
ReadOnly: true,
MountPath: "/usr/bin",
},
{
Name: "usr",
ReadOnly: true,
MountPath: "/usr",
},
{
Name: "lib",
ReadOnly: true,
MountPath: "/lib",
},
{
Name: "lib64",
ReadOnly: true,
MountPath: "/lib64",
},
},
},
},
Volumes: []k8s.Volume{
{
Name: "etcssl",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/etc/ssl",
},
},
},
{
Name: "etcpki",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/etc/pki",
},
},
},
{
Name: "bin",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/bin",
},
},
},
{
Name: "usrbin",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/usr/bin",
},
},
},
{
Name: "usr",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/usr",
},
},
},
{
Name: "lib",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/lib",
},
},
},
{
Name: "lib64",
VolumeSource: k8s.VolumeSource{
HostPath: &k8s.HostPathVolumeSource{
Path: "/lib64",
},
},
},
},
HostNetwork: true,
},
},
},
},
}
}
// Marshal marshals the gcp guest agent deployment as YAML documents.
func (c *GCPGuestAgentDaemonset) Marshal() ([]byte, error) {
return kubernetes.MarshalK8SResources(c)
}

View File

@ -152,7 +152,10 @@ func (k *KubeWrapper) InitCluster(
if err != nil {
return nil, fmt.Errorf("reading kubeconfig after cluster initialization: %w", err)
}
k.client.SetKubeconfig(kubeConfig)
err = k.client.Initialize(kubeConfig)
if err != nil {
return nil, fmt.Errorf("initializing kubectl client: %w", err)
}
waitCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
@ -324,8 +327,6 @@ func (k *KubeWrapper) setupK8sVersionConfigMap(ctx context.Context, k8sVersion v
},
}
// We do not use the client's Apply method here since we are handling a kubernetes-native type.
// These types don't implement our custom Marshaler interface.
if err := k.client.CreateConfigMap(ctx, config); err != nil {
return fmt.Errorf("apply in KubeWrapper.setupK8sVersionConfigMap(..) failed with: %w", err)
}

View File

@ -502,7 +502,6 @@ func (s *stubConfigProvider) JoinConfiguration(_ bool) k8sapi.KubeadmJoinYAML {
}
type stubKubectl struct {
applyErr error
createConfigMapErr error
addTolerationsToDeploymentErr error
addTNodeSelectorsToDeploymentErr error
@ -510,17 +509,10 @@ type stubKubectl struct {
listAllNamespacesErr error
listAllNamespacesResp *corev1.NamespaceList
resources []kubernetes.Marshaler
kubeconfigs [][]byte
}
func (s *stubKubectl) Apply(resources kubernetes.Marshaler, forceConflicts bool) error {
s.resources = append(s.resources, resources)
return s.applyErr
}
func (s *stubKubectl) SetKubeconfig(kubeconfig []byte) {
s.kubeconfigs = append(s.kubeconfigs, kubeconfig)
func (s *stubKubectl) Initialize(kubeconfig []byte) error {
return nil
}
func (s *stubKubectl) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error {