Correctly deploy the AWS CCM (#1853)

* aws: stop using the imds api for tags

* aws: disable tags in imds api

* aws: only tag instances with non-lecagy tag

* bootstrapper: always let coredns run before cilium

* debugd: make debugd less noisy

* fixup fix aws imds test

* fixup unsued context

* move getting instance id to readInstanceTag
This commit is contained in:
3u13r 2023-06-13 09:58:39 +02:00 committed by GitHub
parent 4f63481b7d
commit a2c98eb1d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 687 additions and 163 deletions

View file

@ -98,11 +98,33 @@ func (h *Client) InstallCilium(ctx context.Context, kubectl k8sapi.Client, relea
h.ReleaseName = release.ReleaseName h.ReleaseName = release.ReleaseName
h.Wait = release.Wait h.Wait = release.Wait
timeoutS := int64(10)
// allow coredns to run on uninitialized nodes (required by cloud-controller-manager)
tolerations := []corev1.Toleration{
{
Key: "node.cloudprovider.kubernetes.io/uninitialized",
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
{
Key: "node.kubernetes.io/unreachable",
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoExecute,
TolerationSeconds: &timeoutS,
},
}
if err := kubectl.AddTolerationsToDeployment(ctx, tolerations, "coredns", "kube-system"); err != nil {
return fmt.Errorf("failed to add tolerations to coredns deployment: %w", err)
}
if err := kubectl.EnforceCoreDNSSpread(ctx); err != nil {
return fmt.Errorf("failed to enforce CoreDNS spread: %w", err)
}
switch in.CloudProvider { switch in.CloudProvider {
case "aws", "azure", "openstack", "qemu": case "aws", "azure", "openstack", "qemu":
return h.installCiliumGeneric(ctx, release, in.LoadBalancerEndpoint) return h.installCiliumGeneric(ctx, release, in.LoadBalancerEndpoint)
case "gcp": case "gcp":
return h.installCiliumGCP(ctx, kubectl, release, in.NodeName, in.FirstNodePodCIDR, in.SubnetworkPodCIDR, in.LoadBalancerEndpoint) return h.installCiliumGCP(ctx, release, in.NodeName, in.FirstNodePodCIDR, in.SubnetworkPodCIDR, in.LoadBalancerEndpoint)
default: default:
return fmt.Errorf("unsupported cloud provider %q", in.CloudProvider) return fmt.Errorf("unsupported cloud provider %q", in.CloudProvider)
} }
@ -119,38 +141,13 @@ func (h *Client) installCiliumGeneric(ctx context.Context, release helm.Release,
return h.install(ctx, release.Chart, release.Values) return h.install(ctx, release.Chart, release.Values)
} }
func (h *Client) installCiliumGCP(ctx context.Context, kubectl k8sapi.Client, release helm.Release, nodeName, nodePodCIDR, subnetworkPodCIDR, kubeAPIEndpoint string) error { func (h *Client) installCiliumGCP(ctx context.Context, release helm.Release, nodeName, nodePodCIDR, subnetworkPodCIDR, kubeAPIEndpoint string) error {
out, err := exec.CommandContext(ctx, constants.KubectlPath, "--kubeconfig", constants.ControlPlaneAdminConfFilename, "patch", "node", nodeName, "-p", "{\"spec\":{\"podCIDR\": \""+nodePodCIDR+"\"}}").CombinedOutput() out, err := exec.CommandContext(ctx, constants.KubectlPath, "--kubeconfig", constants.ControlPlaneAdminConfFilename, "patch", "node", nodeName, "-p", "{\"spec\":{\"podCIDR\": \""+nodePodCIDR+"\"}}").CombinedOutput()
if err != nil { if err != nil {
err = errors.New(string(out)) err = errors.New(string(out))
return err return err
} }
timeoutS := int64(10)
// allow coredns to run on uninitialized nodes (required by cloud-controller-manager)
tolerations := []corev1.Toleration{
{
Key: "node.cloudprovider.kubernetes.io/uninitialized",
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
{
Key: "node.kubernetes.io/unreachable",
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoExecute,
TolerationSeconds: &timeoutS,
},
}
if err = kubectl.AddTolerationsToDeployment(ctx, tolerations, "coredns", "kube-system"); err != nil {
return err
}
selectors := map[string]string{
"node-role.kubernetes.io/control-plane": "",
}
if err = kubectl.AddNodeSelectorsToDeployment(ctx, selectors, "coredns", "kube-system"); err != nil {
return err
}
host, port, err := net.SplitHostPort(kubeAPIEndpoint) host, port, err := net.SplitHostPort(kubeAPIEndpoint)
if err != nil { if err != nil {
return err return err

View file

@ -54,6 +54,7 @@ type Client interface {
AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error AddNodeSelectorsToDeployment(ctx context.Context, selectors map[string]string, name string, namespace string) error
ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error) ListAllNamespaces(ctx context.Context) (*corev1.NamespaceList, error)
AnnotateNode(ctx context.Context, nodeName, annotationKey, annotationValue string) error AnnotateNode(ctx context.Context, nodeName, annotationKey, annotationValue string) error
EnforceCoreDNSSpread(ctx context.Context) error
} }
type componentsInstaller interface { type componentsInstaller interface {

View file

@ -125,7 +125,8 @@ func (k *KubeWrapper) InitCluster(
// Step 2: configure kubeadm init config // Step 2: configure kubeadm init config
ccmSupported := cloudprovider.FromString(k.cloudProvider) == cloudprovider.Azure || ccmSupported := cloudprovider.FromString(k.cloudProvider) == cloudprovider.Azure ||
cloudprovider.FromString(k.cloudProvider) == cloudprovider.GCP cloudprovider.FromString(k.cloudProvider) == cloudprovider.GCP ||
cloudprovider.FromString(k.cloudProvider) == cloudprovider.AWS
initConfig := k.configProvider.InitConfiguration(ccmSupported, versionString) initConfig := k.configProvider.InitConfiguration(ccmSupported, versionString)
initConfig.SetNodeIP(nodeIP) initConfig.SetNodeIP(nodeIP)
initConfig.SetClusterName(clusterName) initConfig.SetClusterName(clusterName)

View file

@ -544,6 +544,7 @@ type stubKubectl struct {
waitForCRDsErr error waitForCRDsErr error
listAllNamespacesErr error listAllNamespacesErr error
annotateNodeErr error annotateNodeErr error
enforceCoreDNSSpreadErr error
listAllNamespacesResp *corev1.NamespaceList listAllNamespacesResp *corev1.NamespaceList
} }
@ -576,6 +577,10 @@ func (s *stubKubectl) ListAllNamespaces(_ context.Context) (*corev1.NamespaceLis
return s.listAllNamespacesResp, s.listAllNamespacesErr return s.listAllNamespacesResp, s.listAllNamespacesErr
} }
func (s *stubKubectl) EnforceCoreDNSSpread(_ context.Context) error {
return s.enforceCoreDNSSpreadErr
}
type stubHelmClient struct { type stubHelmClient struct {
ciliumError error ciliumError error
certManagerError error certManagerError error

View file

@ -243,8 +243,8 @@ module "instance_group_control_plane" {
{ Name = local.name }, { Name = local.name },
{ constellation-role = "control-plane" }, { constellation-role = "control-plane" },
{ constellation-uid = local.uid }, { constellation-uid = local.uid },
{ KubernetesCluster = "Constellation-${local.uid}" }, { constellation-init-secret-hash = local.initSecretHash },
{ constellation-init-secret-hash = local.initSecretHash } { "kubernetes.io/cluster/${local.name}" = "owned" }
) )
} }
@ -268,7 +268,7 @@ module "instance_group_worker_nodes" {
{ Name = local.name }, { Name = local.name },
{ constellation-role = "worker" }, { constellation-role = "worker" },
{ constellation-uid = local.uid }, { constellation-uid = local.uid },
{ KubernetesCluster = "Constellation-${local.uid}" }, { constellation-init-secret-hash = local.initSecretHash },
{ constellation-init-secret-hash = local.initSecretHash } { "kubernetes.io/cluster/${local.name}" = "owned" }
) )
} }

View file

@ -23,7 +23,7 @@ resource "aws_launch_template" "launch_template" {
metadata_options { metadata_options {
http_endpoint = "enabled" http_endpoint = "enabled"
http_tokens = "required" http_tokens = "required"
instance_metadata_tags = "enabled" instance_metadata_tags = "disabled"
http_put_response_hop_limit = 2 http_put_response_hop_limit = 2
} }

View file

@ -12,7 +12,7 @@ import "time"
const ( const (
DebugdMetadataFlag = "constellation-debugd" DebugdMetadataFlag = "constellation-debugd"
GRPCTimeout = 5 * time.Minute GRPCTimeout = 5 * time.Minute
DiscoverDebugdInterval = 30 * time.Second DiscoverDebugdInterval = 10 * time.Second
DownloadRetryBackoff = 1 * time.Minute DownloadRetryBackoff = 1 * time.Minute
BinaryAccessMode = 0o755 // -rwxr-xr-x BinaryAccessMode = 0o755 // -rwxr-xr-x
BootstrapperDeployFilename = "/run/state/bin/bootstrapper" BootstrapperDeployFilename = "/run/state/bin/bootstrapper"

View file

@ -54,13 +54,14 @@ func (s *Scheduler) Start(ctx context.Context, wg *sync.WaitGroup) {
ips, err := s.fetcher.DiscoverDebugdIPs(ctx) ips, err := s.fetcher.DiscoverDebugdIPs(ctx)
if err != nil { if err != nil {
s.log.With(zap.Error(err)).Warnf("Discovering debugd IPs failed") s.log.With(zap.Error(err)).Warnf("Discovering debugd IPs failed")
continue
} }
if err == nil {
s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances") s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances")
s.download(ctx, ips) s.download(ctx, ips)
if s.deploymentDone && s.infoDone { if s.deploymentDone && s.infoDone {
return return
} }
}
select { select {
case <-ctx.Done(): case <-ctx.Done():

View file

@ -19,7 +19,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
@ -53,7 +52,6 @@ type ec2API interface {
type imdsAPI interface { type imdsAPI interface {
GetInstanceIdentityDocument(context.Context, *imds.GetInstanceIdentityDocumentInput, ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) GetInstanceIdentityDocument(context.Context, *imds.GetInstanceIdentityDocumentInput, ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error)
GetMetadata(context.Context, *imds.GetMetadataInput, ...func(*imds.Options)) (*imds.GetMetadataOutput, error)
} }
// Cloud provides AWS metadata and API access. // Cloud provides AWS metadata and API access.
@ -81,7 +79,7 @@ func New(ctx context.Context) (*Cloud, error) {
// List retrieves all instances belonging to the current Constellation. // List retrieves all instances belonging to the current Constellation.
func (c *Cloud) List(ctx context.Context) ([]metadata.InstanceMetadata, error) { func (c *Cloud) List(ctx context.Context) ([]metadata.InstanceMetadata, error) {
uid, err := readInstanceTag(ctx, c.imds, cloud.TagUID) uid, err := c.readInstanceTag(ctx, cloud.TagUID)
if err != nil { if err != nil {
return nil, fmt.Errorf("retrieving uid tag: %w", err) return nil, fmt.Errorf("retrieving uid tag: %w", err)
} }
@ -100,7 +98,7 @@ func (c *Cloud) Self(ctx context.Context) (metadata.InstanceMetadata, error) {
return metadata.InstanceMetadata{}, fmt.Errorf("retrieving instance identity: %w", err) return metadata.InstanceMetadata{}, fmt.Errorf("retrieving instance identity: %w", err)
} }
instanceRole, err := readInstanceTag(ctx, c.imds, cloud.TagRole) instanceRole, err := c.readInstanceTag(ctx, cloud.TagRole)
if err != nil { if err != nil {
return metadata.InstanceMetadata{}, fmt.Errorf("retrieving role tag: %w", err) return metadata.InstanceMetadata{}, fmt.Errorf("retrieving role tag: %w", err)
} }
@ -115,12 +113,12 @@ func (c *Cloud) Self(ctx context.Context) (metadata.InstanceMetadata, error) {
// UID returns the UID of the Constellation. // UID returns the UID of the Constellation.
func (c *Cloud) UID(ctx context.Context) (string, error) { func (c *Cloud) UID(ctx context.Context) (string, error) {
return readInstanceTag(ctx, c.imds, cloud.TagUID) return c.readInstanceTag(ctx, cloud.TagUID)
} }
// InitSecretHash returns the InitSecretHash of the current instance. // InitSecretHash returns the InitSecretHash of the current instance.
func (c *Cloud) InitSecretHash(ctx context.Context) ([]byte, error) { func (c *Cloud) InitSecretHash(ctx context.Context) ([]byte, error) {
initSecretHash, err := readInstanceTag(ctx, c.imds, cloud.TagInitSecretHash) initSecretHash, err := c.readInstanceTag(ctx, cloud.TagInitSecretHash)
if err != nil { if err != nil {
return nil, fmt.Errorf("retrieving init secret hash tag: %w", err) return nil, fmt.Errorf("retrieving init secret hash tag: %w", err)
} }
@ -129,7 +127,7 @@ func (c *Cloud) InitSecretHash(ctx context.Context) ([]byte, error) {
// GetLoadBalancerEndpoint returns the endpoint of the load balancer. // GetLoadBalancerEndpoint returns the endpoint of the load balancer.
func (c *Cloud) GetLoadBalancerEndpoint(ctx context.Context) (string, error) { func (c *Cloud) GetLoadBalancerEndpoint(ctx context.Context) (string, error) {
uid, err := readInstanceTag(ctx, c.imds, cloud.TagUID) uid, err := c.readInstanceTag(ctx, cloud.TagUID)
if err != nil { if err != nil {
return "", fmt.Errorf("retrieving uid tag: %w", err) return "", fmt.Errorf("retrieving uid tag: %w", err)
} }
@ -269,16 +267,28 @@ func (c *Cloud) convertToMetadataInstance(ec2Instances []ec2Types.Instance) ([]m
return instances, nil return instances, nil
} }
func readInstanceTag(ctx context.Context, api imdsAPI, tag string) (string, error) { func (c *Cloud) readInstanceTag(ctx context.Context, tag string) (string, error) {
reader, err := api.GetMetadata(ctx, &imds.GetMetadataInput{ identity, err := c.imds.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{})
Path: "/tags/instance/" + tag, if err != nil {
return "", fmt.Errorf("retrieving instance identity: %w", err)
}
if identity == nil {
return "", errors.New("instance identity is nil")
}
out, err := c.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
InstanceIds: []string{identity.InstanceID},
}) })
if err != nil { if err != nil {
return "", err return "", fmt.Errorf("descibing instances: %w", err)
} }
defer reader.Content.Close()
instanceTag, err := io.ReadAll(reader.Content) if len(out.Reservations) != 1 || len(out.Reservations[0].Instances) != 1 {
return string(instanceTag), err return "", fmt.Errorf("expected 1 instance, got %d", len(out.Reservations[0].Instances))
}
return findTag(out.Reservations[0].Instances[0].Tags, tag)
} }
func findTag(tags []ec2Types.Tag, wantKey string) (string, error) { func findTag(tags []ec2Types.Tag, wantKey string) (string, error) {

View file

@ -9,8 +9,6 @@ package aws
import ( import (
"context" "context"
"errors" "errors"
"io"
"strings"
"testing" "testing"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
@ -29,11 +27,9 @@ import (
) )
func TestSelf(t *testing.T) { func TestSelf(t *testing.T) {
someErr := errors.New("failed")
testCases := map[string]struct { testCases := map[string]struct {
imds *stubIMDS imds *stubIMDS
ec2 *stubEC2 ec2API *stubEC2
wantSelf metadata.InstanceMetadata wantSelf metadata.InstanceMetadata
wantErr bool wantErr bool
}{ }{
@ -46,8 +42,24 @@ func TestSelf(t *testing.T) {
PrivateIP: "192.0.2.1", PrivateIP: "192.0.2.1",
}, },
}, },
tags: map[string]string{ },
cloud.TagRole: "controlplane", ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagRole),
Value: aws.String("controlplane"),
},
},
},
},
},
},
}, },
}, },
wantSelf: metadata.InstanceMetadata{ wantSelf: metadata.InstanceMetadata{
@ -66,9 +78,28 @@ func TestSelf(t *testing.T) {
PrivateIP: "192.0.2.1", PrivateIP: "192.0.2.1",
}, },
}, },
tags: map[string]string{ },
cloud.TagRole: "worker", ec2API: &stubEC2{
cloud.TagInitSecretHash: "initSecretHash", selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagRole),
Value: aws.String("worker"),
},
{
Key: aws.String(cloud.TagInitSecretHash),
Value: aws.String("initSecretHash"),
},
},
},
},
},
},
}, },
}, },
wantSelf: metadata.InstanceMetadata{ wantSelf: metadata.InstanceMetadata{
@ -80,15 +111,34 @@ func TestSelf(t *testing.T) {
}, },
"get instance document error": { "get instance document error": {
imds: &stubIMDS{ imds: &stubIMDS{
getInstanceIdentityDocumentErr: someErr, getInstanceIdentityDocumentErr: assert.AnError,
tags: map[string]string{ },
tagName: "test-instance", ec2API: &stubEC2{
cloud.TagRole: "controlplane", selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagRole),
Value: aws.String("controlplane"),
},
},
},
},
},
},
}, },
}, },
wantErr: true, wantErr: true,
}, },
"get metadata error": { "get instance error": {
imds: &stubIMDS{ imds: &stubIMDS{
instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
InstanceIdentityDocument: imds.InstanceIdentityDocument{ InstanceIdentityDocument: imds.InstanceIdentityDocument{
@ -97,7 +147,9 @@ func TestSelf(t *testing.T) {
PrivateIP: "192.0.2.1", PrivateIP: "192.0.2.1",
}, },
}, },
getMetadataErr: someErr, },
ec2API: &stubEC2{
describeInstancesErr: assert.AnError,
}, },
wantErr: true, wantErr: true,
}, },
@ -110,8 +162,24 @@ func TestSelf(t *testing.T) {
PrivateIP: "192.0.2.1", PrivateIP: "192.0.2.1",
}, },
}, },
tags: map[string]string{ },
tagName: "test-instance", ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
},
},
},
},
},
}, },
}, },
wantErr: true, wantErr: true,
@ -121,7 +189,10 @@ func TestSelf(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
m := &Cloud{imds: tc.imds, ec2: &stubEC2{}} m := &Cloud{
imds: tc.imds,
ec2: tc.ec2API,
}
self, err := m.Self(context.Background()) self, err := m.Self(context.Background())
if tc.wantErr { if tc.wantErr {
@ -192,18 +263,45 @@ func TestList(t *testing.T) {
} }
testCases := map[string]struct { testCases := map[string]struct {
imds *stubIMDS imdsAPI *stubIMDS
ec2 *stubEC2 ec2 *stubEC2
wantList []metadata.InstanceMetadata wantList []metadata.InstanceMetadata
wantErr bool wantErr bool
}{ }{
"success single page": { "success single page": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "id-1",
},
}, },
}, },
ec2: &stubEC2{ ec2: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("id-1"),
Tags: []ec2Types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("name-1"),
},
{
Key: aws.String(cloud.TagRole),
Value: aws.String("controlplane"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
},
describeInstancesResp1: successfulResp, describeInstancesResp1: successfulResp,
}, },
wantList: []metadata.InstanceMetadata{ wantList: []metadata.InstanceMetadata{
@ -222,12 +320,39 @@ func TestList(t *testing.T) {
}, },
}, },
"success multiple pages": { "success multiple pages": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "id-1",
},
}, },
}, },
ec2: &stubEC2{ ec2: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("id-1"),
Tags: []ec2Types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("name-1"),
},
{
Key: aws.String(cloud.TagRole),
Value: aws.String("controlplane"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
},
describeInstancesResp1: &ec2.DescribeInstancesOutput{ describeInstancesResp1: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{ Reservations: []ec2Types.Reservation{
{ {
@ -283,16 +408,45 @@ func TestList(t *testing.T) {
}, },
}, },
"fail to get UID": { "fail to get UID": {
imds: &stubIMDS{}, imdsAPI: &stubIMDS{
instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "id-1",
},
},
},
ec2: &stubEC2{ ec2: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("id-1"),
Tags: []ec2Types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("name-1"),
},
{
Key: aws.String(cloud.TagRole),
Value: aws.String("controlplane"),
},
},
},
},
},
},
},
describeInstancesResp1: successfulResp, describeInstancesResp1: successfulResp,
}, },
wantErr: true, wantErr: true,
}, },
"describe instances fails": { "describe instances fails": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "id-1",
},
}, },
}, },
ec2: &stubEC2{ ec2: &stubEC2{
@ -305,7 +459,10 @@ func TestList(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
m := &Cloud{ec2: tc.ec2, imds: tc.imds} m := &Cloud{
imds: tc.imdsAPI,
ec2: tc.ec2,
}
list, err := m.List(context.Background()) list, err := m.List(context.Background())
if tc.wantErr { if tc.wantErr {
@ -325,6 +482,7 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
imds *stubIMDS imds *stubIMDS
ec2API *stubEC2
loadbalancer *stubLoadbalancer loadbalancer *stubLoadbalancer
resourceapi *stubResourceGroupTagging resourceapi *stubResourceGroupTagging
wantAddr string wantAddr string
@ -332,8 +490,29 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
}{ }{
"success retrieving loadbalancer endpoint": { "success retrieving loadbalancer endpoint": {
imds: &stubIMDS{ imds: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance-id",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
loadbalancer: &stubLoadbalancer{ loadbalancer: &stubLoadbalancer{
@ -366,8 +545,29 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
}, },
"too many ARNs": { "too many ARNs": {
imds: &stubIMDS{ imds: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance-id",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
loadbalancer: &stubLoadbalancer{ loadbalancer: &stubLoadbalancer{
@ -403,8 +603,29 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
}, },
"too many ARNs (paged)": { "too many ARNs (paged)": {
imds: &stubIMDS{ imds: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance-id",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
loadbalancer: &stubLoadbalancer{ loadbalancer: &stubLoadbalancer{
@ -445,8 +666,29 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
}, },
"loadbalancer has no availability zones": { "loadbalancer has no availability zones": {
imds: &stubIMDS{ imds: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance-id",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
loadbalancer: &stubLoadbalancer{ loadbalancer: &stubLoadbalancer{
@ -471,8 +713,29 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
}, },
"failure to get resources by tag": { "failure to get resources by tag": {
imds: &stubIMDS{ imds: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
cloud.TagUID: "uid", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance-id",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2Types.Reservation{
{
Instances: []ec2Types.Instance{
{
InstanceId: aws.String("test-instance-id"),
Tags: []ec2Types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
loadbalancer: &stubLoadbalancer{ loadbalancer: &stubLoadbalancer{
@ -504,6 +767,7 @@ func TestGetLoadBalancerEndpoint(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
m := &Cloud{ m := &Cloud{
imds: tc.imds, imds: tc.imds,
ec2: tc.ec2API,
loadbalancer: tc.loadbalancer, loadbalancer: tc.loadbalancer,
resourceapiClient: tc.resourceapi, resourceapiClient: tc.resourceapi,
} }
@ -696,37 +960,25 @@ func TestConvertToMetadataInstance(t *testing.T) {
} }
type stubIMDS struct { type stubIMDS struct {
getInstanceIdentityDocumentErr error
getMetadataErr error
instanceDocumentResp *imds.GetInstanceIdentityDocumentOutput instanceDocumentResp *imds.GetInstanceIdentityDocumentOutput
tags map[string]string getInstanceIdentityDocumentErr error
} }
func (s *stubIMDS) GetInstanceIdentityDocument(context.Context, *imds.GetInstanceIdentityDocumentInput, ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { func (s *stubIMDS) GetInstanceIdentityDocument(context.Context, *imds.GetInstanceIdentityDocumentInput, ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) {
return s.instanceDocumentResp, s.getInstanceIdentityDocumentErr return s.instanceDocumentResp, s.getInstanceIdentityDocumentErr
} }
func (s *stubIMDS) GetMetadata(_ context.Context, in *imds.GetMetadataInput, _ ...func(*imds.Options)) (*imds.GetMetadataOutput, error) {
tag, ok := s.tags[strings.TrimPrefix(in.Path, "/tags/instance/")]
if !ok {
return nil, errors.New("not found")
}
return &imds.GetMetadataOutput{
Content: io.NopCloser(
strings.NewReader(
tag,
),
),
}, s.getMetadataErr
}
type stubEC2 struct { type stubEC2 struct {
describeInstancesErr error describeInstancesErr error
selfInstance *ec2.DescribeInstancesOutput
describeInstancesResp1 *ec2.DescribeInstancesOutput describeInstancesResp1 *ec2.DescribeInstancesOutput
describeInstancesResp2 *ec2.DescribeInstancesOutput describeInstancesResp2 *ec2.DescribeInstancesOutput
} }
func (s *stubEC2) DescribeInstances(_ context.Context, in *ec2.DescribeInstancesInput, _ ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error) { func (s *stubEC2) DescribeInstances(_ context.Context, in *ec2.DescribeInstancesInput, _ ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error) {
if len(in.InstanceIds) == 1 {
return s.selfInstance, s.describeInstancesErr
}
if in.NextToken == nil { if in.NextToken == nil {
return s.describeInstancesResp1, s.describeInstancesErr return s.describeInstancesResp1, s.describeInstancesErr
} }

View file

@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds" "github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/edgelesssys/constellation/v2/internal/cloud" "github.com/edgelesssys/constellation/v2/internal/cloud"
"k8s.io/utils/clock" "k8s.io/utils/clock"
) )
@ -27,6 +28,9 @@ import (
type Logger struct { type Logger struct {
api logAPI api logAPI
ec2API ec2API
imdsAPI imdsAPI
groupName string groupName string
streamName string streamName string
@ -50,13 +54,15 @@ func NewLogger(ctx context.Context) (*Logger, error) {
l := &Logger{ l := &Logger{
api: client, api: client,
ec2API: ec2.NewFromConfig(cfg),
imdsAPI: imds.NewFromConfig(cfg),
interval: time.Second, interval: time.Second,
clock: clock.RealClock{}, clock: clock.RealClock{},
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
stopCh: make(chan struct{}, 1), stopCh: make(chan struct{}, 1),
} }
if err := l.createStream(ctx, imds.New(imds.Options{})); err != nil { if err := l.createStream(ctx); err != nil {
return nil, err return nil, err
} }
@ -140,18 +146,14 @@ func (l *Logger) flushLoop() {
} }
// createStream creates a new log stream in AWS Cloudwatch Logs. // createStream creates a new log stream in AWS Cloudwatch Logs.
func (l *Logger) createStream(ctx context.Context, imds imdsAPI) error { func (l *Logger) createStream(ctx context.Context) error {
name, err := readInstanceTag(ctx, imds, tagName) name, uid, err := l.getNameAndUID(ctx)
if err != nil { if err != nil {
return err return err
} }
l.streamName = name l.streamName = name
// find log group with matching Constellation UID // find log group with matching Constellation UID
uid, err := readInstanceTag(ctx, imds, cloud.TagUID)
if err != nil {
return err
}
describeInput := &logs.DescribeLogGroupsInput{} describeInput := &logs.DescribeLogGroupsInput{}
for res, err := l.api.DescribeLogGroups(ctx, describeInput); ; res, err = l.api.DescribeLogGroups(ctx, describeInput) { for res, err := l.api.DescribeLogGroups(ctx, describeInput); ; res, err = l.api.DescribeLogGroups(ctx, describeInput) {
if err != nil { if err != nil {
@ -193,6 +195,31 @@ func (l *Logger) createStream(ctx context.Context, imds imdsAPI) error {
return nil return nil
} }
func (l *Logger) getNameAndUID(ctx context.Context) (string, string, error) {
identity, err := l.imdsAPI.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
return "", "", fmt.Errorf("retrieving instance identity: %w", err)
}
out, err := l.ec2API.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
InstanceIds: []string{identity.InstanceID},
})
if err != nil {
return "", "", fmt.Errorf("descibing instances: %w", err)
}
if len(out.Reservations) != 1 || len(out.Reservations[0].Instances) != 1 {
return "", "", fmt.Errorf("expected 1 instance, got %d", len(out.Reservations[0].Instances))
}
uid, err := findTag(out.Reservations[0].Instances[0].Tags, cloud.TagUID)
if err != nil {
return "", "", fmt.Errorf("finding tag %s: %w", cloud.TagUID, err)
}
return identity.InstanceID, uid, err
}
type logAPI interface { type logAPI interface {
CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error) CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error)
DescribeLogGroups(context.Context, *logs.DescribeLogGroupsInput, ...func(*logs.Options)) (*logs.DescribeLogGroupsOutput, error) DescribeLogGroups(context.Context, *logs.DescribeLogGroupsInput, ...func(*logs.Options)) (*logs.DescribeLogGroupsOutput, error)

View file

@ -15,8 +15,11 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" cloudwatchtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/edgelesssys/constellation/v2/internal/cloud" "github.com/edgelesssys/constellation/v2/internal/cloud"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -32,22 +35,47 @@ func TestCreateStream(t *testing.T) {
someErr := errors.New("failed") someErr := errors.New("failed")
testCases := map[string]struct { testCases := map[string]struct {
imds *stubIMDS imdsAPI *stubIMDS
ec2API *stubEC2
logs *stubLogs logs *stubLogs
wantGroup string wantGroup string
wantStream string wantStream string
wantErr bool wantErr bool
}{ }{
"success new stream minimal": { "success new stream minimal": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{LogGroupName: aws.String("test-group")}, {LogGroupName: aws.String("test-group")},
}, },
}, },
@ -57,15 +85,39 @@ func TestCreateStream(t *testing.T) {
wantGroup: "test-group", wantGroup: "test-group",
}, },
"success one group of many": { "success one group of many": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{ {
LogGroupName: aws.String("random-group"), LogGroupName: aws.String("random-group"),
}, },
@ -76,7 +128,7 @@ func TestCreateStream(t *testing.T) {
NextToken: aws.String("next"), NextToken: aws.String("next"),
}, },
describeRes2: &logs.DescribeLogGroupsOutput{ describeRes2: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{ {
LogGroupName: aws.String("another-group"), LogGroupName: aws.String("another-group"),
}, },
@ -104,34 +156,82 @@ func TestCreateStream(t *testing.T) {
wantGroup: "test-group", wantGroup: "test-group",
}, },
"success stream exists": { "success stream exists": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{LogGroupName: aws.String("test-group")}, {LogGroupName: aws.String("test-group")},
}, },
}, },
listTags: map[string]map[string]string{"test-group": {cloud.TagUID: "uid"}}, listTags: map[string]map[string]string{"test-group": {cloud.TagUID: "uid"}},
createErr: &types.ResourceAlreadyExistsException{}, createErr: &cloudwatchtypes.ResourceAlreadyExistsException{},
}, },
wantStream: "test-instance", wantStream: "test-instance",
wantGroup: "test-group", wantGroup: "test-group",
}, },
"create stream error": { "create stream error": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{LogGroupName: aws.String("test-group")}, {LogGroupName: aws.String("test-group")},
}, },
}, },
@ -141,14 +241,35 @@ func TestCreateStream(t *testing.T) {
wantErr: true, wantErr: true,
}, },
"missing uid tag": { "missing uid tag": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{LogGroupName: aws.String("test-group")}, {LogGroupName: aws.String("test-group")},
}, },
}, },
@ -156,15 +277,32 @@ func TestCreateStream(t *testing.T) {
}, },
wantErr: true, wantErr: true,
}, },
"missing name tag": { "missing identity document": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ getInstanceIdentityDocumentErr: assert.AnError,
cloud.TagUID: "uid", },
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{ describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{ LogGroups: []cloudwatchtypes.LogGroup{
{LogGroupName: aws.String("test-group")}, {LogGroupName: aws.String("test-group")},
}, },
}, },
@ -173,10 +311,34 @@ func TestCreateStream(t *testing.T) {
wantErr: true, wantErr: true,
}, },
"describe groups error": { "describe groups error": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
@ -186,10 +348,34 @@ func TestCreateStream(t *testing.T) {
wantErr: true, wantErr: true,
}, },
"no matching groups": { "no matching groups": {
imds: &stubIMDS{ imdsAPI: &stubIMDS{
tags: map[string]string{ instanceDocumentResp: &imds.GetInstanceIdentityDocumentOutput{
tagName: "test-instance", InstanceIdentityDocument: imds.InstanceIdentityDocument{
cloud.TagUID: "uid", InstanceID: "test-instance",
},
},
},
ec2API: &stubEC2{
selfInstance: &ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{
{
Instances: []ec2types.Instance{
{
InstanceId: aws.String("test-instance"),
Tags: []ec2types.Tag{
{
Key: aws.String(tagName),
Value: aws.String("test-instance"),
},
{
Key: aws.String(cloud.TagUID),
Value: aws.String("uid"),
},
},
},
},
},
},
}, },
}, },
logs: &stubLogs{ logs: &stubLogs{
@ -205,9 +391,11 @@ func TestCreateStream(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
l := &Logger{ l := &Logger{
api: tc.logs, api: tc.logs,
imdsAPI: tc.imdsAPI,
ec2API: tc.ec2API,
} }
err := l.createStream(context.Background(), tc.imds) err := l.createStream(context.Background())
if tc.wantErr { if tc.wantErr {
assert.Error(err) assert.Error(err)
return return
@ -326,7 +514,7 @@ type stubLogs struct {
listTags map[string]map[string]string listTags map[string]map[string]string
putErr error putErr error
logSequenceToken int logSequenceToken int
logs []types.InputLogEvent logs []cloudwatchtypes.InputLogEvent
} }
func (s *stubLogs) CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error) { func (s *stubLogs) CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error) {
@ -356,7 +544,7 @@ func (s *stubLogs) PutLogEvents(_ context.Context, in *logs.PutLogEventsInput, _
return nil, err return nil, err
} }
if gotSeq != s.logSequenceToken { if gotSeq != s.logSequenceToken {
return nil, &types.InvalidSequenceTokenException{ExpectedSequenceToken: aws.String(strconv.Itoa(s.logSequenceToken))} return nil, &cloudwatchtypes.InvalidSequenceTokenException{ExpectedSequenceToken: aws.String(strconv.Itoa(s.logSequenceToken))}
} }
s.logs = append(s.logs, in.LogEvents...) s.logs = append(s.logs, in.LogEvents...)

View file

@ -164,6 +164,48 @@ func (k *Kubectl) AddTolerationsToDeployment(ctx context.Context, tolerations []
} }
return nil return nil
}) })
return err
}
// EnforceCoreDNSSpread adds a pod anti-affinity to the coredns deployment to ensure that
// coredns pods are spread across nodes.
func (k *Kubectl) EnforceCoreDNSSpread(ctx context.Context) error {
deployments := k.AppsV1().Deployments("kube-system")
// retry resource update if an error occurs
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := deployments.Get(ctx, "coredns", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Deployment to add toleration: %w", err)
}
if result.Spec.Template.Spec.Affinity == nil {
result.Spec.Template.Spec.Affinity = &corev1.Affinity{}
}
if result.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
result.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
result.Spec.Template.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.WeightedPodAffinityTerm{}
if result.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
result.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = []corev1.PodAffinityTerm{}
}
result.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(result.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "k8s-app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"kube-dns"},
},
},
},
TopologyKey: "kubernetes.io/hostname",
})
_, err = deployments.Update(ctx, result, metav1.UpdateOptions{})
return err
})
if err != nil { if err != nil {
return err return err
} }