// Source path: constellation/coordinator/kubernetes/kubernetes.go

package kubernetes
import (
"context"
"fmt"
"strings"
"time"
"github.com/edgelesssys/constellation/coordinator/kubernetes/k8sapi"
"github.com/edgelesssys/constellation/coordinator/kubernetes/k8sapi/resources"
"github.com/edgelesssys/constellation/coordinator/role"
"github.com/spf13/afero"
kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
)
// Network ranges that InitCluster sets on the kubeadm init configuration.
const (
	// podNetworkCidr is the k8s pod network CIDR. This value was chosen to
	// match the default flannel pod network.
	podNetworkCidr = "10.244.0.0/16"
	// serviceCidr is the CIDR passed to the init configuration via SetServiceCIDR.
	serviceCidr = "10.245.0.0/24"
)
// configReader provides kubeconfig as []byte.
type configReader interface {
	// ReadKubeconfig returns the raw kubeconfig, or an error if it could not be read.
	ReadKubeconfig() ([]byte, error)
}
// configurationProvider provides kubeadm init and join configuration.
type configurationProvider interface {
	// InitConfiguration returns the kubeadm init configuration.
	// externalCloudProvider is forwarded by InitCluster from
	// InitClusterInput.SupportsCloudControllerManager.
	InitConfiguration(externalCloudProvider bool) k8sapi.KubeadmInitYAML
	// JoinConfiguration returns the kubeadm join configuration.
	// externalCloudProvider is forwarded by JoinCluster from its ccmSupported argument.
	JoinConfiguration(externalCloudProvider bool) k8sapi.KubeadmJoinYAML
}
// KubeWrapper implements ClusterWrapper interface.
type KubeWrapper struct {
	// clusterUtil performs the node-level work: installing components,
	// kubeadm init/join, deploying cluster services, creating join tokens
	// and starting the kubelet.
	clusterUtil k8sapi.ClusterUtil
	// configProvider supplies the kubeadm init and join configuration templates.
	configProvider configurationProvider
	// client is handed to the clusterUtil setup functions and receives the
	// kubeconfig once the cluster has been initialized.
	client k8sapi.Client
	// kubeconfigReader reads the kubeconfig used by GetKubeconfig.
	kubeconfigReader configReader
}
// New returns a KubeWrapper built from the given cluster utility,
// configuration provider and Kubernetes client. Its kubeconfig reader is
// backed by the operating system's file system.
func New(clusterUtil k8sapi.ClusterUtil, configProvider configurationProvider, client k8sapi.Client) *KubeWrapper {
	osFs := afero.Afero{Fs: afero.NewOsFs()}
	wrapper := &KubeWrapper{
		kubeconfigReader: &KubeconfigReader{fs: osFs},
		client:           client,
		configProvider:   configProvider,
		clusterUtil:      clusterUtil,
	}
	return wrapper
}
// InitCluster initializes a new Kubernetes cluster and applies pod network provider.
func (k *KubeWrapper) InitCluster(in InitClusterInput) error {
// TODO: k8s version should be user input
if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil {
return err
}
initConfig := k.configProvider.InitConfiguration(in.SupportsCloudControllerManager)
initConfig.SetApiServerAdvertiseAddress(in.APIServerAdvertiseIP)
initConfig.SetNodeIP(in.NodeIP)
initConfig.SetNodeName(in.NodeName)
initConfig.SetPodNetworkCIDR(podNetworkCidr)
initConfig.SetServiceCIDR(serviceCidr)
initConfig.SetProviderID(in.ProviderID)
initConfigYAML, err := initConfig.Marshal()
if err != nil {
return fmt.Errorf("encoding kubeadm init configuration as YAML failed: %w", err)
}
if err := k.clusterUtil.InitCluster(initConfigYAML); err != nil {
return fmt.Errorf("kubeadm init failed: %w", err)
}
kubeConfig, err := k.GetKubeconfig()
if err != nil {
return fmt.Errorf("reading kubeconfig after cluster initialization failed: %w", err)
}
k.client.SetKubeconfig(kubeConfig)
flannel := resources.NewDefaultFlannelDeployment()
if err = k.clusterUtil.SetupPodNetwork(k.client, flannel); err != nil {
return fmt.Errorf("setup of pod network failed: %w", err)
}
kms := resources.NewKMSDeployment(in.MasterSecret)
if err = k.clusterUtil.SetupKMS(k.client, kms); err != nil {
return fmt.Errorf("setup of kms failed: %w", err)
}
if in.SupportsCloudControllerManager {
cloudControllerManagerConfiguration := resources.NewDefaultCloudControllerManagerDeployment(
in.CloudControllerManagerName, in.CloudControllerManagerImage, in.CloudControllerManagerPath, in.CloudControllerManagerExtraArgs,
in.CloudControllerManagerVolumes, in.CloudControllerManagerVolumeMounts, in.CloudControllerManagerEnv,
)
if err := k.clusterUtil.SetupCloudControllerManager(k.client, cloudControllerManagerConfiguration, in.CloudControllerManagerConfigMaps, in.CloudControllerManagerSecrets); err != nil {
return fmt.Errorf("failed to setup cloud-controller-manager: %w", err)
}
}
2022-03-25 05:49:18 -04:00
if in.SupportsCloudNodeManager {
cloudNodeManagerConfiguration := resources.NewDefaultCloudNodeManagerDeployment(
in.CloudNodeManagerImage, in.CloudNodeManagerPath, in.CloudNodeManagerExtraArgs,
)
if err := k.clusterUtil.SetupCloudNodeManager(k.client, cloudNodeManagerConfiguration); err != nil {
return fmt.Errorf("failed to setup cloud-node-manager: %w", err)
2022-03-25 05:49:18 -04:00
}
}
if in.SupportClusterAutoscaler {
clusterAutoscalerConfiguration := resources.NewDefaultAutoscalerDeployment(in.AutoscalingVolumes, in.AutoscalingVolumeMounts, in.AutoscalingEnv)
clusterAutoscalerConfiguration.SetAutoscalerCommand(in.AutoscalingCloudprovider, in.AutoscalingNodeGroups)
if err := k.clusterUtil.SetupAutoscaling(k.client, clusterAutoscalerConfiguration, in.AutoscalingSecrets); err != nil {
return fmt.Errorf("failed to setup cluster-autoscaler: %w", err)
}
}
return nil
}
// JoinCluster joins existing Kubernetes cluster.
//
// args carries the API server endpoint, the bootstrap token and the CA cert
// hashes used for discovery; only the first CA cert hash is used. nodeName,
// nodeInternalIP and providerID are written into the join configuration.
// ccmSupported is forwarded to the configuration provider. nodeVPNIP and
// certKey are only used when peerRole is role.Coordinator, in which case the
// node joins as control plane.
//
// An error is returned if args is nil or contains no CA cert hash, or if
// installing the components, encoding the configuration or the join itself fails.
func (k *KubeWrapper) JoinCluster(args *kubeadm.BootstrapTokenDiscovery, nodeName, nodeInternalIP, nodeVPNIP, providerID, certKey string, ccmSupported bool, peerRole role.Role) error {
	// Validate the discovery information up front: dereferencing a nil args or
	// indexing an empty CACertHashes slice would panic further down.
	if args == nil {
		return fmt.Errorf("joining cluster failed: no bootstrap token discovery information provided")
	}
	if len(args.CACertHashes) == 0 {
		return fmt.Errorf("joining cluster failed: no CA cert hash provided")
	}

	// TODO: k8s version should be user input
	if err := k.clusterUtil.InstallComponents(context.TODO(), "1.23.6"); err != nil {
		return fmt.Errorf("installing kubernetes components failed: %w", err)
	}

	joinConfig := k.configProvider.JoinConfiguration(ccmSupported)
	joinConfig.SetApiServerEndpoint(args.APIServerEndpoint)
	joinConfig.SetToken(args.Token)
	joinConfig.AppendDiscoveryTokenCaCertHash(args.CACertHashes[0])
	joinConfig.SetNodeIP(nodeInternalIP)
	joinConfig.SetNodeName(nodeName)
	joinConfig.SetProviderID(providerID)
	if peerRole == role.Coordinator {
		joinConfig.SetControlPlane(nodeVPNIP, certKey)
	}

	joinConfigYAML, err := joinConfig.Marshal()
	if err != nil {
		return fmt.Errorf("encoding kubeadm join configuration as YAML failed: %w", err)
	}
	if err := k.clusterUtil.JoinCluster(joinConfigYAML); err != nil {
		// NOTE(review): the join configuration embedded here contains the
		// bootstrap token (and presumably the certificate key for control
		// plane joins) — confirm this error never reaches untrusted logs.
		return fmt.Errorf("joining cluster failed: %v %w", string(joinConfigYAML), err)
	}

	return nil
}
// GetKubeconfig returns the kubeconfig stored on the current node's disk,
// with the API server endpoint rewritten to the first coordinator.
func (k *KubeWrapper) GetKubeconfig() ([]byte, error) {
	// admin.conf points cluster.Server at 127.0.0.1:16443, the high
	// availability balancer (nginx) endpoint running locally on all nodes.
	// The kube-apiserver itself listens on 10.118.0.1:6443, the first
	// coordinator endpoint, so the address is swapped for that one.
	// Alternatively one could also start a local high availability balancer.
	const (
		localBalancerEndpoint    = "127.0.0.1:16443"
		firstCoordinatorEndpoint = "10.118.0.1:6443"
	)

	raw, err := k.kubeconfigReader.ReadKubeconfig()
	if err != nil {
		return nil, err
	}

	rewritten := strings.ReplaceAll(string(raw), localBalancerEndpoint, firstCoordinatorEndpoint)
	return []byte(rewritten), nil
}
// GetKubeadmCertificateKey returns the key needed to join the cluster as
// control plane. It has to be executed on a control-plane node and errors
// otherwise.
func (k *KubeWrapper) GetKubeadmCertificateKey() (string, error) {
	certKey, err := k.clusterUtil.GetControlPlaneJoinCertificateKey()
	return certKey, err
}
// GetJoinToken creates a bootstrap (join) token with the given ttl and
// returns its discovery information.
func (k *KubeWrapper) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) {
	discovery, err := k.clusterUtil.CreateJoinToken(ttl)
	return discovery, err
}
// StartKubelet starts the kubelet service by delegating to the cluster
// utility and passes its result through unchanged.
func (k *KubeWrapper) StartKubelet() error {
	err := k.clusterUtil.StartKubelet()
	return err
}
// fakeK8SClient is a fake k8s client whose Apply always succeeds.
type fakeK8SClient struct {
	// kubeconfig holds the value most recently passed to SetKubeconfig.
	kubeconfig []byte
}
// NewFakeK8SClient creates a new, fake k8s client where apply always works.
// The kubeconfig argument is ignored and the returned error is always nil.
func NewFakeK8SClient([]byte) (k8sapi.Client, error) {
	var fake fakeK8SClient
	return &fake, nil
}
// Apply fakes applying Kubernetes resources.
// Both arguments are ignored and the call always succeeds with a nil error.
func (f *fakeK8SClient) Apply(resources resources.Marshaler, forceConflicts bool) error {
	return nil
}
// SetKubeconfig stores the kubeconfig given to it.
// The slice is kept as-is (not copied), replacing any previously stored value.
func (f *fakeK8SClient) SetKubeconfig(kubeconfig []byte) {
	f.kubeconfig = kubeconfig
}