debugd: add logcollector

Signed-off-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>
This commit is contained in:
Paul Meyer 2022-11-17 15:25:25 +01:00
parent 983c2c4b57
commit b93b24e058
16 changed files with 826 additions and 6 deletions

View file

@ -0,0 +1,7 @@
.PHONY: containers
containers:
docker build ./filebeat -t ghcr.io/edgelesssys/constellation/filebeat-debug
docker build ./logstash -t ghcr.io/edgelesssys/constellation/logstash-debug
docker push ghcr.io/edgelesssys/constellation/filebeat-debug
docker push ghcr.io/edgelesssys/constellation/logstash-debug

View file

@ -0,0 +1,215 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package logcollector
import (
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"strings"
gcpsecretmanager "cloud.google.com/go/secretmanager/apiv1"
gcpsecretmanagerpb "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
awssecretmanager "github.com/aws/aws-sdk-go-v2/service/secretsmanager"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
gaxv2 "github.com/googleapis/gax-go/v2"
)
// Credentials contains the credentials for an OpenSearch instance.
type credentials struct {
Username string
Password string
}
// credentialGetter is a wrapper around the cloud provider specific credential getters.
type credentialGetter struct {
openseachCredsGetter
}
type openseachCredsGetter interface {
GetOpensearchCredentials(ctx context.Context) (credentials, error)
io.Closer
}
// NewCloudCredentialGetter returns a new CloudCredentialGetter for the given cloud provider.
func newCloudCredentialGetter(ctx context.Context, provider cloudprovider.Provider) (*credentialGetter, error) {
switch provider {
case cloudprovider.GCP:
getter, err := newGCPCloudCredentialGetter(ctx)
if err != nil {
return nil, fmt.Errorf("creating GCP cloud credential getter: %w", err)
}
return &credentialGetter{getter}, nil
case cloudprovider.Azure:
getter, err := newAzureCloudCredentialGetter()
if err != nil {
return nil, fmt.Errorf("creating Azure cloud credential getter: %w", err)
}
return &credentialGetter{getter}, nil
case cloudprovider.AWS:
getter, err := newAWSCloudCredentialGetter(ctx)
if err != nil {
return nil, fmt.Errorf("creating AWS cloud credential getter: %w", err)
}
return &credentialGetter{getter}, nil
default:
return nil, fmt.Errorf("cloud provider not supported")
}
}
type gcpCloudCredentialGetter struct {
secretsAPI gcpSecretManagerAPI
}
func newGCPCloudCredentialGetter(ctx context.Context) (*gcpCloudCredentialGetter, error) {
client, err := gcpsecretmanager.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("creating secretmanager client: %w", err)
}
return &gcpCloudCredentialGetter{secretsAPI: client}, nil
}
func (g *gcpCloudCredentialGetter) GetOpensearchCredentials(ctx context.Context) (credentials, error) {
const secretName = "projects/796962942582/secrets/e2e-logs-OpenSearch-password/versions/1"
const username = "cluster-instance-gcp"
req := &gcpsecretmanagerpb.AccessSecretVersionRequest{Name: secretName}
resp, err := g.secretsAPI.AccessSecretVersion(ctx, req)
if err != nil {
return credentials{}, fmt.Errorf("accessing secret version: %w", err)
}
if resp.Payload == nil || len(resp.Payload.Data) == 0 {
return credentials{}, errors.New("response payload is empty")
}
crc32c := crc32.MakeTable(crc32.Castagnoli)
checksum := int64(crc32.Checksum(resp.Payload.Data, crc32c))
if checksum != *resp.Payload.DataCrc32C {
return credentials{}, errors.New("data corruption of secret detected")
}
return credentials{
Username: username,
Password: string(resp.Payload.Data),
}, nil
}
func (g *gcpCloudCredentialGetter) Close() error {
return g.secretsAPI.Close()
}
type gcpSecretManagerAPI interface {
AccessSecretVersion(ctx context.Context, req *gcpsecretmanagerpb.AccessSecretVersionRequest,
opts ...gaxv2.CallOption,
) (*gcpsecretmanagerpb.AccessSecretVersionResponse, error)
io.Closer
}
type azureCloudCredentialGetter struct {
secretsAPI azureSecretsAPI
}
func newAzureCloudCredentialGetter() (*azureCloudCredentialGetter, error) {
const vaultURI = "https://opensearch-creds.vault.azure.net"
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, fmt.Errorf("creating default azure credential: %w", err)
}
client, err := azsecrets.NewClient(vaultURI, cred, nil)
if err != nil {
return nil, fmt.Errorf("creating Azure secrets client: %w", err)
}
return &azureCloudCredentialGetter{secretsAPI: client}, nil
}
func (a *azureCloudCredentialGetter) GetOpensearchCredentials(ctx context.Context) (credentials, error) {
const secretName = "opensearch-password"
const username = "cluster-instance-azure"
const version = "" // An empty string version gets the latest version of the secret.
resp, err := a.secretsAPI.GetSecret(ctx, secretName, version, nil)
if err != nil {
return credentials{}, fmt.Errorf("getting secret: %w", err)
}
if resp.Value == nil {
return credentials{}, errors.New("response value is empty")
}
return credentials{
Username: username,
Password: *resp.Value,
}, nil
}
func (a *azureCloudCredentialGetter) Close() error {
return nil
}
type azureSecretsAPI interface {
GetSecret(ctx context.Context, name string, version string, options *azsecrets.GetSecretOptions,
) (azsecrets.GetSecretResponse, error)
}
type awsCloudCredentialGetter struct {
secretmanager awsSecretManagerAPI
}
func newAWSCloudCredentialGetter(ctx context.Context) (*awsCloudCredentialGetter, error) {
const region = "eu-central-1"
config, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(region))
if err != nil {
return nil, fmt.Errorf("loading default AWS config: %w", err)
}
client := awssecretmanager.NewFromConfig(config)
return &awsCloudCredentialGetter{secretmanager: client}, nil
}
func (a *awsCloudCredentialGetter) GetOpensearchCredentials(ctx context.Context) (credentials, error) {
const username = "cluster-instance-aws"
secertName := "opensearch-password"
req := &awssecretmanager.GetSecretValueInput{SecretId: &secertName}
resp, err := a.secretmanager.GetSecretValue(ctx, req)
if err != nil {
return credentials{}, fmt.Errorf("getting secret value: %w", err)
}
if resp.SecretString == nil {
return credentials{}, errors.New("response secret string is empty")
}
password := strings.TrimPrefix(*resp.SecretString, "{\"password\":\"")
password = strings.TrimSuffix(password, "\"}")
return credentials{
Username: username,
Password: password,
}, nil
}
func (a *awsCloudCredentialGetter) Close() error {
return nil
}
type awsSecretManagerAPI interface {
GetSecretValue(ctx context.Context, params *awssecretmanager.GetSecretValueInput,
optFns ...func(*awssecretmanager.Options),
) (*awssecretmanager.GetSecretValueOutput, error)
}

View file

@ -0,0 +1,216 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package logcollector
import (
"context"
"errors"
"hash/crc32"
"testing"
"cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
"github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets"
awssecretmanager "github.com/aws/aws-sdk-go-v2/service/secretsmanager"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
)
func TestGetOpensearchCredentialsGCP(t *testing.T) {
crc32c := crc32.MakeTable(crc32.Castagnoli)
someErr := errors.New("failed")
testCases := map[string]struct {
gcpAPI gcpSecretManagerAPI
wantCreds credentials
wantErr bool
}{
"gcp success": {
gcpAPI: stubGCPSecretManagerAPI{
assessSecretVersionResp: &secretmanagerpb.AccessSecretVersionResponse{
Name: "cluster-instance-gcp",
Payload: &secretmanagerpb.SecretPayload{
Data: []byte("e2e-logs-OpenSearch-password"),
DataCrc32C: ptr(int64(crc32.Checksum([]byte("e2e-logs-OpenSearch-password"), crc32c))),
},
},
},
wantCreds: credentials{
Username: "cluster-instance-gcp",
Password: "e2e-logs-OpenSearch-password",
},
},
"gcp access secret version error": {
gcpAPI: stubGCPSecretManagerAPI{accessSecretVersionErr: someErr},
wantErr: true,
},
"gcp invalid checksum": {
gcpAPI: stubGCPSecretManagerAPI{
assessSecretVersionResp: &secretmanagerpb.AccessSecretVersionResponse{
Name: "cluster-instance-gcp",
Payload: &secretmanagerpb.SecretPayload{
Data: []byte("e2e-logs-OpenSearch-password"),
DataCrc32C: ptr(int64(crc32.Checksum([]byte("e2e-logs-OpenSearch-password"), crc32c)) + 1),
},
},
},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
g := &gcpCloudCredentialGetter{secretsAPI: tc.gcpAPI}
gotCreds, err := g.GetOpensearchCredentials(context.Background())
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal(tc.wantCreds, gotCreds)
}
})
}
}
type stubGCPSecretManagerAPI struct {
assessSecretVersionResp *secretmanagerpb.AccessSecretVersionResponse
accessSecretVersionErr error
}
func (s stubGCPSecretManagerAPI) AccessSecretVersion(ctx context.Context, req *secretmanagerpb.AccessSecretVersionRequest,
opts ...gax.CallOption,
) (*secretmanagerpb.AccessSecretVersionResponse, error) {
return s.assessSecretVersionResp, s.accessSecretVersionErr
}
func (s stubGCPSecretManagerAPI) Close() error {
return nil
}
func TestGetOpensearchCredentialsAzure(t *testing.T) {
testCases := map[string]struct {
azureAPI azureSecretsAPI
wantCreds credentials
wantErr bool
}{
"azure success": {
azureAPI: stubAzureSecretsAPI{
getSecretResp: azsecrets.GetSecretResponse{
SecretBundle: azsecrets.SecretBundle{
Value: ptr("test-password"),
},
},
},
wantCreds: credentials{
Username: "cluster-instance-azure",
Password: "test-password",
},
},
"azure get secret error": {
azureAPI: stubAzureSecretsAPI{
getSecretErr: errors.New("failed"),
},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
a := &azureCloudCredentialGetter{secretsAPI: tc.azureAPI}
gotCreds, err := a.GetOpensearchCredentials(context.Background())
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal(tc.wantCreds, gotCreds)
}
})
}
}
type stubAzureSecretsAPI struct {
getSecretResp azsecrets.GetSecretResponse
getSecretErr error
}
func (s stubAzureSecretsAPI) GetSecret(ctx context.Context, name string, version string, options *azsecrets.GetSecretOptions,
) (azsecrets.GetSecretResponse, error) {
return s.getSecretResp, s.getSecretErr
}
func (s stubAzureSecretsAPI) Close() error {
return nil
}
func TestGetOpensearchCredentialsAWS(t *testing.T) {
testCases := map[string]struct {
awsAPI awsSecretManagerAPI
wantCreds credentials
wantErr bool
}{
"aws success": {
awsAPI: stubAWSSecretsAPI{
getSecretValueResp: &awssecretmanager.GetSecretValueOutput{
SecretString: ptr("test-password"),
},
},
wantCreds: credentials{
Username: "cluster-instance-aws",
Password: "test-password",
},
},
"aws get secret value error": {
awsAPI: stubAWSSecretsAPI{
getSecretValueErr: errors.New("failed"),
},
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
a := &awsCloudCredentialGetter{secretmanager: tc.awsAPI}
gotCreds, err := a.GetOpensearchCredentials(context.Background())
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal(tc.wantCreds, gotCreds)
}
})
}
}
type stubAWSSecretsAPI struct {
getSecretValueResp *awssecretmanager.GetSecretValueOutput
getSecretValueErr error
}
func (s stubAWSSecretsAPI) GetSecretValue(ctx context.Context, params *awssecretmanager.GetSecretValueInput,
optFns ...func(*awssecretmanager.Options),
) (*awssecretmanager.GetSecretValueOutput, error) {
return s.getSecretValueResp, s.getSecretValueErr
}
func (s stubAWSSecretsAPI) Close() error {
return nil
}
func ptr[T any](v T) *T {
return &v
}

View file

@ -0,0 +1,9 @@
FROM fedora:37@sha256:f2c083c0b7d2367a375f15e002c2dc7baaca2b3181ace61f9d5113a8fe2f6b44
RUN dnf install -y https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.0-x86_64.rpm systemd-devel
COPY filebeat.yml /usr/share/filebeat/filebeat.yml
RUN chmod 600 /usr/share/filebeat/filebeat.yml
ENTRYPOINT ["/usr/share/filebeat/bin/filebeat", "-e", "--path.home", "/usr/share/filebeat", "--path.data", "/usr/share/filebeat/data"]

View file

@ -0,0 +1,15 @@
filebeat.inputs:
- type: journald
enabled: true
id: everything
output.logstash:
hosts: ["localhost:5044"]
output.console:
enabled: false
logging:
to_files: false
metrics.enabled: false
level: warning

View file

@ -0,0 +1,262 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package logcollector
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
"sync"
"text/template"
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd/info"
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/versions"
)
const (
openSearchHost = "https://search-e2e-logs-y46renozy42lcojbvrt3qq7csm.eu-central-1.es.amazonaws.com:443"
)
// NewStartTrigger returns a trigger func can be registered with an infos instance.
// The trigger is called when infos changes to received state and starts a log collection pod
// with filebeat and logstash in case the flags are set.
//
// This requires podman to be installed.
func NewStartTrigger(ctx context.Context, wg *sync.WaitGroup, provider cloudprovider.Provider,
logger *logger.Logger,
) func(*info.Map) {
return func(infoMap *info.Map) {
wg.Add(1)
go func() {
defer wg.Done()
logger.Infof("Start trigger running")
if err := ctx.Err(); err != nil {
logger.With("err", err).Errorf("Start trigger canceled")
return
}
logger.Infof("Get flags from infos")
_, ok, err := infoMap.Get("logcollect")
if err != nil {
logger.Errorf("Getting infos: %v", err)
return
}
if !ok {
logger.Infof("Flag 'logcollect' not set")
return
}
cerdsGetter, err := newCloudCredentialGetter(ctx, provider)
if err != nil {
logger.Errorf("Creating cloud credential getter: %v", err)
return
}
logger.Infof("Getting credentials")
creds, err := cerdsGetter.GetOpensearchCredentials(ctx)
if err != nil {
logger.Errorf("Getting opensearch credentials: %v", err)
return
}
logger.Infof("Getting logstash pipeline template")
tmpl, err := getTemplate(ctx, logger)
if err != nil {
logger.Errorf("Getting logstash pipeline template: %v", err)
return
}
infoMapM, err := infoMap.GetCopy()
if err != nil {
logger.Errorf("Getting copy of map from info: %v", err)
return
}
infoMapM = filterInfoMap(infoMapM)
infoMapM["provider"] = provider.String()
logger.Infof("Writing logstash pipeline")
pipelineConf := logstashConfInput{
Host: openSearchHost,
InfoMap: infoMapM,
Credentials: creds,
}
if err := writeLogstashPipelineConf(tmpl, pipelineConf); err != nil {
logger.Errorf("Writing logstash pipeline: %v", err)
return
}
logger.Infof("Starting log collection pod")
if err := startPod(ctx, logger); err != nil {
logger.Errorf("Starting filebeat: %v", err)
}
}()
}
}
func getTemplate(ctx context.Context, logger *logger.Logger) (*template.Template, error) {
createContainerArgs := []string{
"create",
"--name=template",
versions.LogstashImage,
}
createContainerCmd := exec.CommandContext(ctx, "podman", createContainerArgs...)
logger.Infof("Creating logstash template container")
if out, err := createContainerCmd.CombinedOutput(); err != nil {
return nil, fmt.Errorf("creating logstash template container: %w\n%s", err, out)
}
if err := os.MkdirAll("/run/logstash", 0o511); err != nil {
return nil, fmt.Errorf("creating logstash template dir: %w", err)
}
copyFromArgs := []string{
"cp",
"template:/usr/share/constellogs/templates/",
"/run/logstash/",
}
copyFromCmd := exec.CommandContext(ctx, "podman", copyFromArgs...)
logger.Infof("Copying logstash templates")
if out, err := copyFromCmd.CombinedOutput(); err != nil {
return nil, fmt.Errorf("copying logstash templates: %w\n%s", err, out)
}
removeContainerArgs := []string{
"rm",
"template",
}
removeContainerCmd := exec.CommandContext(ctx, "podman", removeContainerArgs...)
logger.Infof("Removing logstash template container")
if out, err := removeContainerCmd.CombinedOutput(); err != nil {
return nil, fmt.Errorf("removing logstash template container: %w\n%s", err, out)
}
tmpl, err := template.ParseFiles("/run/logstash/templates/pipeline.conf")
if err != nil {
return nil, fmt.Errorf("parsing logstash template: %w", err)
}
return tmpl, nil
}
func startPod(ctx context.Context, logger *logger.Logger) error {
// create a shared pod for filebeat and logstash
createPodArgs := []string{
"pod",
"create",
"logcollection",
}
createPodCmd := exec.CommandContext(ctx, "podman", createPodArgs...)
logger.Infof("Create pod command: %v", createPodCmd.String())
if out, err := createPodCmd.CombinedOutput(); err != nil {
return fmt.Errorf("failed to create pod: %w; output: %s", err, out)
}
// start logstash container
logstashLog := newCmdLogger(logger.Named("logstash"))
runLogstashArgs := []string{
"run",
"--rm",
"--name=logstash",
"--pod=logcollection",
"--user=root",
"--privileged",
"--log-driver=none",
"--volume=/run/logstash/pipeline:/usr/share/logstash/pipeline:ro",
versions.LogstashImage,
}
runLogstashCmd := exec.CommandContext(ctx, "podman", runLogstashArgs...)
logger.Infof("Run logstash command: %v", runLogstashCmd.String())
runLogstashCmd.Stdout = logstashLog
runLogstashCmd.Stderr = logstashLog
if err := runLogstashCmd.Start(); err != nil {
return fmt.Errorf("failed to start logstash: %w", err)
}
// start filebeat container
filebeatLog := newCmdLogger(logger.Named("filebeat"))
runFilebeatArgs := []string{
"run",
"--rm",
"--name=filebeat",
"--pod=logcollection",
"--user=root",
"--privileged",
"--log-driver=none",
"--volume=/run/log/journal:/run/log/journal:ro",
"--volume=/etc/machine-id:/etc/machine-id:ro",
"--volume=/run/systemd:/run/systemd:ro",
"--volume=/run/systemd/journal/socket:/run/systemd/journal/socket:rw",
versions.FilebeatImage,
}
runFilebeatCmd := exec.CommandContext(ctx, "podman", runFilebeatArgs...)
logger.Infof("Run filebeat command: %v", runFilebeatCmd.String())
runFilebeatCmd.Stdout = filebeatLog
runFilebeatCmd.Stderr = filebeatLog
if err := runFilebeatCmd.Start(); err != nil {
return fmt.Errorf("failed to run filebeat: %w", err)
}
return nil
}
type logstashConfInput struct {
Host string
InfoMap map[string]string
Credentials credentials
}
func writeLogstashPipelineConf(templ *template.Template, in logstashConfInput) error {
if err := os.MkdirAll("/run/logstash/pipeline", 0o511); err != nil {
return fmt.Errorf("creating logstash config dir: %w", err)
}
file, err := os.OpenFile("/run/logstash/pipeline/pipeline.conf", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return fmt.Errorf("opening logstash config file: %w", err)
}
defer file.Close()
if err := templ.Execute(file, in); err != nil {
return fmt.Errorf("executing logstash pipeline template: %w", err)
}
return nil
}
func filterInfoMap(in map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range in {
if strings.HasPrefix(k, "logcollect.") {
out[strings.TrimPrefix(k, "logcollect.")] = v
}
}
delete(out, "logcollect")
return out
}
func newCmdLogger(logger *logger.Logger) io.Writer {
return &cmdLogger{logger: logger}
}
type cmdLogger struct {
logger *logger.Logger
}
func (c *cmdLogger) Write(p []byte) (n int, err error) {
c.logger.Infof("%s", p)
return len(p), nil
}

View file

@ -0,0 +1,9 @@
FROM docker.io/opensearchproject/logstash-oss-with-opensearch-output-plugin:7.16.2
RUN rm -f /usr/share/logstash/pipeline/logstash.conf
COPY config/ /usr/share/logstash/config/
COPY templates/ /usr/share/constellogs/templates/
ENTRYPOINT ["/usr/share/logstash/bin/logstash"]

View file

@ -0,0 +1,2 @@
log.level: warn
config.reload.automatic: true

View file

@ -0,0 +1,61 @@
input {
beats {
host => "0.0.0.0"
port => 5044
}
}
filter {
mutate {
# Remove some fields that are not needed.
remove_field => [
"[agent]",
"[journald]",
"[log]",
"[syslog]",
"[systemd][invocation_id]"
]
# Tag with the provided metadata.
add_field => {
{{ range $key, $value := .InfoMap }}
"[metadata][{{ $key }}]" => "{{ $value }}"
{{ end }}
}
}
# Parse structured logs for following systemd units.
if [systemd][unit] in ["bootstrapper.service", "constellation-bootstrapper.service"] {
json {
source => "message"
target => "logs"
skip_on_invalid_json => true
}
date {
match => [ "[logs][ts]", "ISO8601" ]
}
mutate {
replace => {
"message" => "%{[logs][msg]}"
}
remove_field => [
"[logs][msg]",
"[logs][ts]"
]
}
de_dot {
fields => ["[logs][peer.address]"]
}
}
}
output {
opensearch {
hosts => "{{ .Host }}"
index => "systemd-logs-%{+YYYY.MM.dd}"
user => "{{ .Credentials.Username }}"
password => "{{ .Credentials.Password }}"
ssl => true
ssl_certificate_verification => true
}
}