constellation/upgrade-agent/internal/server/server.go

169 lines
5.4 KiB
Go
Raw Normal View History

/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
/*
Package server implements the gRPC server for the upgrade agent.
The server is responsible for using kubeadm to upgrade the Kubernetes
release of a Constellation node.
*/
package server
import (
"context"
"errors"
"fmt"
"io/fs"
2024-02-08 09:20:01 -05:00
"log/slog"
"net"
"os"
"os/exec"
"github.com/edgelesssys/constellation/v2/internal/constants"
"github.com/edgelesssys/constellation/v2/internal/file"
"github.com/edgelesssys/constellation/v2/internal/installer"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/edgelesssys/constellation/v2/internal/versions/components"
"github.com/edgelesssys/constellation/v2/upgrade-agent/upgradeproto"
"golang.org/x/mod/semver"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// errInvalidKubernetesVersion is returned by verifyVersion when the requested
// Kubernetes version is not a valid semantic version. ExecuteUpdate matches on
// it with errors.Is to pick the error message sent back to the client.
var errInvalidKubernetesVersion = errors.New("invalid kubernetes version")
// Server is the upgrade-agent server.
type Server struct {
file file.Handler
grpcServer serveStopper
2024-02-08 09:20:01 -05:00
log *slog.Logger
upgradeproto.UnimplementedUpdateServer
}
// New creates a new upgrade-agent server.
2024-02-08 09:20:01 -05:00
func New(log *slog.Logger, fileHandler file.Handler) (*Server, error) {
log = log.WithGroup("upgradeServer")
server := &Server{
log: log,
file: fileHandler,
}
grpcServer := grpc.NewServer(
logger.GetServerUnaryInterceptor(logger.GRPCLogger(log)),
)
upgradeproto.RegisterUpdateServer(grpcServer, server)
server.grpcServer = grpcServer
return server, nil
}
// Run starts the upgrade-agent server on the given port, using the provided protocol and socket address.
func (s *Server) Run(protocol string, sockAddr string) error {
grpcServer := grpc.NewServer()
upgradeproto.RegisterUpdateServer(grpcServer, s)
cleanup := func() error {
err := os.RemoveAll(sockAddr)
if errors.Is(err, fs.ErrNotExist) {
return nil
} else if err != nil {
return err
}
return nil
}
if err := cleanup(); err != nil {
return fmt.Errorf("failed to clean socket file: %s", err)
}
lis, err := net.Listen(protocol, sockAddr)
if err != nil {
return fmt.Errorf("failed to listen: %s", err)
}
2024-02-08 09:20:01 -05:00
s.log.Info("Starting")
return grpcServer.Serve(lis)
}
// Stop stops the upgrade-agent server gracefully.
func (s *Server) Stop() {
2024-02-08 09:20:01 -05:00
s.log.Info("Stopping")
s.grpcServer.GracefulStop()
2024-02-08 09:20:01 -05:00
s.log.Info("Stopped")
}
// ExecuteUpdate installs & verifies the provided kubeadm, then executes `kubeadm upgrade plan` & `kubeadm upgrade apply {wanted_Kubernetes_Version}` to upgrade to the specified version.
func (s *Server) ExecuteUpdate(ctx context.Context, updateRequest *upgradeproto.ExecuteUpdateRequest) (*upgradeproto.ExecuteUpdateResponse, error) {
2024-02-08 09:20:01 -05:00
s.log.Info(fmt.Sprintf("Upgrade to Kubernetes version started: %s", updateRequest.WantedKubernetesVersion))
installer := installer.NewOSInstaller()
err := prepareUpdate(ctx, installer, updateRequest)
if errors.Is(err, errInvalidKubernetesVersion) {
2023-01-06 06:05:47 -05:00
return nil, status.Errorf(codes.Internal, "unable to verify the Kubernetes version %s: %s", updateRequest.WantedKubernetesVersion, err)
} else if err != nil {
2023-01-06 06:05:47 -05:00
return nil, status.Errorf(codes.Internal, "unable to install the kubeadm binary: %s", err)
}
// CoreDNS addon status is checked even though we did not install it.
// TODO(burgerdev): Use kubeadm phases once supported: https://github.com/kubernetes/kubeadm/issues/1318.
commonArgs := []string{"--ignore-preflight-errors", "CoreDNSMigration,CoreDNSUnsupportedPlugins", updateRequest.WantedKubernetesVersion}
planArgs := append([]string{"upgrade", "plan"}, commonArgs...)
applyArgs := append([]string{"upgrade", "apply", "--yes", "--patches", constants.KubeadmPatchDir}, commonArgs...)
upgradeCmd := exec.CommandContext(ctx, "kubeadm", planArgs...)
2023-01-06 06:05:47 -05:00
if out, err := upgradeCmd.CombinedOutput(); err != nil {
return nil, status.Errorf(codes.Internal, "unable to execute kubeadm upgrade plan %s: %s: %s", updateRequest.WantedKubernetesVersion, err, string(out))
}
applyCmd := exec.CommandContext(ctx, "kubeadm", applyArgs...)
2023-01-06 06:05:47 -05:00
if out, err := applyCmd.CombinedOutput(); err != nil {
return nil, status.Errorf(codes.Internal, "unable to execute kubeadm upgrade apply: %s: %s", err, string(out))
}
2024-02-08 09:20:01 -05:00
s.log.Info(fmt.Sprintf("Upgrade to Kubernetes version succeeded: %s", updateRequest.WantedKubernetesVersion))
return &upgradeproto.ExecuteUpdateResponse{}, nil
}
// prepareUpdate downloads & installs the specified kubeadm version and verifies the desired Kubernetes version.
func prepareUpdate(ctx context.Context, installer osInstaller, updateRequest *upgradeproto.ExecuteUpdateRequest) error {
// verify Kubernetes version
err := verifyVersion(updateRequest.WantedKubernetesVersion)
if err != nil {
return err
}
// Download & install the Kubernetes components.
2024-01-29 15:32:37 -05:00
for _, c := range updateRequest.KubernetesComponents {
if err := installer.Install(ctx, c); err != nil {
return fmt.Errorf("installing Kubernetes component %q: %w", c.Url, err)
}
}
return nil
}
// verifyVersion checks that version is a valid semantic version according to
// semver.IsValid. It returns nil if so, and errInvalidKubernetesVersion
// otherwise.
//
// NOTE(review): golang.org/x/mod/semver is understood to require a leading
// "v" (e.g. "v1.28.0") — confirm that callers send the version in that form.
func verifyVersion(version string) error {
	if semver.IsValid(version) {
		return nil
	}
	return errInvalidKubernetesVersion
}
// osInstaller is the installation dependency of prepareUpdate, declared as an
// interface so that a different implementation can be substituted.
type osInstaller interface {
	// Install downloads, installs and verifies the given Kubernetes component.
	Install(ctx context.Context, component *components.Component) error
}
// serveStopper is the part of a gRPC server that Server relies on: serving on
// a listener and shutting down gracefully.
type serveStopper interface {
	// GracefulStop stops the server and blocks until all requests are done.
	GracefulStop()
	// Serve starts the server on the given listener.
	Serve(listener net.Listener) error
}