mirror of
synced 2025-02-08 11:05:31 -05:00
193 lines
6.2 KiB
193 lines
6.2 KiB
package client
import (
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
const fieldManager = "constellation-bootstrapper"
// Client implements k8sapi.Client interface and talks to the Kubernetes API.
type Client struct {
clientset kubernetes.Interface
apiextensionClient apiextensionsclientv1.ApiextensionsV1Interface
builder *resource.Builder
// New creates a new Client, talking to the real k8s API.
func New(config []byte) (*Client, error) {
clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config)
if err != nil {
return nil, fmt.Errorf("creating k8s client config from kubeconfig: %w", err)
clientset, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("creating k8s client from kubeconfig: %w", err)
apiextensionClient, err := apiextensionsclientv1.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("creating api extension client from kubeconfig: %w", err)
restClientGetter, err := newRESTClientGetter(config)
if err != nil {
return nil, fmt.Errorf("creating k8s RESTClientGetter from kubeconfig: %w", err)
builder := resource.NewBuilder(restClientGetter).Unstructured()
return &Client{clientset: clientset, apiextensionClient: apiextensionClient, builder: builder}, nil
// ApplyOneObject uses server-side apply to send unstructured JSON blobs to the server and let it handle the core logic.
func (c *Client) ApplyOneObject(info *resource.Info, forceConflicts bool) error {
// helper can be used to patch k8s resources using server-side-apply.
helper := resource.NewHelper(info.Client, info.Mapping).
// server-side-apply uses unstructured JSON instead of strict typing on the client side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object)
if err != nil {
return fmt.Errorf("preparing resource for server-side apply: encoding of resource: %w", err)
options := metav1.PatchOptions{
Force: &forceConflicts,
obj, err := helper.Patch(
if err != nil {
return fmt.Errorf("applying object %v using server-side apply: %w", info, err)
return info.Refresh(obj, true)
// GetObjects tries to marshal the resources into []*resource.Info using a resource.Builder.
func (c *Client) GetObjects(resources resources.Marshaler) ([]*resource.Info, error) {
// convert our resource struct into YAML
data, err := resources.Marshal()
if err != nil {
return nil, fmt.Errorf("converting resources to YAML: %w", err)
// read into resource.Info using builder
reader := bytes.NewReader(data)
result := c.builder.
Stream(reader, "yaml").
return result.Infos()
// CreateConfigMap creates the given ConfigMap.
func (c *Client) CreateConfigMap(ctx context.Context, configMap corev1.ConfigMap) error {
_, err := c.clientset.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Create(ctx, &configMap, metav1.CreateOptions{})
if err != nil {
return err
return nil
func (c *Client) AddTolerationsToDeployment(ctx context.Context, tolerations []corev1.Toleration, name string, namespace string) error {
deployments := c.clientset.AppsV1().Deployments(namespace)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add toleration: %v", err)
result.Spec.Template.Spec.Tolerations = append(result.Spec.Template.Spec.Tolerations, tolerations...)
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
return nil
if err != nil {
return err
return nil
func (c *Client) AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error {
deployments := c.clientset.AppsV1().Deployments(namespace)
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add node selector: %v", err)
for k, v := range selectors {
result.Spec.Template.Spec.NodeSelector[k] = v
if _, err = deployments.Update(ctx, result, metav1.UpdateOptions{}); err != nil {
return err
return nil
if err != nil {
return err
return nil
// WaitForCRD waits for the given CRD to be established.
func (c *Client) WaitForCRD(ctx context.Context, crd string) error {
watcher, err := c.apiextensionClient.CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", crd),
if err != nil {
return err
defer watcher.Stop()
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified:
crd := event.Object.(*apiextensionsv1.CustomResourceDefinition)
if crdHasCondition(crd.Status.Conditions, apiextensionsv1.Established) {
return nil
case watch.Deleted:
return fmt.Errorf("crd %q deleted", crd)
case watch.Error:
return fmt.Errorf("crd %q error: %v", crd, event.Object)
return fmt.Errorf("crd %q not established", crd)
func crdHasCondition(conditions []apiextensionsv1.CustomResourceDefinitionCondition, conditionType apiextensionsv1.CustomResourceDefinitionConditionType) bool {
for _, condition := range conditions {
if condition.Type == conditionType && condition.Status == apiextensionsv1.ConditionTrue {
return true
return false