AB#2479 Implement AWS cloud logging (#232)

Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
Daniel Weiße 2022-10-17 09:05:45 +02:00 committed by GitHub
parent c16f5a976d
commit 623cb6cdb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 555 additions and 12 deletions

1
go.mod
View File

@ -54,6 +54,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.17.8
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.17
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.34
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.20
github.com/aws/aws-sdk-go-v2/service/ec2 v1.32.0
github.com/aws/aws-sdk-go-v2/service/kms v1.18.12
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11

2
go.sum
View File

@ -281,6 +281,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 h1:wj5Rwc05hvUSvKuOF29IYb9QrCL
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24/go.mod h1:jULHjqqjDlbyTa7pfM7WICATnOv+iOhjletM3N0Xbu8=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 h1:ZSIPAkAsCCjYrhqfw2+lNzWDzxzHXEckFkTePL5RSWQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14/go.mod h1:AyGgqiKv9ECM6IZeNQtdT8NnMvUb3/2wokeq2Fgryto=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.20 h1:yPyXdrZaB4SW+pn2CmqyAbhuqGM4Pv4fsMhLOt8cOj8=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.20/go.mod h1:p2i2jyYZzFBJeOOQ5ji2k/Yc6IvlQsG/CuHRwEi8whs=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.32.0 h1:0Vbs1G2zV7uvBhMj7o/igTzAg1/roh4ksgIr5oRKFIo=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.32.0/go.mod h1:Z8942YP2VgLQpgPCx06iXCrOt7mxxCe0dESCm9FFhgs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 h1:Lh1AShsuIJTwMkoxVCAYPJgNG5H+eN6SmoUn8nOZ5wE=

View File

@ -6,20 +6,195 @@ SPDX-License-Identifier: AGPL-3.0-only
package aws
// TODO: Implement for AWS.
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"k8s.io/utils/clock"
)
// Logger is a Cloud Logger for AWS.
type Logger struct{}
// Log messages are collected and periodically flushed to AWS Cloudwatch Logs.
type Logger struct {
api logAPI
groupName string
streamName string
logs []types.InputLogEvent
sequenceToken *string
mux sync.Mutex
interval time.Duration
clock clock.WithTicker
wg sync.WaitGroup
stopCh chan struct{}
}
// NewLogger creates a new Cloud Logger for AWS.
func NewLogger() *Logger {
return &Logger{}
func NewLogger(ctx context.Context) (*Logger, error) {
cfg, err := config.LoadDefaultConfig(ctx, config.WithEC2IMDSRegion())
if err != nil {
return nil, err
}
client := logs.NewFromConfig(cfg)
l := &Logger{
api: client,
interval: time.Second,
clock: clock.RealClock{},
wg: sync.WaitGroup{},
stopCh: make(chan struct{}, 1),
}
if err := l.createStream(ctx, imds.New(imds.Options{})); err != nil {
return nil, err
}
l.flushLoop()
return l, nil
}
// Disclose is not implemented for AWS.
func (l *Logger) Disclose(msg string) {}
// Disclose adds a message to the log queue.
// The messages are flushed periodically to AWS Cloudwatch Logs.
func (l *Logger) Disclose(msg string) {
l.mux.Lock()
defer l.mux.Unlock()
l.logs = append(l.logs, types.InputLogEvent{
Message: aws.String(msg),
Timestamp: aws.Int64(l.clock.Now().UnixMilli()),
})
}
// Close is a no-op.
// Close flushes the logs a final time and stops the flush loop.
func (l *Logger) Close() error {
l.stopCh <- struct{}{}
l.wg.Wait()
return l.flushLogs()
}
// flushLogs flushes the aggregated log messages to AWS Cloudwatch Logs.
func (l *Logger) flushLogs() error {
// make sure only one flush operation is running at a time
l.mux.Lock()
defer l.mux.Unlock()
if len(l.logs) == 0 {
return nil // no logs to flush
}
ctx := context.Background()
logRequest := &logs.PutLogEventsInput{
LogEvents: l.logs,
LogGroupName: &l.groupName,
LogStreamName: &l.streamName,
SequenceToken: l.sequenceToken,
}
for res, err := l.api.PutLogEvents(ctx, logRequest); ; res, err = l.api.PutLogEvents(ctx, logRequest) {
if err == nil {
l.sequenceToken = res.NextSequenceToken
l.logs = nil
return nil
}
// If the flush operation was called on a pre-existing stream,
// or another operation sent logs to the same stream,
// the sequence token may not be set correctly.
// We can retrieve the correct sequence token from the error message.
var sequenceErr *types.InvalidSequenceTokenException
if !errors.As(err, &sequenceErr) {
return err
}
logRequest.SequenceToken = sequenceErr.ExpectedSequenceToken
}
}
// flushLoop periodically flushes the logs to AWS Cloudwatch Logs.
func (l *Logger) flushLoop() {
l.wg.Add(1)
ticker := l.clock.NewTicker(l.interval)
go func() {
defer l.wg.Done()
defer ticker.Stop()
for {
_ = l.flushLogs()
select {
case <-ticker.C():
case <-l.stopCh:
return
}
}
}()
}
// createStream creates a new log stream in AWS Cloudwatch Logs.
func (l *Logger) createStream(ctx context.Context, imds imdsAPI) error {
name, err := readInstanceTag(ctx, imds, tagName)
if err != nil {
return err
}
l.streamName = name
// find log group with matching Constellation UID
uid, err := readInstanceTag(ctx, imds, tagUID)
if err != nil {
return err
}
describeInput := &logs.DescribeLogGroupsInput{}
for res, err := l.api.DescribeLogGroups(ctx, describeInput); ; res, err = l.api.DescribeLogGroups(ctx, describeInput) {
if err != nil {
return err
}
for _, group := range res.LogGroups {
tags, err := l.api.ListTagsLogGroup(ctx, &logs.ListTagsLogGroupInput{LogGroupName: group.LogGroupName})
if err != nil {
continue // we may not have permission to read the tags of a log group outside the Constellation scope
}
if tags.Tags[tagUID] == uid {
l.groupName = *group.LogGroupName
res.NextToken = nil // stop pagination
break
}
}
if res.NextToken == nil {
break
}
describeInput.NextToken = res.NextToken
}
if l.groupName == "" {
return fmt.Errorf("failed to find log group for UID %s", uid)
}
// create or use existing log stream
if _, err := l.api.CreateLogStream(ctx, &logs.CreateLogStreamInput{
LogGroupName: &l.groupName,
LogStreamName: &l.streamName,
}); err != nil {
// Ignore error if the stream already exists
var createErr *types.ResourceAlreadyExistsException
if !errors.As(err, &createErr) {
return err
}
}
return nil
}
type logAPI interface {
CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error)
DescribeLogGroups(context.Context, *logs.DescribeLogGroupsInput, ...func(*logs.Options)) (*logs.DescribeLogGroupsOutput, error)
ListTagsLogGroup(context.Context, *logs.ListTagsLogGroupInput, ...func(*logs.Options)) (*logs.ListTagsLogGroupOutput, error)
PutLogEvents(context.Context, *logs.PutLogEventsInput, ...func(*logs.Options)) (*logs.PutLogEventsOutput, error)
}

View File

@ -0,0 +1,365 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package aws
import (
"context"
"errors"
"strconv"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
logs "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
testclock "k8s.io/utils/clock/testing"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestCreateStream(t *testing.T) {
someErr := errors.New("failed")
testCases := map[string]struct {
imds *stubIMDS
logs *stubLogs
wantGroup string
wantStream string
wantErr bool
}{
"success new stream minimal": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{LogGroupName: aws.String("test-group")},
},
},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
},
wantStream: "test-instance",
wantGroup: "test-group",
},
"success one group of many": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{
LogGroupName: aws.String("random-group"),
},
{
LogGroupName: aws.String("other-group"),
},
},
NextToken: aws.String("next"),
},
describeRes2: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{
LogGroupName: aws.String("another-group"),
},
{
LogGroupName: aws.String("test-group"),
},
},
},
listTags: map[string]map[string]string{
"random-group": {
"some-tag": "random-tag",
},
"other-group": {
tagUID: "other-uid",
},
"another-group": {
"some-tag": "uid",
},
"test-group": {
tagUID: "uid",
},
},
},
wantStream: "test-instance",
wantGroup: "test-group",
},
"success stream exists": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{LogGroupName: aws.String("test-group")},
},
},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
createErr: &types.ResourceAlreadyExistsException{},
},
wantStream: "test-instance",
wantGroup: "test-group",
},
"create stream error": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{LogGroupName: aws.String("test-group")},
},
},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
createErr: someErr,
},
wantErr: true,
},
"missing uid tag": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{LogGroupName: aws.String("test-group")},
},
},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
},
wantErr: true,
},
"missing name tag": {
imds: &stubIMDS{
tags: map[string]string{
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{
LogGroups: []types.LogGroup{
{LogGroupName: aws.String("test-group")},
},
},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
},
wantErr: true,
},
"describe groups error": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeErr: someErr,
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
},
wantErr: true,
},
"no matching groups": {
imds: &stubIMDS{
tags: map[string]string{
tagName: "test-instance",
tagUID: "uid",
},
},
logs: &stubLogs{
describeRes1: &logs.DescribeLogGroupsOutput{},
listTags: map[string]map[string]string{"test-group": {tagUID: "uid"}},
},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
l := &Logger{
api: tc.logs,
}
err := l.createStream(context.Background(), tc.imds)
if tc.wantErr {
assert.Error(err)
return
}
assert.NoError(err)
assert.Equal(tc.wantGroup, l.groupName)
assert.Equal(tc.wantStream, l.streamName)
})
}
}
func TestLogging(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logAPI := &stubLogs{}
l := &Logger{
api: logAPI,
interval: 1 * time.Millisecond,
clock: testclock.NewFakeClock(time.Time{}),
}
l.Disclose("msg")
l.Disclose("msg")
// no logs until we flush to the API
assert.Len(logAPI.logs, 0)
// flush
require.NoError(l.flushLogs())
assert.Len(logAPI.logs, 2)
// flushing doesn't do anything if there are no logs
require.NoError(l.flushLogs())
assert.Len(logAPI.logs, 2)
// if we flush with an incorrect sequence token,
// we should get a new sequence token and retry
logAPI.logSequenceToken = 15
l.Disclose("msg")
require.NoError(l.flushLogs())
assert.Len(logAPI.logs, 3)
logAPI.putErr = errors.New("failed")
l.Disclose("msg")
assert.Error(l.flushLogs())
}
func TestFlushLoop(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logAPI := &stubLogs{}
clock := testclock.NewFakeClock(time.Time{})
l := &Logger{
api: logAPI,
interval: 1 * time.Second,
clock: clock,
stopCh: make(chan struct{}, 1),
}
l.Disclose("msg")
l.Disclose("msg")
l.flushLoop()
clock.Step(1 * time.Second)
require.NoError(l.Close())
assert.Len(logAPI.logs, 2)
}
func TestConcurrency(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
l := &Logger{
api: &stubLogs{},
interval: 1 * time.Second,
clock: testclock.NewFakeClock(time.Time{}),
stopCh: make(chan struct{}, 1),
}
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
l.Disclose("msg")
}()
}
wg.Wait()
assert.Len(l.logs, 100)
require.NoError(l.flushLogs())
assert.Len(l.logs, 0)
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
l.Disclose("msg")
require.NoError(l.flushLogs())
}()
}
wg.Wait()
assert.Len(l.logs, 0)
}
type stubLogs struct {
createErr error
describeErr error
describeRes1 *logs.DescribeLogGroupsOutput
describeRes2 *logs.DescribeLogGroupsOutput
listTagsErr error
listTags map[string]map[string]string
putErr error
logSequenceToken int
logs []types.InputLogEvent
}
func (s *stubLogs) CreateLogStream(context.Context, *logs.CreateLogStreamInput, ...func(*logs.Options)) (*logs.CreateLogStreamOutput, error) {
return nil, s.createErr
}
func (s *stubLogs) DescribeLogGroups(_ context.Context, in *logs.DescribeLogGroupsInput, _ ...func(*logs.Options)) (*logs.DescribeLogGroupsOutput, error) {
if in.NextToken == nil {
return s.describeRes1, s.describeErr
}
return s.describeRes2, s.describeErr
}
func (s *stubLogs) ListTagsLogGroup(_ context.Context, in *logs.ListTagsLogGroupInput, _ ...func(*logs.Options)) (*logs.ListTagsLogGroupOutput, error) {
return &logs.ListTagsLogGroupOutput{Tags: s.listTags[*in.LogGroupName]}, s.listTagsErr
}
func (s *stubLogs) PutLogEvents(_ context.Context, in *logs.PutLogEventsInput, _ ...func(*logs.Options)) (*logs.PutLogEventsOutput, error) {
if s.putErr != nil {
return nil, s.putErr
}
if in.SequenceToken == nil || *in.SequenceToken == "" {
in.SequenceToken = aws.String("0")
}
gotSeq, err := strconv.Atoi(*in.SequenceToken)
if err != nil {
return nil, err
}
if gotSeq != s.logSequenceToken {
return nil, &types.InvalidSequenceTokenException{ExpectedSequenceToken: aws.String(strconv.Itoa(s.logSequenceToken))}
}
s.logs = append(s.logs, in.LogEvents...)
s.logSequenceToken++
return &logs.PutLogEventsOutput{NextSequenceToken: aws.String(strconv.Itoa(s.logSequenceToken))}, nil
}

View File

@ -62,7 +62,7 @@ func (m *Metadata) Supported() bool {
// List retrieves all instances belonging to the current Constellation.
func (m *Metadata) List(ctx context.Context) ([]metadata.InstanceMetadata, error) {
uid, err := m.readInstanceTag(ctx, tagUID)
uid, err := readInstanceTag(ctx, m.imds, tagUID)
if err != nil {
return nil, fmt.Errorf("retrieving uid tag: %w", err)
}
@ -81,11 +81,11 @@ func (m *Metadata) Self(ctx context.Context) (metadata.InstanceMetadata, error)
return metadata.InstanceMetadata{}, fmt.Errorf("retrieving instance identity: %w", err)
}
name, err := m.readInstanceTag(ctx, tagName)
name, err := readInstanceTag(ctx, m.imds, tagName)
if err != nil {
return metadata.InstanceMetadata{}, fmt.Errorf("retrieving name tag: %w", err)
}
instanceRole, err := m.readInstanceTag(ctx, tagRole)
instanceRole, err := readInstanceTag(ctx, m.imds, tagRole)
if err != nil {
return metadata.InstanceMetadata{}, fmt.Errorf("retrieving role tag: %w", err)
}
@ -172,8 +172,8 @@ func (m *Metadata) convertToMetadataInstance(ec2Instances []types.Instance) ([]m
return instances, nil
}
func (m *Metadata) readInstanceTag(ctx context.Context, tag string) (string, error) {
reader, err := m.imds.GetMetadata(ctx, &imds.GetMetadataInput{
func readInstanceTag(ctx context.Context, api imdsAPI, tag string) (string, error) {
reader, err := api.GetMetadata(ctx, &imds.GetMetadataInput{
Path: "/tags/instance/" + tag,
})
if err != nil {