Replace logging with default logging interface (#233)

* Add test logger

* Refactor access manager logging

* Refactor activation service logging

* Refactor debugd logging

* Refactor kms server logging

* Refactor disk-mapper logging

Signed-off-by: Daniel Weiße <dw@edgeless.systems>
This commit is contained in:
Daniel Weiße 2022-06-28 16:51:30 +02:00 committed by GitHub
parent e3f78a5bff
commit b10b13b173
42 changed files with 513 additions and 328 deletions

View file

@ -1,7 +1,6 @@
package main
import (
"log"
"net"
"os"
"strings"
@ -15,22 +14,25 @@ import (
"github.com/edgelesssys/constellation/debugd/debugd/server"
"github.com/edgelesssys/constellation/internal/deploy/ssh"
"github.com/edgelesssys/constellation/internal/deploy/user"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/spf13/afero"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
)
func main() {
wg := &sync.WaitGroup{}
log := logger.New(logger.JSONLog, zapcore.InfoLevel)
fs := afero.NewOsFs()
streamer := coordinator.NewFileStreamer(fs)
serviceManager := deploy.NewServiceManager()
ssh := ssh.NewAccess(user.NewLinuxUserManager(fs))
serviceManager := deploy.NewServiceManager(log.Named("serviceManager"))
ssh := ssh.NewAccess(log, user.NewLinuxUserManager(fs))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
download := deploy.New(&net.Dialer{}, serviceManager, streamer)
download := deploy.New(log.Named("download"), &net.Dialer{}, serviceManager, streamer)
var fetcher metadata.Fetcher
constellationCSP := strings.ToLower(os.Getenv("CONSTEL_CSP"))
switch constellationCSP {
@ -47,11 +49,11 @@ func main() {
}
fetcher = gcpFetcher
default:
log.Printf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v\n", constellationCSP)
log.Errorf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v. Using fallback", constellationCSP)
fetcher = fallback.Fetcher{}
}
sched := metadata.NewScheduler(fetcher, ssh, download)
serv := server.New(ssh, serviceManager, streamer)
sched := metadata.NewScheduler(log.Named("scheduler"), fetcher, ssh, download)
serv := server.New(log.Named("server"), ssh, serviceManager, streamer)
if err := deploy.DeployDefaultServiceUnit(ctx, serviceManager); err != nil {
panic(err)
}
@ -59,7 +61,7 @@ func main() {
wg.Add(1)
go sched.Start(ctx, wg)
wg.Add(1)
go server.Start(wg, serv)
go server.Start(log, wg, serv)
wg.Wait()
}

View file

@ -3,19 +3,21 @@ package deploy
import (
"context"
"fmt"
"log"
"net"
"time"
"github.com/edgelesssys/constellation/debugd/coordinator"
"github.com/edgelesssys/constellation/debugd/debugd"
pb "github.com/edgelesssys/constellation/debugd/service"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Download downloads a coordinator from a given debugd instance.
type Download struct {
log *logger.Logger
dialer NetDialer
writer streamToFileWriter
serviceManager serviceManager
@ -23,8 +25,9 @@ type Download struct {
}
// New creates a new Download.
func New(dialer NetDialer, serviceManager serviceManager, writer streamToFileWriter) *Download {
func New(log *logger.Logger, dialer NetDialer, serviceManager serviceManager, writer streamToFileWriter) *Download {
return &Download{
log: log,
dialer: dialer,
writer: writer,
serviceManager: serviceManager,
@ -34,12 +37,13 @@ func New(dialer NetDialer, serviceManager serviceManager, writer streamToFileWri
// DownloadCoordinator will open a new grpc connection to another instance, attempting to download a coordinator from that instance.
func (d *Download) DownloadCoordinator(ctx context.Context, ip string) error {
log := d.log.With(zap.String("ip", ip))
serverAddr := net.JoinHostPort(ip, debugd.DebugdPort)
// only retry download from same endpoint after backoff
if lastAttempt, ok := d.attemptedDownloads[serverAddr]; ok && time.Since(lastAttempt) < debugd.CoordinatorDownloadRetryBackoff {
return fmt.Errorf("download failed too recently: %v / %v", time.Since(lastAttempt), debugd.CoordinatorDownloadRetryBackoff)
}
log.Printf("Trying to download coordinator from %s\n", ip)
log.Infof("Trying to download coordinator")
d.attemptedDownloads[serverAddr] = time.Now()
conn, err := d.dial(ctx, serverAddr)
if err != nil {
@ -56,7 +60,7 @@ func (d *Download) DownloadCoordinator(ctx context.Context, ip string) error {
return fmt.Errorf("streaming coordinator from other instance: %w", err)
}
log.Printf("Successfully downloaded coordinator from %s\n", ip)
log.Infof("Successfully downloaded coordinator")
// after the upload succeeds, try to restart the coordinator
restartAction := ServiceManagerRequest{

View file

@ -13,6 +13,7 @@ import (
"github.com/edgelesssys/constellation/debugd/debugd"
pb "github.com/edgelesssys/constellation/debugd/service"
"github.com/edgelesssys/constellation/internal/grpc/testdialer"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
@ -93,6 +94,7 @@ func TestDownloadCoordinator(t *testing.T) {
go grpcServ.Serve(lis)
download := &Download{
log: logger.NewTest(t),
dialer: dialer,
writer: writer,
serviceManager: &tc.serviceManager,

View file

@ -2,13 +2,13 @@ package deploy
import (
"context"
"errors"
"fmt"
"log"
"sync"
"github.com/edgelesssys/constellation/debugd/debugd"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/spf13/afero"
"go.uber.org/zap"
)
const (
@ -40,15 +40,17 @@ type SystemdUnit struct {
// ServiceManager receives ServiceManagerRequests and units via channels and performs the requests / creates the unit files.
type ServiceManager struct {
log *logger.Logger
dbus dbusClient
fs afero.Fs
systemdUnitFilewriteLock sync.Mutex
}
// NewServiceManager creates a new ServiceManager.
func NewServiceManager() *ServiceManager {
func NewServiceManager(log *logger.Logger) *ServiceManager {
fs := afero.NewOsFs()
return &ServiceManager{
log: log,
dbus: &dbusWrapper{},
fs: fs,
systemdUnitFilewriteLock: sync.Mutex{},
@ -78,6 +80,7 @@ type dbusConn interface {
// SystemdAction will perform a systemd action on a service unit (start, stop, restart, reload).
func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManagerRequest) error {
log := s.log.With(zap.String("unit", request.Unit), zap.String("action", request.Action.String()))
conn, err := s.dbus.NewSystemdConnectionContext(ctx)
if err != nil {
return fmt.Errorf("establishing systemd connection: %w", err)
@ -94,14 +97,14 @@ func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManag
case Reload:
err = conn.ReloadContext(ctx)
default:
return errors.New("unknown systemd action: " + request.Action.String())
return fmt.Errorf("unknown systemd action: %s", request.Action.String())
}
if err != nil {
return fmt.Errorf("performing systemd action %v on unit %v: %w", request.Action, request.Unit, err)
}
if request.Action == Reload {
log.Println("daemon-reload succeeded")
log.Infof("daemon-reload succeeded")
return nil
}
// Wait for the action to finish and then check if it was
@ -110,17 +113,18 @@ func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManag
switch result {
case "done":
log.Printf("%s on systemd unit %s succeeded\n", request.Action, request.Unit)
log.Infof("%s on systemd unit %s succeeded", request.Action, request.Unit)
return nil
default:
return fmt.Errorf("performing action %v on systemd unit \"%v\" failed: expected \"%v\" but received \"%v\"", request.Action, request.Unit, "done", result)
return fmt.Errorf("performing action %q on systemd unit %q failed: expected %q but received %q", request.Action.String(), request.Unit, "done", result)
}
}
// WriteSystemdUnitFile will write a systemd unit to disk.
func (s *ServiceManager) WriteSystemdUnitFile(ctx context.Context, unit SystemdUnit) error {
log.Printf("Writing systemd unit file: %s/%s\n", systemdUnitFolder, unit.Name)
log := s.log.With(zap.String("unitFile", fmt.Sprintf("%s/%s", systemdUnitFolder, unit.Name)))
log.Infof("Writing systemd unit file")
s.systemdUnitFilewriteLock.Lock()
defer s.systemdUnitFilewriteLock.Unlock()
if err := afero.WriteFile(s.fs, fmt.Sprintf("%s/%s", systemdUnitFolder, unit.Name), []byte(unit.Contents), 0o644); err != nil {
@ -131,7 +135,7 @@ func (s *ServiceManager) WriteSystemdUnitFile(ctx context.Context, unit SystemdU
return fmt.Errorf("performing systemd daemon-reload: %w", err)
}
log.Printf("Wrote systemd unit file: %s/%s and performed daemon-reload\n", systemdUnitFolder, unit.Name)
log.Infof("Wrote systemd unit file and performed daemon-reload")
return nil
}

View file

@ -7,6 +7,7 @@ import (
"sync"
"testing"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -95,6 +96,7 @@ func TestSystemdAction(t *testing.T) {
fs := afero.NewMemMapFs()
manager := ServiceManager{
log: logger.NewTest(t),
dbus: &tc.dbus,
fs: fs,
systemdUnitFilewriteLock: sync.Mutex{},
@ -173,6 +175,7 @@ func TestWriteSystemdUnitFile(t *testing.T) {
fs = afero.NewReadOnlyFs(fs)
}
manager := ServiceManager{
log: logger.NewTest(t),
dbus: &tc.dbus,
fs: fs,
systemdUnitFilewriteLock: sync.Mutex{},

View file

@ -4,12 +4,13 @@ import (
"context"
"errors"
"io/fs"
"log"
"sync"
"time"
"github.com/edgelesssys/constellation/debugd/debugd"
"github.com/edgelesssys/constellation/internal/deploy/ssh"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
)
// Fetcher retrieves other debugd IPs and SSH keys from cloud provider metadata.
@ -20,14 +21,16 @@ type Fetcher interface {
// Scheduler schedules fetching of metadata using timers.
type Scheduler struct {
log *logger.Logger
fetcher Fetcher
ssh sshDeployer
downloader downloader
}
// NewScheduler returns a new scheduler.
func NewScheduler(fetcher Fetcher, ssh sshDeployer, downloader downloader) *Scheduler {
func NewScheduler(log *logger.Logger, fetcher Fetcher, ssh sshDeployer, downloader downloader) *Scheduler {
return &Scheduler{
log: log,
fetcher: fetcher,
ssh: ssh,
downloader: downloader,
@ -49,7 +52,7 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) {
// execute debugd discovery once at the start to skip wait for first tick
ips, err := s.fetcher.DiscoverDebugdIPs(ctx)
if err != nil {
log.Printf("error occurred while discovering debugd IPs: %v\n", err)
s.log.With(zap.Error(err)).Errorf("Discovering debugd IPs failed")
} else {
if s.downloadCoordinator(ctx, ips) {
return
@ -64,10 +67,10 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) {
case <-ticker.C:
ips, err = s.fetcher.DiscoverDebugdIPs(ctx)
if err != nil {
log.Printf("error occurred while discovering debugd IPs: %v\n", err)
s.log.With(zap.Error(err)).Errorf("Discovering debugd IPs failed")
continue
}
log.Printf("discovered instances: %v\n", ips)
s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances")
if s.downloadCoordinator(ctx, ips) {
return
}
@ -80,24 +83,19 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) {
// sshLoop discovers new ssh keys from cloud provider metadata periodically.
func (s *Scheduler) sshLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// execute ssh key search once at the start to skip wait for first tick
keys, err := s.fetcher.FetchSSHKeys(ctx)
if err != nil {
log.Printf("error occurred while fetching SSH keys: %v\n", err)
} else {
s.deploySSHKeys(ctx, keys)
}
ticker := time.NewTicker(debugd.SSHCheckInterval)
defer ticker.Stop()
for {
keys, err := s.fetcher.FetchSSHKeys(ctx)
if err != nil {
s.log.With(zap.Error(err)).Errorf("Fetching SSH keys failed")
} else {
s.deploySSHKeys(ctx, keys)
}
select {
case <-ticker.C:
keys, err := s.fetcher.FetchSSHKeys(ctx)
if err != nil {
log.Printf("error occurred while fetching ssh keys: %v\n", err)
continue
}
s.deploySSHKeys(ctx, keys)
case <-ctx.Done():
return
}
@ -116,7 +114,7 @@ func (s *Scheduler) downloadCoordinator(ctx context.Context, ips []string) (succ
// coordinator was already uploaded
return true
}
log.Printf("error occurred while downloading coordinator from %v: %v\n", ip, err)
s.log.With(zap.Error(err), zap.String("peer", ip)).Errorf("Downloading coordinator from peer failed")
}
return false
}
@ -126,7 +124,7 @@ func (s *Scheduler) deploySSHKeys(ctx context.Context, keys []ssh.UserKey) {
for _, key := range keys {
err := s.ssh.DeployAuthorizedKey(ctx, key)
if err != nil {
log.Printf("error occurred while deploying ssh key %v: %v\n", key, err)
s.log.With(zap.Error(err), zap.Any("key", key)).Errorf("Deploying SSH key failed")
continue
}
}

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/edgelesssys/constellation/internal/deploy/ssh"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/stretchr/testify/assert"
)
@ -74,6 +75,7 @@ func TestSchedulerStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
defer cancel()
scheduler := Scheduler{
log: logger.NewTest(t),
fetcher: &tc.fetcher,
ssh: &tc.ssh,
downloader: &tc.downloader,

View file

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io/fs"
"log"
"net"
"sync"
@ -14,10 +13,13 @@ import (
"github.com/edgelesssys/constellation/debugd/debugd/deploy"
pb "github.com/edgelesssys/constellation/debugd/service"
"github.com/edgelesssys/constellation/internal/deploy/ssh"
"github.com/edgelesssys/constellation/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type debugdServer struct {
log *logger.Logger
ssh sshDeployer
serviceManager serviceManager
streamer streamer
@ -25,8 +27,9 @@ type debugdServer struct {
}
// New creates a new debugdServer according to the gRPC spec.
func New(ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.DebugdServer {
func New(log *logger.Logger, ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.DebugdServer {
return &debugdServer{
log: log,
ssh: ssh,
serviceManager: serviceManager,
streamer: streamer,
@ -35,10 +38,10 @@ func New(ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.D
// UploadAuthorizedKeys receives a list of authorized keys and forwards them to a channel.
func (s *debugdServer) UploadAuthorizedKeys(ctx context.Context, in *pb.UploadAuthorizedKeysRequest) (*pb.UploadAuthorizedKeysResponse, error) {
log.Println("Uploading authorized keys")
s.log.Infof("Uploading authorized keys")
for _, key := range in.Keys {
if err := s.ssh.DeployAuthorizedKey(ctx, ssh.UserKey{Username: key.Username, PublicKey: key.KeyValue}); err != nil {
log.Printf("Uploading authorized keys failed: %v\n", err)
s.log.With(zap.Error(err)).Errorf("Uploading authorized keys failed")
return &pb.UploadAuthorizedKeysResponse{
Status: pb.UploadAuthorizedKeysStatus_UPLOAD_AUTHORIZED_KEYS_FAILURE,
}, nil
@ -58,7 +61,7 @@ func (s *debugdServer) UploadCoordinator(stream pb.Debugd_UploadCoordinatorServe
var responseStatus pb.UploadCoordinatorStatus
defer func() {
if err := s.serviceManager.SystemdAction(stream.Context(), startAction); err != nil {
log.Printf("Starting uploaded coordinator failed: %v\n", err)
s.log.With(zap.Error(err)).Errorf("Starting uploaded coordinator failed")
if responseStatus == pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_SUCCESS {
responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_START_FAILED
}
@ -67,33 +70,33 @@ func (s *debugdServer) UploadCoordinator(stream pb.Debugd_UploadCoordinatorServe
Status: responseStatus,
})
}()
log.Println("Starting coordinator upload")
s.log.Infof("Starting coordinator upload")
if err := s.streamer.WriteStream(debugd.CoordinatorDeployFilename, stream, true); err != nil {
if errors.Is(err, fs.ErrExist) {
// coordinator was already uploaded
log.Println("Coordinator already uploaded")
s.log.Warnf("Coordinator already uploaded")
responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_FILE_EXISTS
return nil
}
log.Printf("Uploading coordinator failed: %v\n", err)
s.log.With(zap.Error(err)).Errorf("Uploading coordinator failed")
responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_UPLOAD_FAILED
return fmt.Errorf("uploading coordinator: %w", err)
}
log.Println("Successfully uploaded coordinator")
s.log.Infof("Successfully uploaded coordinator")
responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_SUCCESS
return nil
}
// DownloadCoordinator streams the local coordinator binary to other instances.
func (s *debugdServer) DownloadCoordinator(request *pb.DownloadCoordinatorRequest, stream pb.Debugd_DownloadCoordinatorServer) error {
log.Println("Sending coordinator to other instance")
s.log.Infof("Sending coordinator to other instance")
return s.streamer.ReadStream(debugd.CoordinatorDeployFilename, stream, debugd.Chunksize, true)
}
// UploadSystemServiceUnits receives systemd service units, writes them to a service file and schedules a daemon-reload.
func (s *debugdServer) UploadSystemServiceUnits(ctx context.Context, in *pb.UploadSystemdServiceUnitsRequest) (*pb.UploadSystemdServiceUnitsResponse, error) {
log.Println("Uploading systemd service units")
s.log.Infof("Uploading systemd service units")
for _, unit := range in.Units {
if err := s.serviceManager.WriteSystemdUnitFile(ctx, deploy.SystemdUnit{Name: unit.Name, Contents: unit.Contents}); err != nil {
return &pb.UploadSystemdServiceUnitsResponse{Status: pb.UploadSystemdServiceUnitsStatus_UPLOAD_SYSTEMD_SERVICE_UNITS_FAILURE}, nil
@ -104,15 +107,19 @@ func (s *debugdServer) UploadSystemServiceUnits(ctx context.Context, in *pb.Uplo
}
// Start will start the gRPC server and block.
func Start(wg *sync.WaitGroup, serv pb.DebugdServer) {
func Start(log *logger.Logger, wg *sync.WaitGroup, serv pb.DebugdServer) {
defer wg.Done()
grpcServer := grpc.NewServer()
grpcLog := log.Named("gRPC")
grpcLog.WithIncreasedLevel(zap.WarnLevel).ReplaceGRPCLogger()
grpcServer := grpc.NewServer(grpcLog.GetServerStreamInterceptor(), grpcLog.GetServerUnaryInterceptor())
pb.RegisterDebugdServer(grpcServer, serv)
lis, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", debugd.DebugdPort))
if err != nil {
log.Fatalf("listening failed: %v", err)
log.With(zap.Error(err)).Fatalf("Listening failed")
}
log.Println("gRPC server is waiting for connections")
log.Infof("gRPC server is waiting for connections")
grpcServer.Serve(lis)
}

View file

@ -13,6 +13,7 @@ import (
pb "github.com/edgelesssys/constellation/debugd/service"
"github.com/edgelesssys/constellation/internal/deploy/ssh"
"github.com/edgelesssys/constellation/internal/grpc/testdialer"
"github.com/edgelesssys/constellation/internal/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@ -73,6 +74,7 @@ func TestUploadAuthorizedKeys(t *testing.T) {
require := require.New(t)
serv := debugdServer{
log: logger.NewTest(t),
ssh: &tc.ssh,
serviceManager: &tc.serviceManager,
streamer: &fakeStreamer{},
@ -148,6 +150,7 @@ func TestUploadCoordinator(t *testing.T) {
require := require.New(t)
serv := debugdServer{
log: logger.NewTest(t),
ssh: &tc.ssh,
serviceManager: &tc.serviceManager,
streamer: &tc.streamer,
@ -218,6 +221,7 @@ func TestDownloadCoordinator(t *testing.T) {
require := require.New(t)
serv := debugdServer{
log: logger.NewTest(t),
ssh: &tc.ssh,
serviceManager: &tc.serviceManager,
streamer: &tc.streamer,
@ -298,6 +302,7 @@ func TestUploadSystemServiceUnits(t *testing.T) {
require := require.New(t)
serv := debugdServer{
log: logger.NewTest(t),
ssh: &tc.ssh,
serviceManager: &tc.serviceManager,
streamer: &fakeStreamer{},