diff --git a/access_manager/access_manager.go b/access_manager/access_manager.go index 814079bc9..b11072352 100644 --- a/access_manager/access_manager.go +++ b/access_manager/access_manager.go @@ -2,8 +2,8 @@ package main import ( "context" + "errors" "fmt" - "log" "os" "path" "path/filepath" @@ -12,7 +12,10 @@ import ( "github.com/edgelesssys/constellation/internal/deploy/ssh" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" v1 "k8s.io/api/core/v1" v1Options "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,7 +33,7 @@ const ( // evictedHomePath holds the directory to which deleted user directories are moved to. evictedHomePath = "/var/evicted" - // relativePathToSSHKeys holds the path inside an user's directory to the SSH keys. + // relativePathToSSHKeys holds the path inside a user's directory to the SSH keys. // Needs to be in sync with internal/deploy/ssh.go. relativePathToSSHKeys = ".ssh/authorized_keys.d/constellation-ssh-keys" @@ -45,33 +48,35 @@ type uidGIDPair struct { } func main() { + log := logger.New(logger.JSONLog, zapcore.InfoLevel) + hostname, err := os.Hostname() if err != nil { - log.Println("Starting constellation-access-manager as unknown pod.") + log.Warnf("Starting constellation-access-manager as unknown pod") } else { - log.Println("Starting constellation-access-manager as", hostname) + log.Infof("Starting constellation-access-manager as %q", hostname) } // Retrieve configMap from Kubernetes API before we chroot into the host filesystem. - configMap, err := retrieveConfigMap() + configMap, err := retrieveConfigMap(log) if err != nil { - log.Panicf("Failed to retrieve ConfigMap from Kubernetes API: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to retrieve ConfigMap from Kubernetes API") } // Chroot into main system if err := syscall.Chroot(hostPath); err != nil { - log.Panicf("Failed to chroot into host filesystem: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to chroot into host filesystem") } if err := syscall.Chdir("/"); err != nil { - log.Panicf("Failed to chdir into host filesystem: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to chdir into host filesystem") } fs := afero.NewOsFs() linuxUserManager := user.NewLinuxUserManager(fs) - if err := run(fs, linuxUserManager, configMap); err != nil { + if err := run(log, fs, linuxUserManager, configMap); err != nil { // So far there is only one error path in this code, and this is getting the user directories... So just make the error specific here for now. - log.Panicf("Failed to retrieve existing user directories: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to retrieve existing user directories") } } @@ -92,21 +97,28 @@ func loadClientSet() (*kubernetes.Clientset, error) { } // deployKeys creates or evicts users based on the ConfigMap and deploy their SSH keys. -func deployKeys(ctx context.Context, configMap *v1.ConfigMap, fs afero.Fs, linuxUserManager user.LinuxUserManager, userMap map[string]uidGIDPair, sshAccess *ssh.Access) { +func deployKeys( + ctx context.Context, log *logger.Logger, configMap *v1.ConfigMap, fs afero.Fs, + linuxUserManager user.LinuxUserManager, userMap map[string]uidGIDPair, sshAccess *ssh.Access, +) { // If no ConfigMap exists or has been emptied, evict all users and exit. if configMap == nil || len(configMap.Data) == 0 { for username, ownership := range userMap { + log := log.With(zap.String("username", username)) if username != "root" { evictedUserPath := path.Join(evictedHomePath, username) - log.Printf("Evicting '%s' with previous UID '%d' and GID '%d' to %s.\n", username, ownership.UID, ownership.GID, evictedUserPath) + log.With(zap.Uint32("UID", ownership.UID), zap.Uint32("GID", ownership.GID)). + Infof("Evicting user to %q", evictedUserPath) + if err := evictUser(username, fs, linuxUserManager); err != nil { - log.Printf("Did not evict '%s': %v\n", username, err) + log.With(zap.Error(err)).Errorf("Did not evict user") continue } } else { + log.Infof("Removing any old keys for 'root', if existent") // Remove root's SSH key specifically instead of evicting the whole directory. if err := evictRootKey(fs, linuxUserManager); err != nil && !os.IsNotExist(err) { - log.Printf("Failed to remove previously existing root key: %v\n", err) + log.With(zap.Error(err)).Errorf("Failed to remove previously existing root key") continue } } @@ -118,25 +130,36 @@ func deployKeys(ctx context.Context, configMap *v1.ConfigMap, fs afero.Fs, linux // First, recreate users that already existed, if they are defined in the configMap. // For users which do not exist, we move their user directories to avoid accidental takeovers but also loss of data. for username, ownership := range userMap { + log := log.With(zap.String("username", username)) if username != "root" { if _, ok := configMap.Data[username]; ok { - log.Printf("Recreating '%s' with UID %d and GID %d, if not existent.\n", username, ownership.UID, ownership.GID) - if err := linuxUserManager.Creator.CreateUserWithSpecificUIDAndGID(ctx, username, int(ownership.UID), int(ownership.GID)); err != nil { - log.Printf("Did not recreate '%s': %v\n", username, err) + log.With(zap.Uint32("UID", ownership.UID), zap.Uint32("GID", ownership.GID)). + Infof("Recreating user, if not existent") + + if err := linuxUserManager.Creator.CreateUserWithSpecificUIDAndGID( + ctx, username, int(ownership.UID), int(ownership.GID), + ); err != nil { + if errors.Is(err, user.ErrUserOrGroupAlreadyExists) { + log.Infof("User already exists, skipping") + } else { + log.With(zap.Error(err)).Errorf("Failed to recreate user") + } continue } } else { evictedUserPath := path.Join(evictedHomePath, username) - log.Printf("Evicting '%s' with previous UID '%d' and GID '%d' to %s.\n", username, ownership.UID, ownership.GID, evictedUserPath) + log.With(zap.Uint32("UID", ownership.UID), zap.Uint32("GID", ownership.GID)). + Infof("Evicting user to %q", evictedUserPath) if err := evictUser(username, fs, linuxUserManager); err != nil { - log.Printf("Did not to evict '%s': %v\n", username, err) + log.With(zap.Error(err)).Errorf("Did not evict user") continue } } } else { + log.Infof("Removing any old keys for 'root', if existent") // Always remove the root key first, even if it is about to be redeployed. if err := evictRootKey(fs, linuxUserManager); err != nil && !os.IsNotExist(err) { - log.Printf("Failed to remove previously existing root key: %v\n", err) + log.With(zap.Error(err)).Errorf("Failed to remove previously existing root key") continue } } @@ -144,25 +167,30 @@ func deployKeys(ctx context.Context, configMap *v1.ConfigMap, fs afero.Fs, linux // Then, create the remaining users from the configMap (if remaining) and deploy SSH keys for all users. for username, publicKey := range configMap.Data { + log := log.With(zap.String("username", username)) if _, ok := userMap[username]; !ok { - log.Printf("Creating user '%s'\n", username) + log.Infof("Creating user") if err := linuxUserManager.Creator.CreateUser(ctx, username); err != nil { - log.Printf("Failed to create '%s': %v\n", username, err) + if errors.Is(err, user.ErrUserOrGroupAlreadyExists) { + log.Infof("User already exists, skipping") + } else { + log.With(zap.Error(err)).Errorf("Failed to create user") + } continue } } - // If we created an user, let's actually get the home directory instead of assuming it's the same as the normal home directory. + // If we created a user, let's actually get the home directory instead of assuming it's the same as the normal home directory. user, err := linuxUserManager.GetLinuxUser(username) if err != nil { - log.Printf("Failed to retrieve information about user '%s': %v\n", username, err) + log.With(zap.Error(err)).Errorf("Failed to retrieve information about user") continue } // Delete already deployed keys pathToSSHKeys := filepath.Join(user.Home, relativePathToSSHKeys) if err := fs.Remove(pathToSSHKeys); err != nil && !os.IsNotExist(err) { - log.Printf("Failed to delete remaining managed SSH keys for '%s': %v\n", username, err) + log.With(zap.Error(err)).Errorf("Failed to delete remaining managed SSH keys for user") continue } @@ -172,15 +200,15 @@ func deployKeys(ctx context.Context, configMap *v1.ConfigMap, fs afero.Fs, linux PublicKey: publicKey, } - log.Printf("Deploying new SSH key for '%s'.\n", username) + log.Infof("Deploying new SSH key for user") if err := sshAccess.DeployAuthorizedKey(context.Background(), newKey); err != nil { - log.Printf("Failed to deploy SSH keys for '%s': %v\n", username, err) + log.With(zap.Error(err)).Errorf("Failed to deploy SSH keys for user") continue } } } -// evictUser moves an user directory to evictedPath and changes their owner recursive to root. +// evictUser moves a user directory to evictedPath and changes their owner recursive to root. func evictUser(username string, fs afero.Fs, linuxUserManager user.LinuxUserManager) error { if _, err := linuxUserManager.GetLinuxUser(username); err == nil { return fmt.Errorf("user '%s' still seems to exist", username) @@ -219,7 +247,6 @@ func evictUser(username string, fs afero.Fs, linuxUserManager user.LinuxUserMana // evictRootKey removes the root key from the filesystem, instead of evicting the whole user directory. func evictRootKey(fs afero.Fs, linuxUserManager user.LinuxUserManager) error { - log.Println("Removing any old keys for 'root', if existent.") user, err := linuxUserManager.GetLinuxUser("root") if err != nil { return err @@ -235,9 +262,9 @@ func evictRootKey(fs afero.Fs, linuxUserManager user.LinuxUserManager) error { } // retrieveConfigMap contacts the Kubernetes API server and retrieves the ssh-users ConfigMap. -func retrieveConfigMap() (*v1.ConfigMap, error) { +func retrieveConfigMap(log *logger.Logger) (*v1.ConfigMap, error) { // Authenticate with the Kubernetes API and get the information from the ssh-users ConfigMap to recreate the users we need. - log.Println("Authenticating with Kubernetes...") + log.Infof("Authenticating with Kubernetes...") clientset, err := loadClientSet() if err != nil { return nil, err @@ -246,7 +273,7 @@ func retrieveConfigMap() (*v1.ConfigMap, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - log.Println("Requesting 'ssh-users' ConfigMap...") + log.Infof("Requesting 'ssh-users' ConfigMap...") configmap, err := clientset.CoreV1().ConfigMaps("kube-system").Get(ctx, "ssh-users", v1Options.GetOptions{}) if err != nil { return nil, err @@ -256,7 +283,7 @@ func retrieveConfigMap() (*v1.ConfigMap, error) { } // generateUserMap iterates the list of existing home directories to create a map of previously existing usernames to their previous respective UID and GID. -func generateUserMap(fs afero.Fs) (map[string]uidGIDPair, error) { +func generateUserMap(log *logger.Logger, fs afero.Fs) (map[string]uidGIDPair, error) { // Go through the normalHomePath directory, and create a mapping of existing user names in combination with their owner's UID & GID. // We use this information later to create missing users under the same UID and GID to avoid breakage. fileInfo, err := afero.ReadDir(fs, normalHomePath) @@ -268,12 +295,14 @@ func generateUserMap(fs afero.Fs) (map[string]uidGIDPair, error) { userMap["root"] = uidGIDPair{UID: 0, GID: 0} // This will fail under MemMapFS, since it's not UNIX-compatible. for _, singleInfo := range fileInfo { + log := log.With("username", singleInfo.Name()) // Fail gracefully instead of hard. if stat, ok := singleInfo.Sys().(*syscall.Stat_t); ok { userMap[singleInfo.Name()] = uidGIDPair{UID: stat.Uid, GID: stat.Gid} - log.Printf("Found home directory for '%s' (%d:%d).\n", singleInfo.Name(), stat.Uid, stat.Gid) + log.With(zap.Uint32("UID", stat.Uid), zap.Uint32("GID", stat.Gid)). + Infof("Found home directory for user") } else { - log.Printf("WARNING: Failed to retrieve UNIX stat for %s. User will not be evicted, or if this directory belongs to an user that is to be created later, it might be created under a different UID/GID than before.\n", singleInfo.Name()) + log.Warnf("Failed to retrieve UNIX stat for user. User will not be evicted, or if this directory belongs to a user that is to be created later, it might be created under a different UID/GID than before") continue } } @@ -281,17 +310,17 @@ func generateUserMap(fs afero.Fs) (map[string]uidGIDPair, error) { return userMap, nil } -func run(fs afero.Fs, linuxUserManager user.LinuxUserManager, configMap *v1.ConfigMap) error { - sshAccess := ssh.NewAccess(linuxUserManager) +func run(log *logger.Logger, fs afero.Fs, linuxUserManager user.LinuxUserManager, configMap *v1.ConfigMap) error { + sshAccess := ssh.NewAccess(log, linuxUserManager) // Generate userMap containing existing user directories and their ownership - userMap, err := generateUserMap(fs) + userMap, err := generateUserMap(log, fs) if err != nil { return err } // Try to deploy keys based on configmap. - deployKeys(context.Background(), configMap, fs, linuxUserManager, userMap, sshAccess) + deployKeys(context.Background(), log, configMap, fs, linuxUserManager, userMap, sshAccess) return nil } diff --git a/access_manager/access_manager_test.go b/access_manager/access_manager_test.go index e9f7e4612..35ee3ae22 100644 --- a/access_manager/access_manager_test.go +++ b/access_manager/access_manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/edgelesssys/constellation/internal/deploy/ssh" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -164,86 +165,89 @@ func TestDeployKeys(t *testing.T) { }, }, } - for _, tc := range testCases { - fs := afero.NewMemMapFs() - require.NoError(fs.MkdirAll(normalHomePath, 0o700)) - require.NoError(fs.Mkdir("/etc", 0o644)) - _, err := fs.Create("/etc/passwd") - require.NoError(err) - - // Create fake user directories - for user := range tc.existingUsers { - userHomePath := path.Join(normalHomePath, user) - err := fs.MkdirAll(userHomePath, 0o700) - require.NoError(err) - require.NoError(fs.Chown(userHomePath, int(tc.existingUsers[user].UID), int(tc.existingUsers[user].GID))) - } - - linuxUserManager := user.NewLinuxUserManagerFake(fs) - sshAccess := ssh.NewAccess(linuxUserManager) - deployKeys(context.Background(), tc.configMap, fs, linuxUserManager, tc.existingUsers, sshAccess) - - // Unfourtunaly, we cannot retrieve the UID/GID from afero's MemMapFs without weird hacks, - // as it does not have getters and it is not exported. - if tc.configMap != nil && tc.existingUsers != nil { - // Parse /etc/passwd and check for users - passwdEntries, err := linuxUserManager.Passwd.Parse(fs) + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + fs := afero.NewMemMapFs() + require.NoError(fs.MkdirAll(normalHomePath, 0o700)) + require.NoError(fs.Mkdir("/etc", 0o644)) + _, err := fs.Create("/etc/passwd") require.NoError(err) - // Check recreation or deletion + // Create fake user directories for user := range tc.existingUsers { - if _, ok := tc.configMap.Data[user]; ok { - checkHomeDirectory(user, fs, assert, true) + userHomePath := path.Join(normalHomePath, user) + err := fs.MkdirAll(userHomePath, 0o700) + require.NoError(err) + require.NoError(fs.Chown(userHomePath, int(tc.existingUsers[user].UID), int(tc.existingUsers[user].GID))) + } - // Check if user exists in /etc/passwd - userEntry, ok := passwdEntries[user] - assert.True(ok) + log := logger.NewTest(t) + linuxUserManager := user.NewLinuxUserManagerFake(fs) + sshAccess := ssh.NewAccess(log, linuxUserManager) + deployKeys(context.Background(), log, tc.configMap, fs, linuxUserManager, tc.existingUsers, sshAccess) - // Check if user has been recreated with correct UID/GID - actualUID, err := strconv.Atoi(userEntry.Uid) - assert.NoError(err) - assert.EqualValues(tc.existingUsers[user].UID, actualUID) - actualGID, err := strconv.Atoi(userEntry.Gid) - assert.NoError(err) - assert.EqualValues(tc.existingUsers[user].GID, actualGID) + // Unfortunately, we cannot retrieve the UID/GID from afero's MemMapFs without weird hacks, + // as it does not have getters and it is not exported. + if tc.configMap != nil && tc.existingUsers != nil { + // Parse /etc/passwd and check for users + passwdEntries, err := linuxUserManager.Passwd.Parse(fs) + require.NoError(err) - // Check if the user has the right keys - checkSSHKeys(user, fs, assert, tc.configMap.Data[user]+"\n") + // Check recreation or deletion + for user := range tc.existingUsers { + if _, ok := tc.configMap.Data[user]; ok { + checkHomeDirectory(user, fs, assert, true) - } else { - // Check if home directory is not available anymore under the regular path - checkHomeDirectory(user, fs, assert, false) + // Check if user exists in /etc/passwd + userEntry, ok := passwdEntries[user] + assert.True(ok) - // Check if home directory has been evicted - homeDirs, err := afero.ReadDir(fs, evictedHomePath) - require.NoError(err) + // Check if user has been recreated with correct UID/GID + actualUID, err := strconv.Atoi(userEntry.Uid) + assert.NoError(err) + assert.EqualValues(tc.existingUsers[user].UID, actualUID) + actualGID, err := strconv.Atoi(userEntry.Gid) + assert.NoError(err) + assert.EqualValues(tc.existingUsers[user].GID, actualGID) - var userDirectoryName string - for _, singleDir := range homeDirs { - if strings.Contains(singleDir.Name(), user+"_") { - userDirectoryName = singleDir.Name() - break + // Check if the user has the right keys + checkSSHKeys(user, fs, assert, tc.configMap.Data[user]+"\n") + + } else { + // Check if home directory is not available anymore under the regular path + checkHomeDirectory(user, fs, assert, false) + + // Check if home directory has been evicted + homeDirs, err := afero.ReadDir(fs, evictedHomePath) + require.NoError(err) + + var userDirectoryName string + for _, singleDir := range homeDirs { + if strings.Contains(singleDir.Name(), user+"_") { + userDirectoryName = singleDir.Name() + break + } } + assert.NotEmpty(userDirectoryName) + + // Check if user does not exist in /etc/passwd + _, ok := passwdEntries[user] + assert.False(ok) } - assert.NotEmpty(userDirectoryName) - - // Check if user does not exist in /etc/passwd - _, ok := passwdEntries[user] - assert.False(ok) - } - } - - // Check creation of new users - for user := range tc.configMap.Data { - // We already checked recreated or evicted users, so skip them. - if _, ok := tc.existingUsers[user]; ok { - continue } - checkHomeDirectory(user, fs, assert, true) - checkSSHKeys(user, fs, assert, tc.configMap.Data[user]+"\n") + // Check creation of new users + for user := range tc.configMap.Data { + // We already checked recreated or evicted users, so skip them. + if _, ok := tc.existingUsers[user]; ok { + continue + } + + checkHomeDirectory(user, fs, assert, true) + checkSSHKeys(user, fs, assert, tc.configMap.Data[user]+"\n") + } } - } + }) } } diff --git a/activation/cmd/main.go b/activation/cmd/main.go index 72736bd2c..0818d7cb7 100644 --- a/activation/cmd/main.go +++ b/activation/cmd/main.go @@ -15,52 +15,61 @@ import ( "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" "github.com/edgelesssys/constellation/internal/grpc/atlscredentials" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" - "k8s.io/klog/v2" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func main() { provider := flag.String("cloud-provider", "", "cloud service provider this binary is running on") kmsEndpoint := flag.String("kms-endpoint", "", "endpoint of Constellations key management service") + verbosity := flag.Int("v", 0, "log verbosity in zap logging levels. Use -1 for debug information, 0 for info, 1 for warn, 2 for error") - klog.InitFlags(nil) flag.Parse() - defer klog.Flush() + log := logger.New(logger.JSONLog, zapcore.Level(*verbosity)) - klog.V(2).Infof("\nConstellation Node Activation Service\nVersion: %s\nRunning on: %s", constants.VersionInfo, *provider) + log.With(zap.String("version", constants.VersionInfo), zap.String("cloudProvider", *provider)). + Infof("Constellation Node Activation Service") handler := file.NewHandler(afero.NewOsFs()) - validator, err := validator.New(*provider, handler) + validator, err := validator.New(log.Named("validator"), *provider, handler) if err != nil { flag.Usage() - klog.Exitf("failed to create validator: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to create validator") } creds := atlscredentials.New(nil, []atls.Validator{validator}) - kubeadm, err := kubeadm.New() + kubeadm, err := kubeadm.New(log.Named("kubeadm")) if err != nil { - klog.Exitf("failed to create kubeadm: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to create kubeadm") } - kms := kms.New(*kmsEndpoint) + kms := kms.New(log.Named("kms"), *kmsEndpoint) - server := server.New(handler, kubernetesca.New(handler), kubeadm, kms) + server := server.New( + log.Named("server"), + handler, + kubernetesca.New(log.Named("certificateAuthority"), handler), + kubeadm, + kms, + ) - watcher, err := watcher.New(validator) + watcher, err := watcher.New(log.Named("fileWatcher"), validator) if err != nil { - klog.Exitf("failed to create watcher for measurements updates: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to create watcher for measurements updates") } defer watcher.Close() go func() { - klog.V(4).Infof("starting file watcher for measurements file %s", filepath.Join(constants.ActivationBasePath, constants.ActivationMeasurementsFilename)) + log.Infof("starting file watcher for measurements file %s", filepath.Join(constants.ActivationBasePath, constants.ActivationMeasurementsFilename)) if err := watcher.Watch(filepath.Join(constants.ActivationBasePath, constants.ActivationMeasurementsFilename)); err != nil { - klog.Exitf("failed to watch measurements file: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to watch measurements file") } }() if err := server.Run(creds, strconv.Itoa(constants.ActivationServicePort)); err != nil { - klog.Exitf("failed to run server: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to run server") } } diff --git a/activation/kms/kms.go b/activation/kms/kms.go index 71ca2a426..c9e7cac5c 100644 --- a/activation/kms/kms.go +++ b/activation/kms/kms.go @@ -4,21 +4,24 @@ import ( "context" "fmt" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/kms/server/kmsapi/kmsproto" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "k8s.io/klog/v2" ) // Client interacts with Constellation's key management service. type Client struct { + log *logger.Logger endpoint string grpc grpcClient } // New creates a new KMS. -func New(endpoint string) Client { +func New(log *logger.Logger, endpoint string) Client { return Client{ + log: log, endpoint: endpoint, grpc: client{}, } @@ -26,15 +29,17 @@ func New(endpoint string) Client { // GetDEK returns a data encryption key for the given UUID. func (c Client) GetDataKey(ctx context.Context, uuid string, length int) ([]byte, error) { + log := c.log.With(zap.String("diskUUID", uuid), zap.String("endpoint", c.endpoint)) // TODO: update credentials if we enable aTLS on the KMS // For now this is fine since traffic is only routed through the Constellation cluster + log.Infof("Connecting to KMS at %s", c.endpoint) conn, err := grpc.DialContext(ctx, c.endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } defer conn.Close() - klog.V(6).Infof("GetDataKey: connecting to KMS at %s", c.endpoint) + log.Infof("Requesting data key") res, err := c.grpc.GetDataKey( ctx, &kmsproto.GetDataKeyRequest{ @@ -47,6 +52,7 @@ func (c Client) GetDataKey(ctx context.Context, uuid string, length int) ([]byte return nil, fmt.Errorf("fetching data encryption key from Constellation KMS: %w", err) } + log.Infof("Data key request successful") return res.DataKey, nil } diff --git a/activation/kms/kms_test.go b/activation/kms/kms_test.go index c829672ee..f90e2e2d4 100644 --- a/activation/kms/kms_test.go +++ b/activation/kms/kms_test.go @@ -5,6 +5,7 @@ import ( "errors" "testing" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/kms/server/kmsapi/kmsproto" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -41,7 +42,10 @@ func TestGetDataKey(t *testing.T) { listener := bufconn.Listen(1) defer listener.Close() - client := New(listener.Addr().String()) + client := New( + logger.NewTest(t), + listener.Addr().String(), + ) client.grpc = tc.client diff --git a/activation/kubeadm/kubeadm.go b/activation/kubeadm/kubeadm.go index b1edce3de..31c594007 100644 --- a/activation/kubeadm/kubeadm.go +++ b/activation/kubeadm/kubeadm.go @@ -7,6 +7,7 @@ import ( "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" @@ -14,7 +15,6 @@ import ( "k8s.io/client-go/tools/clientcmd" certutil "k8s.io/client-go/util/cert" bootstraputil "k8s.io/cluster-bootstrap/token/util" - "k8s.io/klog/v2" bootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/apis/bootstraptoken/v1" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3" @@ -26,12 +26,13 @@ import ( // Kubeadm manages joining of new nodes. type Kubeadm struct { + log *logger.Logger client clientset.Interface file file.Handler } // New creates a new Kubeadm instance. -func New() (*Kubeadm, error) { +func New(log *logger.Logger) (*Kubeadm, error) { config, err := rest.InClusterConfig() if err != nil { return nil, fmt.Errorf("failed to get in-cluster config: %w", err) @@ -43,6 +44,7 @@ func New() (*Kubeadm, error) { file := file.NewHandler(afero.NewOsFs()) return &Kubeadm{ + log: log, client: client, file: file, }, nil @@ -50,7 +52,7 @@ func New() (*Kubeadm, error) { // GetJoinToken creates a new bootstrap (join) token, which a node can use to join the cluster. func (k *Kubeadm) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscovery, error) { - klog.V(6).Info("[kubeadm] Generating new random bootstrap token") + k.log.Infof("Generating new random bootstrap token") rawToken, err := bootstraputil.GenerateBootstrapToken() if err != nil { return nil, fmt.Errorf("couldn't generate random token: %w", err) @@ -66,13 +68,13 @@ func (k *Kubeadm) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscov } // create the token in Kubernetes - klog.V(6).Info("[kubeadm] Creating bootstrap token in Kubernetes") + k.log.Infof("Creating bootstrap token in Kubernetes") if err := tokenphase.CreateNewTokens(k.client, []bootstraptoken.BootstrapToken{token}); err != nil { return nil, fmt.Errorf("creating bootstrap token: %w", err) } // parse Kubernetes CA certs - klog.V(6).Info("[kubeadm] Preparing join token for new node") + k.log.Infof("Preparing join token for new node") rawConfig, err := k.file.Read(constants.CoreOSAdminConfFilename) if err != nil { return nil, fmt.Errorf("loading kubeconfig file: %w", err) @@ -94,6 +96,7 @@ func (k *Kubeadm) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscov publicKeyPins = append(publicKeyPins, pubkeypin.Hash(caCert)) } + k.log.Infof("Join token creation successful") return &kubeadm.BootstrapTokenDiscovery{ Token: tokenStr.String(), APIServerEndpoint: "10.118.0.1:6443", // This is not HA and should be replaced with the IP of the node issuing the token @@ -104,13 +107,13 @@ func (k *Kubeadm) GetJoinToken(ttl time.Duration) (*kubeadm.BootstrapTokenDiscov // GetControlPlaneCertificateKey uploads Kubernetes encrypted CA certificates to Kubernetes and returns the decryption key. // The key can be used by new nodes to join the cluster as a control plane node. func (k *Kubeadm) GetControlPlaneCertificateKey() (string, error) { - klog.V(6).Info("[kubeadm] Creating new random control plane certificate key") + k.log.Infof("Creating new random control plane certificate key") key, err := copycerts.CreateCertificateKey() if err != nil { return "", fmt.Errorf("couldn't create control plane certificate key: %w", err) } - klog.V(6).Info("[kubeadm] Uploading certs to Kubernetes") + k.log.Infof("Uploading certs to Kubernetes") cfg := &kubeadmapi.InitConfiguration{ ClusterConfiguration: kubeadmapi.ClusterConfiguration{ CertificatesDir: constants.KubeadmCertificateDir, diff --git a/activation/kubeadm/kubeadm_test.go b/activation/kubeadm/kubeadm_test.go index ed9e4d542..f79cb0e14 100644 --- a/activation/kubeadm/kubeadm_test.go +++ b/activation/kubeadm/kubeadm_test.go @@ -7,6 +7,7 @@ import ( "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -77,6 +78,7 @@ kind: Config`, require := require.New(t) client := &Kubeadm{ + log: logger.NewTest(t), file: file.NewHandler(afero.NewMemMapFs()), client: fake.NewSimpleClientset(), } @@ -117,6 +119,7 @@ func TestGetControlPlaneCertificateKey(t *testing.T) { assert := assert.New(t) client := &Kubeadm{ + log: logger.NewTest(t), client: tc.client, } diff --git a/activation/kubernetesca/kubernetesca.go b/activation/kubernetesca/kubernetesca.go index 161e2ee10..dcc8c1abc 100644 --- a/activation/kubernetesca/kubernetesca.go +++ b/activation/kubernetesca/kubernetesca.go @@ -12,7 +12,7 @@ import ( "github.com/edgelesssys/constellation/coordinator/util" "github.com/edgelesssys/constellation/internal/file" - "k8s.io/klog/v2" + "github.com/edgelesssys/constellation/internal/logger" ) const ( @@ -22,19 +22,21 @@ const ( // KubernetesCA handles signing of certificates using the Kubernetes root CA. type KubernetesCA struct { + log *logger.Logger file file.Handler } // New creates a new KubernetesCA. -func New(fileHandler file.Handler) *KubernetesCA { +func New(log *logger.Logger, fileHandler file.Handler) *KubernetesCA { return &KubernetesCA{ + log: log, file: fileHandler, } } // GetCertificate creates a certificate for a node and signs it using the Kubernetes root CA. func (c KubernetesCA) GetCertificate(nodeName string) (cert []byte, key []byte, err error) { - klog.V(6).Info("CA: loading Kubernetes CA certificate") + c.log.Debugf("Loading Kubernetes CA certificate") parentCertRaw, err := c.file.Read(caCertFilename) if err != nil { return nil, nil, err @@ -45,7 +47,7 @@ func (c KubernetesCA) GetCertificate(nodeName string) (cert []byte, key []byte, return nil, nil, err } - klog.V(6).Info("CA: loading Kubernetes CA private key") + c.log.Debugf("Loading Kubernetes CA private key") parentKeyRaw, err := c.file.Read(caKeyFilename) if err != nil { return nil, nil, err @@ -66,7 +68,7 @@ func (c KubernetesCA) GetCertificate(nodeName string) (cert []byte, key []byte, return nil, nil, err } - klog.V(6).Info("CA: creating kubelet private key") + c.log.Infof("Creating kubelet private key") privK, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) if err != nil { return nil, nil, err @@ -80,7 +82,7 @@ func (c KubernetesCA) GetCertificate(nodeName string) (cert []byte, key []byte, Bytes: keyBytes, }) - klog.V(6).Info("CA: creating kubelet certificate") + c.log.Infof("Creating kubelet certificate") serialNumber, err := util.GenerateCertificateSerialNumber() if err != nil { return nil, nil, err diff --git a/activation/kubernetesca/kubernetesca_test.go b/activation/kubernetesca/kubernetesca_test.go index a943f4856..eac8e3909 100644 --- a/activation/kubernetesca/kubernetesca_test.go +++ b/activation/kubernetesca/kubernetesca_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -89,7 +90,10 @@ Q29uc3RlbGxhdGlvbg== require.NoError(file.Write(caKeyFilename, tc.caKey, 0o644)) } - ca := New(file) + ca := New( + logger.NewTest(t), + file, + ) nodeName := "test" kubeCert, kubeKey, err := ca.GetCertificate(nodeName) diff --git a/activation/server/server.go b/activation/server/server.go index 4babd3504..df1c02f92 100644 --- a/activation/server/server.go +++ b/activation/server/server.go @@ -11,18 +11,20 @@ import ( attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types" "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" - "github.com/edgelesssys/constellation/internal/grpc/grpc_klog" + "github.com/edgelesssys/constellation/internal/grpc/grpclog" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" - "k8s.io/klog/v2" kubeadmv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3" ) // Server implements the core logic of Constellation's node activation service. type Server struct { + log *logger.Logger file file.Handler joinTokenGetter joinTokenGetter dataKeyGetter dataKeyGetter @@ -31,8 +33,9 @@ type Server struct { } // New initializes a new Server. -func New(fileHandler file.Handler, ca certificateAuthority, joinTokenGetter joinTokenGetter, dataKeyGetter dataKeyGetter) *Server { +func New(log *logger.Logger, fileHandler file.Handler, ca certificateAuthority, joinTokenGetter joinTokenGetter, dataKeyGetter dataKeyGetter) *Server { return &Server{ + log: log, file: fileHandler, joinTokenGetter: joinTokenGetter, dataKeyGetter: dataKeyGetter, @@ -42,9 +45,10 @@ func New(fileHandler file.Handler, ca certificateAuthority, joinTokenGetter join // Run starts the gRPC server on the given port, using the provided tlsConfig. func (s *Server) Run(creds credentials.TransportCredentials, port string) error { + s.log.WithIncreasedLevel(zap.WarnLevel).Named("gRPC").ReplaceGRPCLogger() grpcServer := grpc.NewServer( grpc.Creds(creds), - grpc.UnaryInterceptor(grpc_klog.LogGRPC(2)), + s.log.Named("gRPC").GetServerUnaryInterceptor(), ) proto.RegisterAPIServer(grpcServer, s) @@ -53,7 +57,7 @@ func (s *Server) Run(creds credentials.TransportCredentials, port string) error if err != nil { return fmt.Errorf("failed to listen: %s", err) } - klog.V(2).Infof("starting activation service on %s", lis.Addr().String()) + s.log.Infof("Starting activation service on %s", lis.Addr().String()) return grpcServer.Serve(lis) } @@ -63,12 +67,13 @@ func (s *Server) Run(creds credentials.TransportCredentials, port string) error // - Kubernetes join token. // - cluster and owner ID to taint the node as initialized. func (s *Server) ActivateWorkerNode(ctx context.Context, req *proto.ActivateWorkerNodeRequest) (*proto.ActivateWorkerNodeResponse, error) { - nodeParameters, err := s.activateNode(ctx, "ActivateWorker", req.DiskUuid, req.NodeName) + s.log.Infof("ActivateWorkerNode called") + nodeParameters, err := s.activateNode(ctx, req.DiskUuid, req.NodeName) if err != nil { - return nil, fmt.Errorf("ActivateNode failed: %w", err) + return nil, fmt.Errorf("ActivateWorkerNode failed: %w", err) } - klog.V(4).Info("ActivateNode successful") + s.log.Infof("ActivateWorkerNode successful") return &proto.ActivateWorkerNodeResponse{ StateDiskKey: nodeParameters.stateDiskKey, @@ -89,9 +94,10 @@ func (s *Server) ActivateWorkerNode(ctx context.Context, req *proto.ActivateWork // - cluster and owner ID to taint the node as initialized. // - a decryption key for CA certificates uploaded to the Kubernetes cluster. func (s *Server) ActivateControlPlaneNode(ctx context.Context, req *proto.ActivateControlPlaneNodeRequest) (*proto.ActivateControlPlaneNodeResponse, error) { - nodeParameters, err := s.activateNode(ctx, "ActivateControlPlane", req.DiskUuid, req.NodeName) + s.log.Infof("ActivateControlPlaneNode called") + nodeParameters, err := s.activateNode(ctx, req.DiskUuid, req.NodeName) if err != nil { - return nil, fmt.Errorf("ActivateControlPlane failed: %w", err) + return nil, fmt.Errorf("ActivateControlPlaneNode failed: %w", err) } certKey, err := s.joinTokenGetter.GetControlPlaneCertificateKey() @@ -99,7 +105,7 @@ func (s *Server) ActivateControlPlaneNode(ctx context.Context, req *proto.Activa return nil, fmt.Errorf("ActivateControlPlane failed: %w", err) } - klog.V(4).Info("ActivateControlPlane successful") + s.log.Infof("ActivateControlPlaneNode successful") return &proto.ActivateControlPlaneNodeResponse{ StateDiskKey: nodeParameters.stateDiskKey, @@ -114,29 +120,30 @@ func (s *Server) ActivateControlPlaneNode(ctx context.Context, req *proto.Activa }, nil } -func (s *Server) activateNode(ctx context.Context, logPrefix, diskUUID, nodeName string) (nodeParameters, error) { - klog.V(4).Infof("%s: loading IDs", logPrefix) +func (s *Server) activateNode(ctx context.Context, diskUUID, nodeName string) (nodeParameters, error) { + log := s.log.With(zap.String("peerAddress", grpclog.PeerAddrFromContext(ctx))) + log.Infof("Loading IDs") var id attestationtypes.ID if err := s.file.ReadJSON(filepath.Join(constants.ActivationBasePath, constants.ActivationIDFilename), &id); err != nil { - klog.Errorf("unable to load IDs: %s", err) + log.With(zap.Error(err)).Errorf("Unable to load IDs") return nodeParameters{}, status.Errorf(codes.Internal, "unable to load IDs: %s", err) } - klog.V(4).Infof("%s: requesting disk encryption key", logPrefix) + log.Infof("Requesting disk encryption key") stateDiskKey, err := s.dataKeyGetter.GetDataKey(ctx, diskUUID, constants.StateDiskKeyLength) if err != nil { - klog.Errorf("unable to get key for stateful disk: %s", err) + log.With(zap.Error(err)).Errorf("Unable to get key for stateful disk") return nodeParameters{}, status.Errorf(codes.Internal, "unable to get key for stateful disk: %s", err) } - klog.V(4).Infof("%s: creating Kubernetes join token", logPrefix) + log.Infof("Creating Kubernetes join token") kubeArgs, err := s.joinTokenGetter.GetJoinToken(constants.KubernetesJoinTokenTTL) if err != nil { - klog.Errorf("unable to generate Kubernetes join arguments: %s", err) + log.With(zap.Error(err)).Errorf("Unable to generate Kubernetes join arguments") return nodeParameters{}, status.Errorf(codes.Internal, "unable to generate Kubernetes join arguments: %s", err) } - klog.V(4).Infof("%s: creating signed kubelet certificate", logPrefix) + log.Infof("Creating signed kubelet certificate") kubeletCert, kubeletKey, err := s.ca.GetCertificate(nodeName) if err != nil { return nodeParameters{}, status.Errorf(codes.Internal, "unable to generate kubelet certificate: %s", err) diff --git a/activation/server/server_test.go b/activation/server/server_test.go index c8bba76aa..9540a8bfc 100644 --- a/activation/server/server_test.go +++ b/activation/server/server_test.go @@ -12,6 +12,7 @@ import ( attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types" "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -131,9 +132,15 @@ func TestActivateNode(t *testing.T) { if len(tc.id) > 0 { require.NoError(file.Write(filepath.Join(constants.ActivationBasePath, constants.ActivationIDFilename), tc.id, 0o644)) } - api := New(file, tc.ca, tc.kubeadm, tc.kms) + api := New( + logger.NewTest(t), + file, + tc.ca, + tc.kubeadm, + tc.kms, + ) - resp, err := api.activateNode(context.Background(), "test", "uuid", "test") + resp, err := api.activateNode(context.Background(), "uuid", "test") if tc.wantErr { assert.Error(err) return @@ -212,7 +219,13 @@ func TestActivateWorkerNode(t *testing.T) { file := file.NewHandler(afero.NewMemMapFs()) require.NoError(file.Write(filepath.Join(constants.ActivationBasePath, constants.ActivationIDFilename), tc.id, 0o644)) - api := New(file, tc.ca, tc.kubeadm, tc.kms) + api := New( + logger.NewTest(t), + file, + tc.ca, + tc.kubeadm, + tc.kms, + ) resp, err := api.ActivateWorkerNode(context.Background(), &activationproto.ActivateWorkerNodeRequest{DiskUuid: "uuid", NodeName: "test"}) if tc.wantErr { @@ -311,7 +324,13 @@ func TestActivateControlPlaneNode(t *testing.T) { file := file.NewHandler(afero.NewMemMapFs()) require.NoError(file.Write(filepath.Join(constants.ActivationBasePath, constants.ActivationIDFilename), tc.id, 0o644)) - api := New(file, tc.ca, tc.kubeadm, tc.kms) + api := New( + logger.NewTest(t), + file, + tc.ca, + tc.kubeadm, + tc.kms, + ) resp, err := api.ActivateControlPlaneNode(context.Background(), &activationproto.ActivateControlPlaneNodeRequest{DiskUuid: "uuid", NodeName: "test"}) if tc.wantErr { diff --git a/activation/validator/validator.go b/activation/validator/validator.go index a90d6e308..2fc1df7f6 100644 --- a/activation/validator/validator.go +++ b/activation/validator/validator.go @@ -13,11 +13,12 @@ import ( "github.com/edgelesssys/constellation/internal/cloud/cloudprovider" "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" - "k8s.io/klog/v2" + "github.com/edgelesssys/constellation/internal/logger" ) // Updatable implements an updatable atls.Validator. type Updatable struct { + log *logger.Logger mux sync.Mutex newValidator newValidatorFunc fileHandler file.Handler @@ -25,7 +26,7 @@ type Updatable struct { } // New initializes a new updatable validator. -func New(csp string, fileHandler file.Handler) (*Updatable, error) { +func New(log *logger.Logger, csp string, fileHandler file.Handler) (*Updatable, error) { var newValidator newValidatorFunc switch cloudprovider.FromString(csp) { case cloudprovider.Azure: @@ -39,6 +40,7 @@ func New(csp string, fileHandler file.Handler) (*Updatable, error) { } u := &Updatable{ + log: log, newValidator: newValidator, fileHandler: fileHandler, } @@ -66,13 +68,13 @@ func (u *Updatable) Update() error { u.mux.Lock() defer u.mux.Unlock() - klog.V(4).Info("Updating expected measurements") + u.log.Infof("Updating expected measurements") var measurements map[uint32][]byte if err := u.fileHandler.ReadJSON(filepath.Join(constants.ActivationBasePath, constants.ActivationMeasurementsFilename), &measurements); err != nil { return err } - klog.V(6).Infof("New measurements: %v", measurements) + u.log.Debugf("New measurements: %v", measurements) u.Validator = u.newValidator(measurements) diff --git a/activation/validator/validator_test.go b/activation/validator/validator_test.go index 2b2da05e1..383c20900 100644 --- a/activation/validator/validator_test.go +++ b/activation/validator/validator_test.go @@ -16,6 +16,7 @@ import ( "github.com/edgelesssys/constellation/internal/atls" "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,7 +68,11 @@ func TestNewUpdateableValidator(t *testing.T) { )) } - _, err := New(tc.provider, handler) + _, err := New( + logger.NewTest(t), + tc.provider, + handler, + ) if tc.wantErr { assert.Error(err) } else { @@ -88,7 +93,11 @@ func TestUpdate(t *testing.T) { handler := file.NewHandler(afero.NewMemMapFs()) // create server - validator := &Updatable{newValidator: newValidator, fileHandler: handler} + validator := &Updatable{ + log: logger.NewTest(t), + newValidator: newValidator, + fileHandler: handler, + } // Update should fail if the file does not exist assert.Error(validator.Update()) @@ -139,6 +148,7 @@ func TestUpdateConcurrency(t *testing.T) { handler := file.NewHandler(afero.NewMemMapFs()) validator := &Updatable{ + log: logger.NewTest(t), fileHandler: handler, newValidator: func(m map[uint32][]byte) atls.Validator { return fakeValidator{fakeOID: fakeOID{1, 3, 9900, 1}} diff --git a/activation/watcher/watcher.go b/activation/watcher/watcher.go index aba96598a..e3fe46742 100644 --- a/activation/watcher/watcher.go +++ b/activation/watcher/watcher.go @@ -3,25 +3,28 @@ package watcher import ( "fmt" + "github.com/edgelesssys/constellation/internal/logger" "github.com/fsnotify/fsnotify" - "k8s.io/klog/v2" + "go.uber.org/zap" ) // FileWatcher watches for changes to the file and calls the waiter's Update method. type FileWatcher struct { + log *logger.Logger updater updater watcher eventWatcher done chan struct{} } // New creates a new FileWatcher for the given validator. -func New(updater updater) (*FileWatcher, error) { +func New(log *logger.Logger, updater updater) (*FileWatcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, err } return &FileWatcher{ + log: log, watcher: &fsnotifyWatcher{watcher}, updater: updater, done: make(chan struct{}, 1), @@ -39,6 +42,7 @@ func (f *FileWatcher) Close() error { // Watch starts watching the file at the given path. // It will call the watcher's Update method when the file is modified. func (f *FileWatcher) Watch(file string) error { + log := f.log.With("file", file) defer func() { f.done <- struct{}{} }() if err := f.watcher.Add(file); err != nil { return err @@ -48,28 +52,28 @@ func (f *FileWatcher) Watch(file string) error { select { case event, ok := <-f.watcher.Events(): if !ok { - klog.V(4).Infof("watcher closed") + log.Infof("Watcher closed") return nil } // file changes may be indicated by either a WRITE, CHMOD, CREATE or RENAME event if event.Op&(fsnotify.Write|fsnotify.Chmod|fsnotify.Create|fsnotify.Rename) != 0 { if err := f.updater.Update(); err != nil { - klog.Errorf("failed to update activation validator: %s", err) + log.With(zap.Error(err)).Errorf("Failed to update activation validator") } } // if a file gets removed, e.g. by a rename event, we need to re-add the file to the watcher if event.Op&fsnotify.Remove == fsnotify.Remove { if err := f.watcher.Add(event.Name); err != nil { - klog.Errorf("failed to re-add file %q to watcher: %s", event.Name, err) + log.With(zap.Error(err)).Errorf("Failed to re-add file to watcher") return fmt.Errorf("failed to re-add file %q to watcher: %w", event.Name, err) } } case err := <-f.watcher.Errors(): if err != nil { - klog.Errorf("watching for measurements updates: %s", err) + log.With(zap.Error(err)).Errorf("Watching for measurements updates") return fmt.Errorf("watching for measurements updates: %w", err) } } diff --git a/activation/watcher/watcher_test.go b/activation/watcher/watcher_test.go index fd39e4d3b..70f6e3ea4 100644 --- a/activation/watcher/watcher_test.go +++ b/activation/watcher/watcher_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/edgelesssys/constellation/internal/logger" "github.com/fsnotify/fsnotify" "github.com/stretchr/testify/assert" ) @@ -99,6 +100,7 @@ func TestWatcher(t *testing.T) { assert := assert.New(t) watcher := &FileWatcher{ + log: logger.NewTest(t), updater: tc.updater, watcher: tc.watcher, done: make(chan struct{}, 1), diff --git a/coordinator/core/ssh.go b/coordinator/core/ssh.go index 4ebc9a789..4a8870479 100644 --- a/coordinator/core/ssh.go +++ b/coordinator/core/ssh.go @@ -4,11 +4,13 @@ import ( "context" "github.com/edgelesssys/constellation/internal/deploy/ssh" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap/zapcore" ) // CreateSSHUsers creates UNIX users with respective SSH access on the system the coordinator is running on when defined in the config. func (c *Core) CreateSSHUsers(sshUserKeys []ssh.UserKey) error { - sshAccess := ssh.NewAccess(c.linuxUserManager) + sshAccess := ssh.NewAccess(logger.New(logger.JSONLog, zapcore.InfoLevel), c.linuxUserManager) ctx := context.Background() for _, pair := range sshUserKeys { diff --git a/coordinator/pubapi/core_test.go b/coordinator/pubapi/core_test.go index 802cd6a44..b7c1441cc 100644 --- a/coordinator/pubapi/core_test.go +++ b/coordinator/pubapi/core_test.go @@ -12,7 +12,9 @@ import ( attestationtypes "github.com/edgelesssys/constellation/internal/attestation/types" "github.com/edgelesssys/constellation/internal/deploy/ssh" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" kms "github.com/edgelesssys/constellation/kms/server/setup" + "go.uber.org/zap/zapcore" kubeadm "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3" ) @@ -163,7 +165,7 @@ func (c *fakeCore) UpdateDiskPassphrase(passphrase string) error { } func (c *fakeCore) CreateSSHUsers(sshUserKeys []ssh.UserKey) error { - sshAccess := ssh.NewAccess(c.linuxUserManager) + sshAccess := ssh.NewAccess(logger.New(logger.PlainLog, zapcore.DebugLevel), c.linuxUserManager) ctx := context.Background() for _, pair := range sshUserKeys { diff --git a/debugd/debugd/cmd/debugd/debugd.go b/debugd/debugd/cmd/debugd/debugd.go index a0429ed3e..b43d78ffe 100644 --- a/debugd/debugd/cmd/debugd/debugd.go +++ b/debugd/debugd/cmd/debugd/debugd.go @@ -1,7 +1,6 @@ package main import ( - "log" "net" "os" "strings" @@ -15,22 +14,25 @@ import ( "github.com/edgelesssys/constellation/debugd/debugd/server" "github.com/edgelesssys/constellation/internal/deploy/ssh" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" + "go.uber.org/zap/zapcore" "golang.org/x/net/context" ) func main() { wg := &sync.WaitGroup{} + log := logger.New(logger.JSONLog, zapcore.InfoLevel) fs := afero.NewOsFs() streamer := coordinator.NewFileStreamer(fs) - serviceManager := deploy.NewServiceManager() - ssh := ssh.NewAccess(user.NewLinuxUserManager(fs)) + serviceManager := deploy.NewServiceManager(log.Named("serviceManager")) + ssh := ssh.NewAccess(log, user.NewLinuxUserManager(fs)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - download := deploy.New(&net.Dialer{}, serviceManager, streamer) + download := deploy.New(log.Named("download"), &net.Dialer{}, serviceManager, streamer) var fetcher metadata.Fetcher constellationCSP := strings.ToLower(os.Getenv("CONSTEL_CSP")) switch constellationCSP { @@ -47,11 +49,11 @@ func main() { } fetcher = gcpFetcher default: - log.Printf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v\n", constellationCSP) + log.Errorf("Unknown / unimplemented cloud provider CONSTEL_CSP=%v. Using fallback", constellationCSP) fetcher = fallback.Fetcher{} } - sched := metadata.NewScheduler(fetcher, ssh, download) - serv := server.New(ssh, serviceManager, streamer) + sched := metadata.NewScheduler(log.Named("scheduler"), fetcher, ssh, download) + serv := server.New(log.Named("server"), ssh, serviceManager, streamer) if err := deploy.DeployDefaultServiceUnit(ctx, serviceManager); err != nil { panic(err) } @@ -59,7 +61,7 @@ func main() { wg.Add(1) go sched.Start(ctx, wg) wg.Add(1) - go server.Start(wg, serv) + go server.Start(log, wg, serv) wg.Wait() } diff --git a/debugd/debugd/deploy/download.go b/debugd/debugd/deploy/download.go index 2eafd1e66..0b60655c6 100644 --- a/debugd/debugd/deploy/download.go +++ b/debugd/debugd/deploy/download.go @@ -3,19 +3,21 @@ package deploy import ( "context" "fmt" - "log" "net" "time" "github.com/edgelesssys/constellation/debugd/coordinator" "github.com/edgelesssys/constellation/debugd/debugd" pb "github.com/edgelesssys/constellation/debugd/service" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // Download downloads a coordinator from a given debugd instance. type Download struct { + log *logger.Logger dialer NetDialer writer streamToFileWriter serviceManager serviceManager @@ -23,8 +25,9 @@ type Download struct { } // New creates a new Download. -func New(dialer NetDialer, serviceManager serviceManager, writer streamToFileWriter) *Download { +func New(log *logger.Logger, dialer NetDialer, serviceManager serviceManager, writer streamToFileWriter) *Download { return &Download{ + log: log, dialer: dialer, writer: writer, serviceManager: serviceManager, @@ -34,12 +37,13 @@ func New(dialer NetDialer, serviceManager serviceManager, writer streamToFileWri // DownloadCoordinator will open a new grpc connection to another instance, attempting to download a coordinator from that instance. func (d *Download) DownloadCoordinator(ctx context.Context, ip string) error { + log := d.log.With(zap.String("ip", ip)) serverAddr := net.JoinHostPort(ip, debugd.DebugdPort) // only retry download from same endpoint after backoff if lastAttempt, ok := d.attemptedDownloads[serverAddr]; ok && time.Since(lastAttempt) < debugd.CoordinatorDownloadRetryBackoff { return fmt.Errorf("download failed too recently: %v / %v", time.Since(lastAttempt), debugd.CoordinatorDownloadRetryBackoff) } - log.Printf("Trying to download coordinator from %s\n", ip) + log.Infof("Trying to download coordinator") d.attemptedDownloads[serverAddr] = time.Now() conn, err := d.dial(ctx, serverAddr) if err != nil { @@ -56,7 +60,7 @@ func (d *Download) DownloadCoordinator(ctx context.Context, ip string) error { return fmt.Errorf("streaming coordinator from other instance: %w", err) } - log.Printf("Successfully downloaded coordinator from %s\n", ip) + log.Infof("Successfully downloaded coordinator") // after the upload succeeds, try to restart the coordinator restartAction := ServiceManagerRequest{ diff --git a/debugd/debugd/deploy/download_test.go b/debugd/debugd/deploy/download_test.go index d3fc597e1..382cca982 100644 --- a/debugd/debugd/deploy/download_test.go +++ b/debugd/debugd/deploy/download_test.go @@ -13,6 +13,7 @@ import ( "github.com/edgelesssys/constellation/debugd/debugd" pb "github.com/edgelesssys/constellation/debugd/service" "github.com/edgelesssys/constellation/internal/grpc/testdialer" + "github.com/edgelesssys/constellation/internal/logger" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -93,6 +94,7 @@ func TestDownloadCoordinator(t *testing.T) { go grpcServ.Serve(lis) download := &Download{ + log: logger.NewTest(t), dialer: dialer, writer: writer, serviceManager: &tc.serviceManager, diff --git a/debugd/debugd/deploy/service.go b/debugd/debugd/deploy/service.go index 4c0f75c85..c1bb6b522 100644 --- a/debugd/debugd/deploy/service.go +++ b/debugd/debugd/deploy/service.go @@ -2,13 +2,13 @@ package deploy import ( "context" - "errors" "fmt" - "log" "sync" "github.com/edgelesssys/constellation/debugd/debugd" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" + "go.uber.org/zap" ) const ( @@ -40,15 +40,17 @@ type SystemdUnit struct { // ServiceManager receives ServiceManagerRequests and units via channels and performs the requests / creates the unit files. type ServiceManager struct { + log *logger.Logger dbus dbusClient fs afero.Fs systemdUnitFilewriteLock sync.Mutex } // NewServiceManager creates a new ServiceManager. -func NewServiceManager() *ServiceManager { +func NewServiceManager(log *logger.Logger) *ServiceManager { fs := afero.NewOsFs() return &ServiceManager{ + log: log, dbus: &dbusWrapper{}, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, @@ -78,6 +80,7 @@ type dbusConn interface { // SystemdAction will perform a systemd action on a service unit (start, stop, restart, reload). func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManagerRequest) error { + log := s.log.With(zap.String("unit", request.Unit), zap.String("action", request.Action.String())) conn, err := s.dbus.NewSystemdConnectionContext(ctx) if err != nil { return fmt.Errorf("establishing systemd connection: %w", err) @@ -94,14 +97,14 @@ func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManag case Reload: err = conn.ReloadContext(ctx) default: - return errors.New("unknown systemd action: " + request.Action.String()) + return fmt.Errorf("unknown systemd action: %s", request.Action.String()) } if err != nil { return fmt.Errorf("performing systemd action %v on unit %v: %w", request.Action, request.Unit, err) } if request.Action == Reload { - log.Println("daemon-reload succeeded") + log.Infof("daemon-reload succeeded") return nil } // Wait for the action to finish and then check if it was @@ -110,17 +113,18 @@ func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManag switch result { case "done": - log.Printf("%s on systemd unit %s succeeded\n", request.Action, request.Unit) + log.Infof("%s on systemd unit %s succeeded", request.Action, request.Unit) return nil default: - return fmt.Errorf("performing action %v on systemd unit \"%v\" failed: expected \"%v\" but received \"%v\"", request.Action, request.Unit, "done", result) + return fmt.Errorf("performing action %q on systemd unit %q failed: expected %q but received %q", request.Action.String(), request.Unit, "done", result) } } // WriteSystemdUnitFile will write a systemd unit to disk. func (s *ServiceManager) WriteSystemdUnitFile(ctx context.Context, unit SystemdUnit) error { - log.Printf("Writing systemd unit file: %s/%s\n", systemdUnitFolder, unit.Name) + log := s.log.With(zap.String("unitFile", fmt.Sprintf("%s/%s", systemdUnitFolder, unit.Name))) + log.Infof("Writing systemd unit file") s.systemdUnitFilewriteLock.Lock() defer s.systemdUnitFilewriteLock.Unlock() if err := afero.WriteFile(s.fs, fmt.Sprintf("%s/%s", systemdUnitFolder, unit.Name), []byte(unit.Contents), 0o644); err != nil { @@ -131,7 +135,7 @@ func (s *ServiceManager) WriteSystemdUnitFile(ctx context.Context, unit SystemdU return fmt.Errorf("performing systemd daemon-reload: %w", err) } - log.Printf("Wrote systemd unit file: %s/%s and performed daemon-reload\n", systemdUnitFolder, unit.Name) + log.Infof("Wrote systemd unit file and performed daemon-reload") return nil } diff --git a/debugd/debugd/deploy/service_test.go b/debugd/debugd/deploy/service_test.go index 11cb651c0..0a0a70cd9 100644 --- a/debugd/debugd/deploy/service_test.go +++ b/debugd/debugd/deploy/service_test.go @@ -7,6 +7,7 @@ import ( "sync" "testing" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -95,6 +96,7 @@ func TestSystemdAction(t *testing.T) { fs := afero.NewMemMapFs() manager := ServiceManager{ + log: logger.NewTest(t), dbus: &tc.dbus, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, @@ -173,6 +175,7 @@ func TestWriteSystemdUnitFile(t *testing.T) { fs = afero.NewReadOnlyFs(fs) } manager := ServiceManager{ + log: logger.NewTest(t), dbus: &tc.dbus, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, diff --git a/debugd/debugd/metadata/scheduler.go b/debugd/debugd/metadata/scheduler.go index b6d00db17..53f230ea6 100644 --- a/debugd/debugd/metadata/scheduler.go +++ b/debugd/debugd/metadata/scheduler.go @@ -4,12 +4,13 @@ import ( "context" "errors" "io/fs" - "log" "sync" "time" "github.com/edgelesssys/constellation/debugd/debugd" "github.com/edgelesssys/constellation/internal/deploy/ssh" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap" ) // Fetcher retrieves other debugd IPs and SSH keys from cloud provider metadata. @@ -20,14 +21,16 @@ type Fetcher interface { // Scheduler schedules fetching of metadata using timers. type Scheduler struct { + log *logger.Logger fetcher Fetcher ssh sshDeployer downloader downloader } // NewScheduler returns a new scheduler. -func NewScheduler(fetcher Fetcher, ssh sshDeployer, downloader downloader) *Scheduler { +func NewScheduler(log *logger.Logger, fetcher Fetcher, ssh sshDeployer, downloader downloader) *Scheduler { return &Scheduler{ + log: log, fetcher: fetcher, ssh: ssh, downloader: downloader, @@ -49,7 +52,7 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) { // execute debugd discovery once at the start to skip wait for first tick ips, err := s.fetcher.DiscoverDebugdIPs(ctx) if err != nil { - log.Printf("error occurred while discovering debugd IPs: %v\n", err) + s.log.With(zap.Error(err)).Errorf("Discovering debugd IPs failed") } else { if s.downloadCoordinator(ctx, ips) { return @@ -64,10 +67,10 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) { case <-ticker.C: ips, err = s.fetcher.DiscoverDebugdIPs(ctx) if err != nil { - log.Printf("error occurred while discovering debugd IPs: %v\n", err) + s.log.With(zap.Error(err)).Errorf("Discovering debugd IPs failed") continue } - log.Printf("discovered instances: %v\n", ips) + s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances") if s.downloadCoordinator(ctx, ips) { return } @@ -80,24 +83,19 @@ func (s *Scheduler) discoveryLoop(ctx context.Context, wg *sync.WaitGroup) { // sshLoop discovers new ssh keys from cloud provider metadata periodically. func (s *Scheduler) sshLoop(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - // execute ssh key search once at the start to skip wait for first tick - keys, err := s.fetcher.FetchSSHKeys(ctx) - if err != nil { - log.Printf("error occurred while fetching SSH keys: %v\n", err) - } else { - s.deploySSHKeys(ctx, keys) - } + ticker := time.NewTicker(debugd.SSHCheckInterval) defer ticker.Stop() for { + keys, err := s.fetcher.FetchSSHKeys(ctx) + if err != nil { + s.log.With(zap.Error(err)).Errorf("Fetching SSH keys failed") + } else { + s.deploySSHKeys(ctx, keys) + } + select { case <-ticker.C: - keys, err := s.fetcher.FetchSSHKeys(ctx) - if err != nil { - log.Printf("error occurred while fetching ssh keys: %v\n", err) - continue - } - s.deploySSHKeys(ctx, keys) case <-ctx.Done(): return } @@ -116,7 +114,7 @@ func (s *Scheduler) downloadCoordinator(ctx context.Context, ips []string) (succ // coordinator was already uploaded return true } - log.Printf("error occurred while downloading coordinator from %v: %v\n", ip, err) + s.log.With(zap.Error(err), zap.String("peer", ip)).Errorf("Downloading coordinator from peer failed") } return false } @@ -126,7 +124,7 @@ func (s *Scheduler) deploySSHKeys(ctx context.Context, keys []ssh.UserKey) { for _, key := range keys { err := s.ssh.DeployAuthorizedKey(ctx, key) if err != nil { - log.Printf("error occurred while deploying ssh key %v: %v\n", key, err) + s.log.With(zap.Error(err), zap.Any("key", key)).Errorf("Deploying SSH key failed") continue } } diff --git a/debugd/debugd/metadata/scheduler_test.go b/debugd/debugd/metadata/scheduler_test.go index 4fb3eb62a..f2fbd8386 100644 --- a/debugd/debugd/metadata/scheduler_test.go +++ b/debugd/debugd/metadata/scheduler_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/edgelesssys/constellation/internal/deploy/ssh" + "github.com/edgelesssys/constellation/internal/logger" "github.com/stretchr/testify/assert" ) @@ -74,6 +75,7 @@ func TestSchedulerStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), tc.timeout) defer cancel() scheduler := Scheduler{ + log: logger.NewTest(t), fetcher: &tc.fetcher, ssh: &tc.ssh, downloader: &tc.downloader, diff --git a/debugd/debugd/server/server.go b/debugd/debugd/server/server.go index 7343149b1..e9bd95890 100644 --- a/debugd/debugd/server/server.go +++ b/debugd/debugd/server/server.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io/fs" - "log" "net" "sync" @@ -14,10 +13,13 @@ import ( "github.com/edgelesssys/constellation/debugd/debugd/deploy" pb "github.com/edgelesssys/constellation/debugd/service" "github.com/edgelesssys/constellation/internal/deploy/ssh" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap" "google.golang.org/grpc" ) type debugdServer struct { + log *logger.Logger ssh sshDeployer serviceManager serviceManager streamer streamer @@ -25,8 +27,9 @@ type debugdServer struct { } // New creates a new debugdServer according to the gRPC spec. -func New(ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.DebugdServer { +func New(log *logger.Logger, ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.DebugdServer { return &debugdServer{ + log: log, ssh: ssh, serviceManager: serviceManager, streamer: streamer, @@ -35,10 +38,10 @@ func New(ssh sshDeployer, serviceManager serviceManager, streamer streamer) pb.D // UploadAuthorizedKeys receives a list of authorized keys and forwards them to a channel. func (s *debugdServer) UploadAuthorizedKeys(ctx context.Context, in *pb.UploadAuthorizedKeysRequest) (*pb.UploadAuthorizedKeysResponse, error) { - log.Println("Uploading authorized keys") + s.log.Infof("Uploading authorized keys") for _, key := range in.Keys { if err := s.ssh.DeployAuthorizedKey(ctx, ssh.UserKey{Username: key.Username, PublicKey: key.KeyValue}); err != nil { - log.Printf("Uploading authorized keys failed: %v\n", err) + s.log.With(zap.Error(err)).Errorf("Uploading authorized keys failed") return &pb.UploadAuthorizedKeysResponse{ Status: pb.UploadAuthorizedKeysStatus_UPLOAD_AUTHORIZED_KEYS_FAILURE, }, nil @@ -58,7 +61,7 @@ func (s *debugdServer) UploadCoordinator(stream pb.Debugd_UploadCoordinatorServe var responseStatus pb.UploadCoordinatorStatus defer func() { if err := s.serviceManager.SystemdAction(stream.Context(), startAction); err != nil { - log.Printf("Starting uploaded coordinator failed: %v\n", err) + s.log.With(zap.Error(err)).Errorf("Starting uploaded coordinator failed") if responseStatus == pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_SUCCESS { responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_START_FAILED } @@ -67,33 +70,33 @@ func (s *debugdServer) UploadCoordinator(stream pb.Debugd_UploadCoordinatorServe Status: responseStatus, }) }() - log.Println("Starting coordinator upload") + s.log.Infof("Starting coordinator upload") if err := s.streamer.WriteStream(debugd.CoordinatorDeployFilename, stream, true); err != nil { if errors.Is(err, fs.ErrExist) { // coordinator was already uploaded - log.Println("Coordinator already uploaded") + s.log.Warnf("Coordinator already uploaded") responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_FILE_EXISTS return nil } - log.Printf("Uploading coordinator failed: %v\n", err) + s.log.With(zap.Error(err)).Errorf("Uploading coordinator failed") responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_UPLOAD_FAILED return fmt.Errorf("uploading coordinator: %w", err) } - log.Println("Successfully uploaded coordinator") + s.log.Infof("Successfully uploaded coordinator") responseStatus = pb.UploadCoordinatorStatus_UPLOAD_COORDINATOR_SUCCESS return nil } // DownloadCoordinator streams the local coordinator binary to other instances. func (s *debugdServer) DownloadCoordinator(request *pb.DownloadCoordinatorRequest, stream pb.Debugd_DownloadCoordinatorServer) error { - log.Println("Sending coordinator to other instance") + s.log.Infof("Sending coordinator to other instance") return s.streamer.ReadStream(debugd.CoordinatorDeployFilename, stream, debugd.Chunksize, true) } // UploadSystemServiceUnits receives systemd service units, writes them to a service file and schedules a daemon-reload. func (s *debugdServer) UploadSystemServiceUnits(ctx context.Context, in *pb.UploadSystemdServiceUnitsRequest) (*pb.UploadSystemdServiceUnitsResponse, error) { - log.Println("Uploading systemd service units") + s.log.Infof("Uploading systemd service units") for _, unit := range in.Units { if err := s.serviceManager.WriteSystemdUnitFile(ctx, deploy.SystemdUnit{Name: unit.Name, Contents: unit.Contents}); err != nil { return &pb.UploadSystemdServiceUnitsResponse{Status: pb.UploadSystemdServiceUnitsStatus_UPLOAD_SYSTEMD_SERVICE_UNITS_FAILURE}, nil @@ -104,15 +107,19 @@ func (s *debugdServer) UploadSystemServiceUnits(ctx context.Context, in *pb.Uplo } // Start will start the gRPC server and block. -func Start(wg *sync.WaitGroup, serv pb.DebugdServer) { +func Start(log *logger.Logger, wg *sync.WaitGroup, serv pb.DebugdServer) { defer wg.Done() - grpcServer := grpc.NewServer() + + grpcLog := log.Named("gRPC") + grpcLog.WithIncreasedLevel(zap.WarnLevel).ReplaceGRPCLogger() + + grpcServer := grpc.NewServer(grpcLog.GetServerStreamInterceptor(), grpcLog.GetServerUnaryInterceptor()) pb.RegisterDebugdServer(grpcServer, serv) lis, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", debugd.DebugdPort)) if err != nil { - log.Fatalf("listening failed: %v", err) + log.With(zap.Error(err)).Fatalf("Listening failed") } - log.Println("gRPC server is waiting for connections") + log.Infof("gRPC server is waiting for connections") grpcServer.Serve(lis) } diff --git a/debugd/debugd/server/server_test.go b/debugd/debugd/server/server_test.go index 6921ba046..0fd7a60d8 100644 --- a/debugd/debugd/server/server_test.go +++ b/debugd/debugd/server/server_test.go @@ -13,6 +13,7 @@ import ( pb "github.com/edgelesssys/constellation/debugd/service" "github.com/edgelesssys/constellation/internal/deploy/ssh" "github.com/edgelesssys/constellation/internal/grpc/testdialer" + "github.com/edgelesssys/constellation/internal/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -73,6 +74,7 @@ func TestUploadAuthorizedKeys(t *testing.T) { require := require.New(t) serv := debugdServer{ + log: logger.NewTest(t), ssh: &tc.ssh, serviceManager: &tc.serviceManager, streamer: &fakeStreamer{}, @@ -148,6 +150,7 @@ func TestUploadCoordinator(t *testing.T) { require := require.New(t) serv := debugdServer{ + log: logger.NewTest(t), ssh: &tc.ssh, serviceManager: &tc.serviceManager, streamer: &tc.streamer, @@ -218,6 +221,7 @@ func TestDownloadCoordinator(t *testing.T) { require := require.New(t) serv := debugdServer{ + log: logger.NewTest(t), ssh: &tc.ssh, serviceManager: &tc.serviceManager, streamer: &tc.streamer, @@ -298,6 +302,7 @@ func TestUploadSystemServiceUnits(t *testing.T) { require := require.New(t) serv := debugdServer{ + log: logger.NewTest(t), ssh: &tc.ssh, serviceManager: &tc.serviceManager, streamer: &fakeStreamer{}, diff --git a/hack/go.mod b/hack/go.mod index cb6f8d1a6..0a2ba0086 100644 --- a/hack/go.mod +++ b/hack/go.mod @@ -96,6 +96,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.11.2 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.16.2 // indirect github.com/aws/smithy-go v1.11.2 // indirect + github.com/benbjohnson/clock v1.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect @@ -114,6 +115,7 @@ require ( github.com/google/tink/go v1.6.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.2.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v1.0.0 // indirect @@ -136,6 +138,7 @@ require ( go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect + go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect diff --git a/hack/go.sum b/hack/go.sum index bf91506c5..8d152ba5d 100644 --- a/hack/go.sum +++ b/hack/go.sum @@ -248,6 +248,9 @@ github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE= github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -398,6 +401,7 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -552,6 +556,7 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.2.2/go.mod h1:EaizFBKfUKtMIF5iaDEhniwNedqGo9FuLFzppDr3uwI= +github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= @@ -786,6 +791,7 @@ github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzL github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= @@ -999,6 +1005,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -1012,6 +1019,8 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= +go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI= golang.org/x/crypto v0.0.0-20180501155221-613d6eafa307/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/internal/deploy/ssh/ssh.go b/internal/deploy/ssh/ssh.go index 5be24da13..fbe800068 100644 --- a/internal/deploy/ssh/ssh.go +++ b/internal/deploy/ssh/ssh.go @@ -3,11 +3,12 @@ package ssh import ( "context" "fmt" - "log" "os" "sync" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" + "go.uber.org/zap" ) // UserKey describes an user that should be created with a corresponding public SSH key. @@ -18,14 +19,16 @@ type UserKey struct { // Access reads SSH public keys from a channel, creates the specified users if required and writes the public keys to the users authorized_keys file. type Access struct { + log *logger.Logger userManager user.LinuxUserManager authorized map[string]bool mux sync.Mutex } // NewAccess creates a new Access. -func NewAccess(userManager user.LinuxUserManager) *Access { +func NewAccess(log *logger.Logger, userManager user.LinuxUserManager) *Access { return &Access{ + log: log, userManager: userManager, mux: sync.Mutex{}, authorized: map[string]bool{}, @@ -51,7 +54,7 @@ func (s *Access) DeployAuthorizedKey(ctx context.Context, sshKey UserKey) error if s.alreadyAuthorized(sshKey) { return nil } - log.Printf("Trying to deploy ssh key for %s\n", sshKey.Username) + s.log.With(zap.String("username", sshKey.Username)).Infof("Trying to deploy ssh key for user") user, err := s.userManager.EnsureLinuxUserExists(ctx, sshKey.Username) if err != nil { return err @@ -87,6 +90,6 @@ func (s *Access) DeployAuthorizedKey(ctx context.Context, sshKey UserKey) error return err } s.rememberAuthorized(sshKey) - log.Printf("Successfully authorized %s\n", sshKey.Username) + s.log.With(zap.String("username", sshKey.Username)).Infof("Successfully authorized user") return nil } diff --git a/internal/deploy/ssh/ssh_test.go b/internal/deploy/ssh/ssh_test.go index def191045..f8220c898 100644 --- a/internal/deploy/ssh/ssh_test.go +++ b/internal/deploy/ssh/ssh_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/edgelesssys/constellation/internal/deploy/user" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -66,6 +67,7 @@ func TestDeploySSHAuthorizedKey(t *testing.T) { authorized["user:ssh-rsa testkey"] = true } sshAccess := Access{ + log: logger.NewTest(t), userManager: userManager, mux: sync.Mutex{}, authorized: authorized, diff --git a/internal/grpc/grpc_klog/grpc_klog.go b/internal/grpc/grpc_klog/grpc_klog.go deleted file mode 100644 index 5a3ffdc1f..000000000 --- a/internal/grpc/grpc_klog/grpc_klog.go +++ /dev/null @@ -1,31 +0,0 @@ -// grpc_klog provides a logging interceptor for the klog logger. -package grpc_klog - -import ( - "context" - - "google.golang.org/grpc" - "google.golang.org/grpc/peer" - "k8s.io/klog/v2" -) - -// LogGRPC writes a log with the name of every gRPC call or error it receives. -// Request parameters or responses are NOT logged. -func LogGRPC(level klog.Level) func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - // log the requests method name - var addr string - peer, ok := peer.FromContext(ctx) - if ok { - addr = peer.Addr.String() - } - klog.V(level).Infof("GRPC call from peer: %q: %s", addr, info.FullMethod) - - // log errors, if any - resp, err := handler(ctx, req) - if err != nil { - klog.Errorf("GRPC error: %v", err) - } - return resp, err - } -} diff --git a/internal/grpc/grpclog/grplog.go b/internal/grpc/grpclog/grplog.go new file mode 100644 index 000000000..2e1601ca4 --- /dev/null +++ b/internal/grpc/grpclog/grplog.go @@ -0,0 +1,17 @@ +// grpclog provides a logging utilities for gRPC. +package grpclog + +import ( + "context" + + "google.golang.org/grpc/peer" +) + +// PeerAddrFromContext returns a peer's address from context, or "unknown" if not found. +func PeerAddrFromContext(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "unknown" + } + return p.Addr.String() +} diff --git a/internal/logger/log.go b/internal/logger/log.go index 5b7f67a7c..a8a9847d2 100644 --- a/internal/logger/log.go +++ b/internal/logger/log.go @@ -33,13 +33,16 @@ Use Fatalf() to log information about any errors that occurred and then exit the package logger import ( + "fmt" "os" + "testing" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" "google.golang.org/grpc" ) @@ -85,6 +88,13 @@ func New(logType LogType, logLevel zapcore.Level) *Logger { return &Logger{logger: logger.Sugar()} } +// NewTestLogger creates a logger for unit / integration tests. +func NewTest(t *testing.T) *Logger { + return &Logger{ + logger: zaptest.NewLogger(t).Sugar().Named(fmt.Sprintf("%q", t.Name())), + } +} + // Debugf logs a message at Debug level. // Debug logs are typically voluminous, and contain detailed information on the flow of execution. func (l *Logger) Debugf(format string, args ...any) { @@ -123,7 +133,7 @@ func (l *Logger) Sync() { // WithIncreasedLevel returns a logger with increased logging level. func (l *Logger) WithIncreasedLevel(level zapcore.Level) *Logger { - return &Logger{logger: l.GetZapLogger().WithOptions(zap.IncreaseLevel(level)).Sugar()} + return &Logger{logger: l.getZapLogger().WithOptions(zap.IncreaseLevel(level)).Sugar()} } // With returns a logger with structured context. @@ -136,16 +146,16 @@ func (l *Logger) Named(name string) *Logger { return &Logger{logger: l.logger.Named(name)} } -// GetZapLogger returns the underlying zap logger. -func (l *Logger) GetZapLogger() *zap.Logger { - return l.logger.Desugar() +// ReplaceGRPCLogger replaces grpc's internal logger with the given logger. +func (l *Logger) ReplaceGRPCLogger() { + grpc_zap.ReplaceGrpcLoggerV2(l.logger.Desugar()) } // GetServerUnaryInterceptor returns a gRPC server option for intercepting unary gRPC logs. func (l *Logger) GetServerUnaryInterceptor() grpc.ServerOption { return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(), - grpc_zap.UnaryServerInterceptor(l.GetZapLogger()), + grpc_zap.UnaryServerInterceptor(l.getZapLogger()), )) } @@ -153,20 +163,25 @@ func (l *Logger) GetServerUnaryInterceptor() grpc.ServerOption { func (l *Logger) GetServerStreamInterceptor() grpc.ServerOption { return grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(), - grpc_zap.StreamServerInterceptor(l.GetZapLogger()), + grpc_zap.StreamServerInterceptor(l.getZapLogger()), )) } // GetClientUnaryInterceptor returns a gRPC client option for intercepting unary gRPC logs. func (l *Logger) GetClientUnaryInterceptor() grpc.DialOption { return grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - grpc_zap.UnaryClientInterceptor(l.GetZapLogger()), + grpc_zap.UnaryClientInterceptor(l.getZapLogger()), )) } // GetClientStreamInterceptor returns a gRPC client option for intercepting stream gRPC logs. func (l *Logger) GetClientStreamInterceptor() grpc.DialOption { return grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_zap.StreamClientInterceptor(l.GetZapLogger()), + grpc_zap.StreamClientInterceptor(l.getZapLogger()), )) } + +// getZapLogger returns the underlying zap logger. +func (l *Logger) getZapLogger() *zap.Logger { + return l.logger.Desugar() +} diff --git a/kms/server/cmd/main.go b/kms/server/cmd/main.go index 3b7822e7b..12b3ca0b7 100644 --- a/kms/server/cmd/main.go +++ b/kms/server/cmd/main.go @@ -9,13 +9,13 @@ import ( "github.com/edgelesssys/constellation/internal/constants" "github.com/edgelesssys/constellation/internal/file" - "github.com/edgelesssys/constellation/internal/grpc/grpc_klog" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/kms/server/kmsapi" "github.com/edgelesssys/constellation/kms/server/kmsapi/kmsproto" "github.com/edgelesssys/constellation/kms/server/setup" "github.com/spf13/afero" "go.uber.org/zap" - "k8s.io/klog/v2" + "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) @@ -24,43 +24,42 @@ func main() { port := flag.String("port", "9000", "Port gRPC server listens on") masterSecretPath := flag.String("master-secret", "/constellation/constellation-mastersecret.base64", "Path to the Constellation master secret") - klog.InitFlags(nil) flag.Parse() - defer klog.Flush() - klog.V(2).Infof("\nConstellation Key Management Service\nVersion: %s", constants.VersionInfo) + log := logger.New(logger.JSONLog, zapcore.InfoLevel) + + log.With(zap.String("version", constants.VersionInfo)).Infof("Constellation Key Management Service") masterKey, err := readMainSecret(*masterSecretPath) if err != nil { - klog.Exitf("Failed to read master secret: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to read master secret") } conKMS, err := setup.SetUpKMS(context.Background(), setup.NoStoreURI, setup.ClusterKMSURI) if err != nil { - klog.Exitf("Failed to setup KMS: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to setup KMS") } if err := conKMS.CreateKEK(context.Background(), "Constellation", masterKey); err != nil { - klog.Exitf("Failed to create KMS KEK from MasterKey: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to create KMS KEK from MasterKey") } lis, err := net.Listen("tcp", net.JoinHostPort("", *port)) if err != nil { - klog.Exitf("Failed to listen: %v", err) + log.With(zap.Error(err)).Fatalf("Failed to listen") } - srv := kmsapi.New(&zap.Logger{}, conKMS) + srv := kmsapi.New(log.Named("server"), conKMS) + log.Named("gRPC").WithIncreasedLevel(zapcore.WarnLevel).ReplaceGRPCLogger() // TODO: Launch server with aTLS to allow attestation for clients. - grpcServer := grpc.NewServer( - grpc.UnaryInterceptor(grpc_klog.LogGRPC(2)), - ) + grpcServer := grpc.NewServer(log.Named("gRPC").GetServerUnaryInterceptor()) kmsproto.RegisterAPIServer(grpcServer, srv) - klog.V(2).Infof("Starting key management service on %s", lis.Addr().String()) + log.Infof("Starting key management service on %s", lis.Addr().String()) if err := grpcServer.Serve(lis); err != nil { - klog.Exitf("Failed to serve: %s", err) + log.With(zap.Error(err)).Fatalf("Failed to serve") } } diff --git a/kms/server/kmsapi/kmsapi.go b/kms/server/kmsapi/kmsapi.go index 048f92c5e..21c53dbc1 100644 --- a/kms/server/kmsapi/kmsapi.go +++ b/kms/server/kmsapi/kmsapi.go @@ -4,46 +4,49 @@ package kmsapi import ( "context" + "github.com/edgelesssys/constellation/internal/grpc/grpclog" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/kms/kms" "github.com/edgelesssys/constellation/kms/server/kmsapi/kmsproto" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/klog/v2" ) // API resembles an encryption key management api server through logger, CloudKMS and proto-unimplemented server. type API struct { - logger *zap.Logger + log *logger.Logger conKMS kms.CloudKMS kmsproto.UnimplementedAPIServer } // New creates a new API. -func New(logger *zap.Logger, conKMS kms.CloudKMS) *API { +func New(log *logger.Logger, conKMS kms.CloudKMS) *API { return &API{ - logger: logger, + log: log, conKMS: conKMS, } } // GetDataKey returns a data key. func (a *API) GetDataKey(ctx context.Context, in *kmsproto.GetDataKeyRequest) (*kmsproto.GetDataKeyResponse, error) { + log := a.log.With("peerAddress", grpclog.PeerAddrFromContext(ctx)) + // Error on 0 key length if in.Length == 0 { - klog.Error("GetDataKey: requested key length is zero") + log.Errorf("Requested key length is zero") return nil, status.Error(codes.InvalidArgument, "can't derive key with length zero") } // Error on empty DataKeyId if in.DataKeyId == "" { - klog.Error("GetDataKey: no data key ID specified") + log.Errorf("No data key ID specified") return nil, status.Error(codes.InvalidArgument, "no data key ID specified") } key, err := a.conKMS.GetDEK(ctx, "Constellation", "key-"+in.DataKeyId, int(in.Length)) if err != nil { - klog.Errorf("GetDataKey: failed to get data key: %v", err) + log.With(zap.Error(err)).Errorf("Failed to get data key") return nil, status.Errorf(codes.Internal, "%v", err) } return &kmsproto.GetDataKeyResponse{DataKey: key}, nil diff --git a/kms/server/kmsapi/kmsapi_test.go b/kms/server/kmsapi/kmsapi_test.go index 145b92f97..d5d9d6ea2 100644 --- a/kms/server/kmsapi/kmsapi_test.go +++ b/kms/server/kmsapi/kmsapi_test.go @@ -5,18 +5,20 @@ import ( "errors" "testing" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/kms/server/kmsapi/kmsproto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" ) func TestGetDataKey(t *testing.T) { assert := assert.New(t) require := require.New(t) + log := logger.NewTest(t) + kms := &stubKMS{derivedKey: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5}} - api := New(zaptest.NewLogger(t), kms) + api := New(log, kms) res, err := api.GetDataKey(context.Background(), &kmsproto.GetDataKeyRequest{DataKeyId: "1", Length: 32}) require.NoError(err) @@ -33,7 +35,7 @@ func TestGetDataKey(t *testing.T) { assert.Nil(res) // Test derive key error - api = New(zaptest.NewLogger(t), &stubKMS{deriveKeyErr: errors.New("error")}) + api = New(log, &stubKMS{deriveKeyErr: errors.New("error")}) res, err = api.GetDataKey(context.Background(), &kmsproto.GetDataKeyRequest{DataKeyId: "1", Length: 32}) assert.Error(err) assert.Nil(res) diff --git a/state/cmd/main.go b/state/cmd/main.go index 326878758..887a8ee1e 100644 --- a/state/cmd/main.go +++ b/state/cmd/main.go @@ -4,8 +4,6 @@ import ( "context" "flag" "fmt" - "log" - "os" "path/filepath" "strings" "time" @@ -17,10 +15,14 @@ import ( "github.com/edgelesssys/constellation/internal/attestation/gcp" "github.com/edgelesssys/constellation/internal/attestation/qemu" "github.com/edgelesssys/constellation/internal/attestation/vtpm" + "github.com/edgelesssys/constellation/internal/constants" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/state/keyservice" "github.com/edgelesssys/constellation/state/mapper" "github.com/edgelesssys/constellation/state/setup" "github.com/spf13/afero" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( @@ -34,7 +36,9 @@ var csp = flag.String("csp", "", "Cloud Service Provider the image is running on func main() { flag.Parse() - log.Printf("Starting disk-mapper for csp %q\n", *csp) + log := logger.New(logger.JSONLog, zapcore.InfoLevel) + log.With(zap.String("version", constants.VersionInfo), zap.String("cloudProvider", *csp)). + Infof("Starting disk-mapper") // set up metadata API and quote issuer for aTLS connections var err error @@ -47,7 +51,7 @@ func main() { diskPath, diskPathErr = filepath.EvalSymlinks(azureStateDiskPath) metadata, err = azurecloud.NewMetadata(context.Background()) if err != nil { - exit(err) + log.With(zap.Error).Fatalf("Failed to create Azure metadata API") } issuer = azure.NewIssuer() @@ -56,34 +60,35 @@ func main() { issuer = gcp.NewIssuer() gcpClient, err := gcpcloud.NewClient(context.Background()) if err != nil { - exit(err) + log.With(zap.Error).Fatalf("Failed to create GCP client") } metadata = gcpcloud.New(gcpClient) case "qemu": diskPath = qemuStateDiskPath issuer = qemu.NewIssuer() - fmt.Fprintf(os.Stderr, "warning: cloud services are not supported for csp %q\n", *csp) + log.Warnf("cloud services are not supported on QEMU") metadata = &core.ProviderMetadataFake{} default: diskPathErr = fmt.Errorf("csp %q is not supported by Constellation", *csp) } if diskPathErr != nil { - exit(fmt.Errorf("unable to determine state disk path: %w", diskPathErr)) + log.With(zap.Error(diskPathErr)).Fatalf("Unable to determine state disk path") } // initialize device mapper mapper, err := mapper.New(diskPath) if err != nil { - exit(err) + log.With(zap.Error(err)).Fatalf("Failed to initialize device mapper") } defer mapper.Close() setupManger := setup.New( + log.Named("setupManager"), *csp, afero.Afero{Fs: afero.NewOsFs()}, - keyservice.New(issuer, metadata, 20*time.Second), // try to request a key every 20 seconds + keyservice.New(log.Named("keyService"), issuer, metadata, 20*time.Second), // try to request a key every 20 seconds mapper, setup.DiskMounter{}, vtpm.OpenVTPM, @@ -95,13 +100,7 @@ func main() { } else { err = setupManger.PrepareNewDisk() } - exit(err) -} - -func exit(err error) { if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) + log.With(zap.Error(err)).Fatalf("Failed to prepare state disk") } - os.Exit(0) } diff --git a/state/keyservice/keyservice.go b/state/keyservice/keyservice.go index 6dbcbb7a4..533f9d9c1 100644 --- a/state/keyservice/keyservice.go +++ b/state/keyservice/keyservice.go @@ -3,7 +3,6 @@ package keyservice import ( "context" "errors" - "log" "net" "sync" "time" @@ -12,7 +11,9 @@ import ( "github.com/edgelesssys/constellation/coordinator/core" "github.com/edgelesssys/constellation/coordinator/pubapi/pubproto" "github.com/edgelesssys/constellation/internal/grpc/atlscredentials" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/state/keyservice/keyproto" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -21,6 +22,7 @@ import ( // KeyAPI is the interface called by the Coordinator or an admin during restart of a node. type KeyAPI struct { + log *logger.Logger mux sync.Mutex metadata core.ProviderMetadata issuer core.QuoteIssuer @@ -31,8 +33,9 @@ type KeyAPI struct { } // New initializes a KeyAPI with the given parameters. -func New(issuer core.QuoteIssuer, metadata core.ProviderMetadata, timeout time.Duration) *KeyAPI { +func New(log *logger.Logger, issuer core.QuoteIssuer, metadata core.ProviderMetadata, timeout time.Duration) *KeyAPI { return &KeyAPI{ + log: log, metadata: metadata, issuer: issuer, keyReceived: make(chan struct{}, 1), @@ -71,7 +74,7 @@ func (a *KeyAPI) WaitForDecryptionKey(uuid, listenAddr string) ([]byte, error) { } defer listener.Close() - log.Printf("Waiting for decryption key. Listening on: %s", listener.Addr().String()) + a.log.Infof("Waiting for decryption key. Listening on: %s", listener.Addr().String()) go server.Serve(listener) defer server.GracefulStop() @@ -118,7 +121,7 @@ func (a *KeyAPI) requestKey(uuid string, credentials credentials.TransportCreden // list available Coordinators endpoints, _ := core.CoordinatorEndpoints(context.Background(), a.metadata) - log.Printf("Sending a key request to available Coordinators: %v", endpoints) + a.log.With(zap.Strings("endpoints", endpoints)).Infof("Sending a key request to available Coordinators") // notify all available Coordinators to send a key to the node // any errors encountered here will be ignored, and the calls retried after a timeout for _, endpoint := range endpoints { diff --git a/state/keyservice/keyservice_test.go b/state/keyservice/keyservice_test.go index a9541e5ab..be785389d 100644 --- a/state/keyservice/keyservice_test.go +++ b/state/keyservice/keyservice_test.go @@ -12,6 +12,7 @@ import ( "github.com/edgelesssys/constellation/coordinator/role" "github.com/edgelesssys/constellation/internal/atls" "github.com/edgelesssys/constellation/internal/grpc/atlscredentials" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/internal/oid" "github.com/edgelesssys/constellation/state/keyservice/keyproto" "github.com/stretchr/testify/assert" @@ -85,6 +86,7 @@ func TestRequestKeyLoop(t *testing.T) { } keyWaiter := &KeyAPI{ + log: logger.NewTest(t), metadata: stubMetadata{listResponse: tc.listResponse}, keyReceived: keyReceived, timeout: 500 * time.Millisecond, @@ -138,6 +140,7 @@ func TestPushStateDiskKey(t *testing.T) { t.Run(name, func(t *testing.T) { assert := assert.New(t) + tc.testAPI.log = logger.NewTest(t) _, err := tc.testAPI.PushStateDiskKey(context.Background(), tc.request) if tc.wantErr { assert.Error(err) @@ -150,7 +153,7 @@ func TestPushStateDiskKey(t *testing.T) { } func TestResetKey(t *testing.T) { - api := New(nil, nil, time.Second) + api := New(logger.NewTest(t), nil, nil, time.Second) api.key = []byte{0x1, 0x2, 0x3} api.ResetKey() diff --git a/state/setup/setup.go b/state/setup/setup.go index 373be1d00..36dafc225 100644 --- a/state/setup/setup.go +++ b/state/setup/setup.go @@ -3,7 +3,6 @@ package setup import ( "crypto/rand" "errors" - "log" "net" "os" "path/filepath" @@ -13,7 +12,9 @@ import ( "github.com/edgelesssys/constellation/coordinator/nodestate" "github.com/edgelesssys/constellation/internal/attestation/vtpm" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" + "go.uber.org/zap" ) const ( @@ -27,6 +28,7 @@ const ( // SetupManager handles formating, mapping, mounting and unmounting of state disks. type SetupManager struct { + log *logger.Logger csp string fs afero.Afero keyWaiter KeyWaiter @@ -36,8 +38,9 @@ type SetupManager struct { } // New initializes a SetupManager with the given parameters. -func New(csp string, fs afero.Afero, keyWaiter KeyWaiter, mapper DeviceMapper, mounter Mounter, openTPM vtpm.TPMOpenFunc) *SetupManager { +func New(log *logger.Logger, csp string, fs afero.Afero, keyWaiter KeyWaiter, mapper DeviceMapper, mounter Mounter, openTPM vtpm.TPMOpenFunc) *SetupManager { return &SetupManager{ + log: log, csp: csp, fs: fs, keyWaiter: keyWaiter, @@ -50,7 +53,7 @@ func New(csp string, fs afero.Afero, keyWaiter KeyWaiter, mapper DeviceMapper, m // PrepareExistingDisk requests and waits for a decryption key to remap the encrypted state disk. // Once the disk is mapped, the function taints the node as initialized by updating it's PCRs. func (s *SetupManager) PrepareExistingDisk() error { - log.Println("Preparing existing state disk") + s.log.Infof("Preparing existing state disk") uuid := s.mapper.DiskUUID() getKey: @@ -61,6 +64,7 @@ getKey: if err := s.mapper.MapDisk(stateDiskMappedName, string(passphrase)); err != nil { // retry key fetching if disk mapping fails + s.log.With(zap.Error(err)).Errorf("Failed to map state disk, retrying...") s.keyWaiter.ResetKey() goto getKey } @@ -88,7 +92,7 @@ getKey: // PrepareNewDisk prepares an instances state disk by formatting the disk as a LUKS device using a random passphrase. func (s *SetupManager) PrepareNewDisk() error { - log.Println("Preparing new state disk") + s.log.Infof("Preparing new state disk") // generate and save temporary passphrase if err := s.fs.MkdirAll(keyPath, os.ModePerm); err != nil { diff --git a/state/setup/setup_test.go b/state/setup/setup_test.go index 5501c6eb7..d5c39af94 100644 --- a/state/setup/setup_test.go +++ b/state/setup/setup_test.go @@ -11,6 +11,7 @@ import ( "github.com/edgelesssys/constellation/coordinator/nodestate" "github.com/edgelesssys/constellation/internal/attestation/vtpm" "github.com/edgelesssys/constellation/internal/file" + "github.com/edgelesssys/constellation/internal/logger" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -107,7 +108,15 @@ func TestPrepareExistingDisk(t *testing.T) { require.NoError(t, handler.WriteJSON(stateInfoPath, nodestate.NodeState{OwnerID: []byte("ownerID"), ClusterID: []byte("clusterID")}, file.OptMkdirAll)) } - setupManager := New("test", tc.fs, tc.keyWaiter, tc.mapper, tc.mounter, tc.openTPM) + setupManager := New( + logger.NewTest(t), + "test", + tc.fs, + tc.keyWaiter, + tc.mapper, + tc.mounter, + tc.openTPM, + ) err := setupManager.PrepareExistingDisk() if tc.wantErr { @@ -167,7 +176,7 @@ func TestPrepareNewDisk(t *testing.T) { t.Run(name, func(t *testing.T) { assert := assert.New(t) - setupManager := New("test", tc.fs, nil, tc.mapper, nil, nil) + setupManager := New(logger.NewTest(t), "test", tc.fs, nil, tc.mapper, nil, nil) err := setupManager.PrepareNewDisk() if tc.wantErr { @@ -233,7 +242,7 @@ func TestReadInitSecrets(t *testing.T) { require.NoError(handler.WriteJSON("/tmp/test-state.json", state, file.OptMkdirAll)) } - setupManager := New("test", tc.fs, nil, nil, nil, nil) + setupManager := New(logger.NewTest(t), "test", tc.fs, nil, nil, nil, nil) ownerID, clusterID, err := setupManager.readInitSecrets("/tmp/test-state.json") if tc.wantErr { diff --git a/state/test/integration_test.go b/state/test/integration_test.go index 55b78550e..6f9ae88d5 100644 --- a/state/test/integration_test.go +++ b/state/test/integration_test.go @@ -14,6 +14,7 @@ import ( "github.com/edgelesssys/constellation/coordinator/core" "github.com/edgelesssys/constellation/internal/atls" "github.com/edgelesssys/constellation/internal/grpc/atlscredentials" + "github.com/edgelesssys/constellation/internal/logger" "github.com/edgelesssys/constellation/internal/oid" "github.com/edgelesssys/constellation/state/keyservice" "github.com/edgelesssys/constellation/state/keyservice/keyproto" @@ -85,7 +86,12 @@ func TestKeyAPI(t *testing.T) { apiAddr := listener.Addr().String() listener.Close() - api := keyservice.New(atls.NewFakeIssuer(oid.Dummy{}), &core.ProviderMetadataFake{}, 20*time.Second) + api := keyservice.New( + logger.NewTest(t), + atls.NewFakeIssuer(oid.Dummy{}), + &core.ProviderMetadataFake{}, + 20*time.Second, + ) // send a key to the server go func() {