mirror of
https://github.com/edgelesssys/constellation.git
synced 2025-01-25 14:56:18 -05:00
deubgd: add instance metadata to collected logs
Signed-off-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>
This commit is contained in:
parent
568f288f0d
commit
e5e5d8eaae
@ -59,13 +59,7 @@ func main() {
|
|||||||
|
|
||||||
csp := os.Getenv("CONSTEL_CSP")
|
csp := os.Getenv("CONSTEL_CSP")
|
||||||
|
|
||||||
infoMap := info.NewMap()
|
var fetcher *cloudprovider.Fetcher
|
||||||
infoMap.RegisterOnReceiveTrigger(
|
|
||||||
logcollector.NewStartTrigger(ctx, wg, platform.FromString(csp), log.Named("logcollector")),
|
|
||||||
)
|
|
||||||
|
|
||||||
download := deploy.New(log.Named("download"), &net.Dialer{}, serviceManager, streamer, infoMap)
|
|
||||||
var fetcher metadata.Fetcher
|
|
||||||
switch platform.FromString(csp) {
|
switch platform.FromString(csp) {
|
||||||
case platform.AWS:
|
case platform.AWS:
|
||||||
meta, err := awscloud.New(ctx)
|
meta, err := awscloud.New(ctx)
|
||||||
@ -94,8 +88,16 @@ func main() {
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
log.Errorf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v. Using fallback", csp)
|
log.Errorf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v. Using fallback", csp)
|
||||||
fetcher = fallback.Fetcher{}
|
fetcher = fallback.NewFallbackFetcher()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
infoMap := info.NewMap()
|
||||||
|
infoMap.RegisterOnReceiveTrigger(
|
||||||
|
logcollector.NewStartTrigger(ctx, wg, platform.FromString(csp), fetcher, log.Named("logcollector")),
|
||||||
|
)
|
||||||
|
|
||||||
|
download := deploy.New(log.Named("download"), &net.Dialer{}, serviceManager, streamer, infoMap)
|
||||||
|
|
||||||
sched := metadata.NewScheduler(log.Named("scheduler"), fetcher, download)
|
sched := metadata.NewScheduler(log.Named("scheduler"), fetcher, download)
|
||||||
serv := server.New(log.Named("server"), serviceManager, streamer, infoMap)
|
serv := server.New(log.Named("server"), serviceManager, streamer, infoMap)
|
||||||
if err := deploy.DefaultServiceUnit(ctx, serviceManager); err != nil {
|
if err := deploy.DefaultServiceUnit(ctx, serviceManager); err != nil {
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd/info"
|
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd/info"
|
||||||
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
|
"github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider"
|
||||||
|
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
|
||||||
"github.com/edgelesssys/constellation/v2/internal/logger"
|
"github.com/edgelesssys/constellation/v2/internal/logger"
|
||||||
"github.com/edgelesssys/constellation/v2/internal/versions"
|
"github.com/edgelesssys/constellation/v2/internal/versions"
|
||||||
)
|
)
|
||||||
@ -32,7 +33,7 @@ const (
|
|||||||
//
|
//
|
||||||
// This requires podman to be installed.
|
// This requires podman to be installed.
|
||||||
func NewStartTrigger(ctx context.Context, wg *sync.WaitGroup, provider cloudprovider.Provider,
|
func NewStartTrigger(ctx context.Context, wg *sync.WaitGroup, provider cloudprovider.Provider,
|
||||||
logger *logger.Logger,
|
metadata providerMetadata, logger *logger.Logger,
|
||||||
) func(*info.Map) {
|
) func(*info.Map) {
|
||||||
return func(infoMap *info.Map) {
|
return func(infoMap *info.Map) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -83,7 +84,7 @@ func NewStartTrigger(ctx context.Context, wg *sync.WaitGroup, provider cloudprov
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
infoMapM = filterInfoMap(infoMapM)
|
infoMapM = filterInfoMap(infoMapM)
|
||||||
infoMapM["provider"] = provider.String()
|
setCloudMetadata(ctx, infoMapM, provider, metadata)
|
||||||
|
|
||||||
logger.Infof("Writing logstash pipeline")
|
logger.Infof("Writing logstash pipeline")
|
||||||
pipelineConf := logstashConfInput{
|
pipelineConf := logstashConfInput{
|
||||||
@ -246,6 +247,28 @@ func filterInfoMap(in map[string]string) map[string]string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setCloudMetadata(ctx context.Context, m map[string]string, provider cloudprovider.Provider, metadata providerMetadata) {
|
||||||
|
m["provider"] = provider.String()
|
||||||
|
|
||||||
|
self, err := metadata.Self(ctx)
|
||||||
|
if err != nil {
|
||||||
|
m["name"] = "unknown"
|
||||||
|
m["role"] = "unknown"
|
||||||
|
m["vpcip"] = "unknown"
|
||||||
|
} else {
|
||||||
|
m["name"] = self.Name
|
||||||
|
m["role"] = self.Role.String()
|
||||||
|
m["vpcip"] = self.VPCIP
|
||||||
|
}
|
||||||
|
|
||||||
|
uid, err := metadata.UID(ctx)
|
||||||
|
if err != nil {
|
||||||
|
m["uid"] = "unknown"
|
||||||
|
} else {
|
||||||
|
m["uid"] = uid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newCmdLogger(logger *logger.Logger) io.Writer {
|
func newCmdLogger(logger *logger.Logger) io.Writer {
|
||||||
return &cmdLogger{logger: logger}
|
return &cmdLogger{logger: logger}
|
||||||
}
|
}
|
||||||
@ -258,3 +281,10 @@ func (c *cmdLogger) Write(p []byte) (n int, err error) {
|
|||||||
c.logger.Infof("%s", p)
|
c.logger.Infof("%s", p)
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type providerMetadata interface {
|
||||||
|
// Self retrieves the current instance.
|
||||||
|
Self(ctx context.Context) (metadata.InstanceMetadata, error)
|
||||||
|
// UID returns the UID of the current instance.
|
||||||
|
UID(ctx context.Context) (string, error)
|
||||||
|
}
|
||||||
|
@ -22,6 +22,8 @@ type providerMetadata interface {
|
|||||||
Self(ctx context.Context) (metadata.InstanceMetadata, error)
|
Self(ctx context.Context) (metadata.InstanceMetadata, error)
|
||||||
// GetLoadBalancerEndpoint returns the endpoint of the load balancer.
|
// GetLoadBalancerEndpoint returns the endpoint of the load balancer.
|
||||||
GetLoadBalancerEndpoint(ctx context.Context) (string, error)
|
GetLoadBalancerEndpoint(ctx context.Context) (string, error)
|
||||||
|
// UID returns the UID of the current instance.
|
||||||
|
UID(ctx context.Context) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetcher checks the metadata service to search for instances that were set up for debugging.
|
// Fetcher checks the metadata service to search for instances that were set up for debugging.
|
||||||
@ -46,6 +48,20 @@ func (f *Fetcher) Role(ctx context.Context) (role.Role, error) {
|
|||||||
return self.Role, nil
|
return self.Role, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UID returns node UID via meta data API.
|
||||||
|
func (f *Fetcher) UID(ctx context.Context) (string, error) {
|
||||||
|
uid, err := f.metaAPI.UID(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("retrieving UID from cloud provider metadata: %w", err)
|
||||||
|
}
|
||||||
|
return uid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Self returns the current instance via meta data API.
|
||||||
|
func (f *Fetcher) Self(ctx context.Context) (metadata.InstanceMetadata, error) {
|
||||||
|
return f.metaAPI.Self(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
// DiscoverDebugdIPs will query the metadata of all instances and return any ips of instances already set up for debugging.
|
// DiscoverDebugdIPs will query the metadata of all instances and return any ips of instances already set up for debugging.
|
||||||
func (f *Fetcher) DiscoverDebugdIPs(ctx context.Context) ([]string, error) {
|
func (f *Fetcher) DiscoverDebugdIPs(ctx context.Context) ([]string, error) {
|
||||||
self, err := f.metaAPI.Self(ctx)
|
self, err := f.metaAPI.Self(ctx)
|
||||||
|
@ -172,6 +172,8 @@ type stubMetadata struct {
|
|||||||
selfErr error
|
selfErr error
|
||||||
getLBEndpointRes string
|
getLBEndpointRes string
|
||||||
getLBEndpointErr error
|
getLBEndpointErr error
|
||||||
|
uid string
|
||||||
|
uidErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *stubMetadata) List(ctx context.Context) ([]metadata.InstanceMetadata, error) {
|
func (m *stubMetadata) List(ctx context.Context) ([]metadata.InstanceMetadata, error) {
|
||||||
@ -185,3 +187,7 @@ func (m *stubMetadata) Self(ctx context.Context) (metadata.InstanceMetadata, err
|
|||||||
func (m *stubMetadata) GetLoadBalancerEndpoint(ctx context.Context) (string, error) {
|
func (m *stubMetadata) GetLoadBalancerEndpoint(ctx context.Context) (string, error) {
|
||||||
return m.getLBEndpointRes, m.getLBEndpointErr
|
return m.getLBEndpointRes, m.getLBEndpointErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *stubMetadata) UID(context.Context) (string, error) {
|
||||||
|
return m.uid, m.uidErr
|
||||||
|
}
|
||||||
|
@ -9,23 +9,33 @@ package fallback
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/edgelesssys/constellation/v2/internal/role"
|
"github.com/edgelesssys/constellation/v2/debugd/internal/debugd/metadata/cloudprovider"
|
||||||
|
"github.com/edgelesssys/constellation/v2/internal/cloud/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Fetcher implements metadata.Fetcher interface but does not actually fetch cloud provider metadata.
|
// NewFallbackFetcher returns a cloudprovider.Fetcher with a fake metadata backend.
|
||||||
type Fetcher struct{}
|
func NewFallbackFetcher() *cloudprovider.Fetcher {
|
||||||
|
return cloudprovider.New(&fallbackMetadata{})
|
||||||
// Role for fallback fetcher does not try to fetch role.
|
|
||||||
func (f Fetcher) Role(_ context.Context) (role.Role, error) {
|
|
||||||
return role.Unknown, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DiscoverDebugdIPs for fallback fetcher does not try to discover debugd IPs.
|
type fallbackMetadata struct{}
|
||||||
func (f Fetcher) DiscoverDebugdIPs(ctx context.Context) ([]string, error) {
|
|
||||||
|
// List retrieves all instances belonging to the current constellation.
|
||||||
|
func (fallbackMetadata) List(context.Context) ([]metadata.InstanceMetadata, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DiscoverLoadbalancerIP for fallback fetcher does not try to discover loadbalancer IP.
|
// Self retrieves the current instance.
|
||||||
func (f Fetcher) DiscoverLoadbalancerIP(ctx context.Context) (string, error) {
|
func (fallbackMetadata) Self(context.Context) (metadata.InstanceMetadata, error) {
|
||||||
|
return metadata.InstanceMetadata{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLoadBalancerEndpoint returns the endpoint of the load balancer.
|
||||||
|
func (fallbackMetadata) GetLoadBalancerEndpoint(context.Context) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UID returns the UID of the current instance.
|
||||||
|
func (fallbackMetadata) UID(context.Context) (string, error) {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/edgelesssys/constellation/v2/internal/role"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
@ -21,9 +22,20 @@ func TestMain(m *testing.M) {
|
|||||||
func TestDiscoverDebugdIPs(t *testing.T) {
|
func TestDiscoverDebugdIPs(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
||||||
fetcher := Fetcher{}
|
fetcher := NewFallbackFetcher()
|
||||||
ips, err := fetcher.DiscoverDebugdIPs(context.Background())
|
ips, err := fetcher.DiscoverDebugdIPs(context.Background())
|
||||||
|
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Empty(ips)
|
assert.Empty(ips)
|
||||||
|
|
||||||
|
rol, err := fetcher.Role(context.Background())
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(rol, role.Unknown)
|
||||||
|
|
||||||
|
uid, err := fetcher.UID(context.Background())
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Empty(uid)
|
||||||
|
|
||||||
|
self, err := fetcher.Self(context.Background())
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Empty(self)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user