mirror of
https://github.com/edgelesssys/constellation.git
synced 2025-01-11 23:49:30 -05:00
8ef5ea2efe
* helm: rename CoreDNS configmap * upgrade-agent: ignore CoreDNS preflight errors * fixup! helm: rename CoreDNS configmap
169 lines
5.4 KiB
Go
169 lines
5.4 KiB
Go
/*
|
|
Copyright (c) Edgeless Systems GmbH
|
|
|
|
SPDX-License-Identifier: AGPL-3.0-only
|
|
*/
|
|
|
|
/*
|
|
Package server implements the gRPC server for the upgrade agent.
|
|
|
|
The server is responsible for using kubeadm to upgrade the Kubernetes
|
|
release of a Constellation node.
|
|
*/
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"log/slog"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
|
|
"github.com/edgelesssys/constellation/v2/internal/constants"
|
|
"github.com/edgelesssys/constellation/v2/internal/file"
|
|
"github.com/edgelesssys/constellation/v2/internal/installer"
|
|
"github.com/edgelesssys/constellation/v2/internal/logger"
|
|
"github.com/edgelesssys/constellation/v2/internal/versions/components"
|
|
"github.com/edgelesssys/constellation/v2/upgrade-agent/upgradeproto"
|
|
"golang.org/x/mod/semver"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// errInvalidKubernetesVersion is the sentinel error returned by verifyVersion
// when the requested version string fails validation. ExecuteUpdate matches it
// with errors.Is to choose the error message sent back to the caller.
var errInvalidKubernetesVersion = errors.New("invalid kubernetes version")
|
|
|
|
// Server is the upgrade-agent server.
|
|
type Server struct {
|
|
file file.Handler
|
|
grpcServer serveStopper
|
|
log *slog.Logger
|
|
upgradeproto.UnimplementedUpdateServer
|
|
}
|
|
|
|
// New creates a new upgrade-agent server.
|
|
func New(log *slog.Logger, fileHandler file.Handler) (*Server, error) {
|
|
log = log.WithGroup("upgradeServer")
|
|
|
|
server := &Server{
|
|
log: log,
|
|
file: fileHandler,
|
|
}
|
|
|
|
grpcServer := grpc.NewServer(
|
|
logger.GetServerUnaryInterceptor(logger.GRPCLogger(log)),
|
|
)
|
|
upgradeproto.RegisterUpdateServer(grpcServer, server)
|
|
|
|
server.grpcServer = grpcServer
|
|
return server, nil
|
|
}
|
|
|
|
// Run starts the upgrade-agent server on the given port, using the provided protocol and socket address.
|
|
func (s *Server) Run(protocol string, sockAddr string) error {
|
|
grpcServer := grpc.NewServer()
|
|
|
|
upgradeproto.RegisterUpdateServer(grpcServer, s)
|
|
|
|
cleanup := func() error {
|
|
err := os.RemoveAll(sockAddr)
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
if err := cleanup(); err != nil {
|
|
return fmt.Errorf("failed to clean socket file: %s", err)
|
|
}
|
|
|
|
lis, err := net.Listen(protocol, sockAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to listen: %s", err)
|
|
}
|
|
|
|
s.log.Info("Starting")
|
|
return grpcServer.Serve(lis)
|
|
}
|
|
|
|
// Stop stops the upgrade-agent server gracefully.
|
|
func (s *Server) Stop() {
|
|
s.log.Info("Stopping")
|
|
|
|
s.grpcServer.GracefulStop()
|
|
|
|
s.log.Info("Stopped")
|
|
}
|
|
|
|
// ExecuteUpdate installs & verifies the provided kubeadm, then executes `kubeadm upgrade plan` & `kubeadm upgrade apply {wanted_Kubernetes_Version}` to upgrade to the specified version.
|
|
func (s *Server) ExecuteUpdate(ctx context.Context, updateRequest *upgradeproto.ExecuteUpdateRequest) (*upgradeproto.ExecuteUpdateResponse, error) {
|
|
s.log.Info(fmt.Sprintf("Upgrade to Kubernetes version started: %s", updateRequest.WantedKubernetesVersion))
|
|
|
|
installer := installer.NewOSInstaller()
|
|
err := prepareUpdate(ctx, installer, updateRequest)
|
|
if errors.Is(err, errInvalidKubernetesVersion) {
|
|
return nil, status.Errorf(codes.Internal, "unable to verify the Kubernetes version %s: %s", updateRequest.WantedKubernetesVersion, err)
|
|
} else if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "unable to install the kubeadm binary: %s", err)
|
|
}
|
|
|
|
// CoreDNS addon status is checked even though we did not install it.
|
|
// TODO(burgerdev): Use kubeadm phases once supported: https://github.com/kubernetes/kubeadm/issues/1318.
|
|
commonArgs := []string{"--ignore-preflight-errors", "CoreDNSMigration,CoreDNSUnsupportedPlugins", updateRequest.WantedKubernetesVersion}
|
|
planArgs := append([]string{"upgrade", "plan"}, commonArgs...)
|
|
applyArgs := append([]string{"upgrade", "apply", "--yes", "--patches", constants.KubeadmPatchDir}, commonArgs...)
|
|
|
|
upgradeCmd := exec.CommandContext(ctx, "kubeadm", planArgs...)
|
|
if out, err := upgradeCmd.CombinedOutput(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "unable to execute kubeadm upgrade plan %s: %s: %s", updateRequest.WantedKubernetesVersion, err, string(out))
|
|
}
|
|
|
|
applyCmd := exec.CommandContext(ctx, "kubeadm", applyArgs...)
|
|
if out, err := applyCmd.CombinedOutput(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "unable to execute kubeadm upgrade apply: %s: %s", err, string(out))
|
|
}
|
|
|
|
s.log.Info(fmt.Sprintf("Upgrade to Kubernetes version succeeded: %s", updateRequest.WantedKubernetesVersion))
|
|
return &upgradeproto.ExecuteUpdateResponse{}, nil
|
|
}
|
|
|
|
// prepareUpdate downloads & installs the specified kubeadm version and verifies the desired Kubernetes version.
|
|
func prepareUpdate(ctx context.Context, installer osInstaller, updateRequest *upgradeproto.ExecuteUpdateRequest) error {
|
|
// verify Kubernetes version
|
|
err := verifyVersion(updateRequest.WantedKubernetesVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Download & install the Kubernetes components.
|
|
for _, c := range updateRequest.KubernetesComponents {
|
|
if err := installer.Install(ctx, c); err != nil {
|
|
return fmt.Errorf("installing Kubernetes component %q: %w", c.Url, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// verifyVersion verifies the provided Kubernetes version.
|
|
func verifyVersion(version string) error {
|
|
if !semver.IsValid(version) {
|
|
return errInvalidKubernetesVersion
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type osInstaller interface {
|
|
// Install downloads, installs and verifies the kubernetes component.
|
|
Install(ctx context.Context, kubernetesComponent *components.Component) error
|
|
}
|
|
|
|
// serveStopper is the subset of gRPC server behavior that Server relies on:
// serving on a listener and shutting down gracefully.
type serveStopper interface {
	// Serve starts serving on lis.
	Serve(lis net.Listener) error
	// GracefulStop stops the server, blocking until all requests are done.
	GracefulStop()
}
|