mirror of
https://github.com/edgelesssys/constellation.git
synced 2025-08-11 16:30:12 -04:00
Use multiple loadbalancers on GCP
This commit is contained in:
parent
c954ec089f
commit
a02a46e454
59 changed files with 1629 additions and 557 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"io/fs"
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/edgelesssys/constellation/debugd/bootstrapper"
|
||||
"github.com/edgelesssys/constellation/debugd/cdbg/config"
|
||||
|
@ -94,7 +95,7 @@ func deploy(cmd *cobra.Command, fileHandler file.Handler, constellationConfig *c
|
|||
|
||||
for _, ip := range ips {
|
||||
input := deployOnEndpointInput{
|
||||
debugdEndpoint: net.JoinHostPort(ip, debugd.DebugdPort),
|
||||
debugdEndpoint: net.JoinHostPort(ip, strconv.Itoa(constants.DebugdPort)),
|
||||
bootstrapperPath: debugConfig.ConstellationDebugConfig.BootstrapperPath,
|
||||
reader: reader,
|
||||
authorizedKeys: debugConfig.ConstellationDebugConfig.AuthorizedKeys,
|
||||
|
@ -194,13 +195,13 @@ func getIPsFromConfig(stat statec.ConstellationState, config configc.Config) ([]
|
|||
// add bootstrapper IP if it is not already in the list
|
||||
var foundBootstrapperIP bool
|
||||
for _, ip := range ips {
|
||||
if ip == stat.BootstrapperHost {
|
||||
if ip == stat.LoadBalancerIP {
|
||||
foundBootstrapperIP = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundBootstrapperIP && stat.BootstrapperHost != "" {
|
||||
ips = append(ips, stat.BootstrapperHost)
|
||||
if !foundBootstrapperIP && stat.LoadBalancerIP != "" {
|
||||
ips = append(ips, stat.LoadBalancerIP)
|
||||
}
|
||||
if len(ips) == 0 {
|
||||
return nil, fmt.Errorf("no public IPs found in statefile")
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
|
@ -16,9 +17,9 @@ import (
|
|||
platform "github.com/edgelesssys/constellation/internal/cloud/cloudprovider"
|
||||
"github.com/edgelesssys/constellation/internal/deploy/ssh"
|
||||
"github.com/edgelesssys/constellation/internal/deploy/user"
|
||||
"github.com/edgelesssys/constellation/internal/iproute"
|
||||
"github.com/edgelesssys/constellation/internal/logger"
|
||||
"github.com/spf13/afero"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const debugBanner = `
|
||||
|
@ -29,10 +30,9 @@ const debugBanner = `
|
|||
`
|
||||
|
||||
func main() {
|
||||
wg := &sync.WaitGroup{}
|
||||
verbosity := flag.Int("v", 0, logger.CmdLineVerbosityDescription)
|
||||
|
||||
flag.Parse()
|
||||
|
||||
log := logger.New(logger.JSONLog, logger.VerbosityFromInt(*verbosity))
|
||||
fs := afero.NewOsFs()
|
||||
streamer := bootstrapper.NewFileStreamer(fs)
|
||||
|
@ -49,15 +49,19 @@ func main() {
|
|||
case platform.Azure:
|
||||
azureFetcher, err := cloudprovider.NewAzure(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatalf("%s", err)
|
||||
}
|
||||
fetcher = azureFetcher
|
||||
case platform.GCP:
|
||||
gcpFetcher, err := cloudprovider.NewGCP(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatalf("%s", err)
|
||||
}
|
||||
fetcher = gcpFetcher
|
||||
if err := setLoadbalancerRoute(ctx, fetcher); err != nil {
|
||||
log.Errorf("adding load balancer IP to local routing table: %s", err)
|
||||
}
|
||||
log.Infof("Added load balancer IP to local routing table")
|
||||
case platform.QEMU:
|
||||
fetcher = cloudprovider.NewQEMU()
|
||||
default:
|
||||
|
@ -67,11 +71,13 @@ func main() {
|
|||
sched := metadata.NewScheduler(log.Named("scheduler"), fetcher, ssh, download)
|
||||
serv := server.New(log.Named("server"), ssh, serviceManager, streamer)
|
||||
if err := deploy.DeployDefaultServiceUnit(ctx, serviceManager); err != nil {
|
||||
panic(err)
|
||||
log.Fatalf("%s", err)
|
||||
}
|
||||
|
||||
writeDebugBanner(log)
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
go sched.Start(ctx, wg)
|
||||
wg.Add(1)
|
||||
|
@ -91,3 +97,11 @@ func writeDebugBanner(log *logger.Logger) {
|
|||
log.Infof("Unable to print to /dev/ttyS0: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func setLoadbalancerRoute(ctx context.Context, fetcher metadata.Fetcher) error {
|
||||
ip, err := fetcher.DiscoverLoadbalancerIP(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return iproute.AddToLocalRoutingTable(ctx, ip)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import "time"
|
|||
|
||||
const (
|
||||
DebugdMetadataFlag = "constellation-debugd"
|
||||
DebugdPort = "4000"
|
||||
GRPCTimeout = 5 * time.Minute
|
||||
SSHCheckInterval = 30 * time.Second
|
||||
DiscoverDebugdInterval = 30 * time.Second
|
||||
|
|
|
@ -4,11 +4,13 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/edgelesssys/constellation/debugd/bootstrapper"
|
||||
"github.com/edgelesssys/constellation/debugd/debugd"
|
||||
pb "github.com/edgelesssys/constellation/debugd/service"
|
||||
"github.com/edgelesssys/constellation/internal/constants"
|
||||
"github.com/edgelesssys/constellation/internal/logger"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -38,7 +40,8 @@ func New(log *logger.Logger, dialer NetDialer, serviceManager serviceManager, wr
|
|||
// DownloadBootstrapper will open a new grpc connection to another instance, attempting to download a bootstrapper from that instance.
|
||||
func (d *Download) DownloadBootstrapper(ctx context.Context, ip string) error {
|
||||
log := d.log.With(zap.String("ip", ip))
|
||||
serverAddr := net.JoinHostPort(ip, debugd.DebugdPort)
|
||||
serverAddr := net.JoinHostPort(ip, strconv.Itoa(constants.DebugdPort))
|
||||
|
||||
// only retry download from same endpoint after backoff
|
||||
if lastAttempt, ok := d.attemptedDownloads[serverAddr]; ok && time.Since(lastAttempt) < debugd.BootstrapperDownloadRetryBackoff {
|
||||
return fmt.Errorf("download failed too recently: %v / %v", time.Since(lastAttempt), debugd.BootstrapperDownloadRetryBackoff)
|
||||
|
|
|
@ -6,12 +6,14 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/edgelesssys/constellation/debugd/bootstrapper"
|
||||
"github.com/edgelesssys/constellation/debugd/debugd"
|
||||
pb "github.com/edgelesssys/constellation/debugd/service"
|
||||
"github.com/edgelesssys/constellation/internal/constants"
|
||||
"github.com/edgelesssys/constellation/internal/grpc/testdialer"
|
||||
"github.com/edgelesssys/constellation/internal/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -59,7 +61,7 @@ func TestDownloadBootstrapper(t *testing.T) {
|
|||
chunks: [][]byte{[]byte("test")},
|
||||
},
|
||||
attemptedDownloads: map[string]time.Time{
|
||||
"192.0.2.0:4000": time.Now(),
|
||||
"192.0.2.0:" + strconv.Itoa(constants.DebugdPort): time.Now(),
|
||||
},
|
||||
wantDownloadErr: true,
|
||||
},
|
||||
|
@ -98,7 +100,7 @@ func TestDownloadBootstrapper(t *testing.T) {
|
|||
|
||||
grpcServ := grpc.NewServer()
|
||||
pb.RegisterDebugdServer(grpcServ, &tc.server)
|
||||
lis := dialer.GetListener(net.JoinHostPort(ip, debugd.DebugdPort))
|
||||
lis := dialer.GetListener(net.JoinHostPort(ip, strconv.Itoa(constants.DebugdPort)))
|
||||
go grpcServ.Serve(lis)
|
||||
|
||||
download := &Download{
|
||||
|
|
|
@ -3,6 +3,7 @@ package cloudprovider
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
azurecloud "github.com/edgelesssys/constellation/bootstrapper/cloudprovider/azure"
|
||||
gcpcloud "github.com/edgelesssys/constellation/bootstrapper/cloudprovider/gcp"
|
||||
|
@ -16,6 +17,8 @@ type providerMetadata interface {
|
|||
List(ctx context.Context) ([]metadata.InstanceMetadata, error)
|
||||
// Self retrieves the current instance.
|
||||
Self(ctx context.Context) (metadata.InstanceMetadata, error)
|
||||
// GetLoadBalancerEndpoint returns the endpoint of the load balancer.
|
||||
GetLoadBalancerEndpoint(ctx context.Context) (string, error)
|
||||
}
|
||||
|
||||
// Fetcher checks the metadata service to search for instances that were set up for debugging and cloud provider specific SSH keys.
|
||||
|
@ -80,6 +83,25 @@ func (f *Fetcher) DiscoverDebugdIPs(ctx context.Context) ([]string, error) {
|
|||
return ips, nil
|
||||
}
|
||||
|
||||
func (f *Fetcher) DiscoverLoadbalancerIP(ctx context.Context) (string, error) {
|
||||
lbEndpoint, err := f.metaAPI.GetLoadBalancerEndpoint(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("retrieving load balancer endpoint: %w", err)
|
||||
}
|
||||
|
||||
// The port of the endpoint is not the port we need. We need to strip it off.
|
||||
//
|
||||
// TODO: Tag the specific load balancer we are looking for with a distinct tag.
|
||||
// Change the GetLoadBalancerEndpoint method to return the endpoint of a load
|
||||
// balancer with a given tag.
|
||||
lbIP, _, err := net.SplitHostPort(lbEndpoint)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("parsing load balancer endpoint: %w", err)
|
||||
}
|
||||
|
||||
return lbIP, nil
|
||||
}
|
||||
|
||||
// FetchSSHKeys will query the metadata of the current instance and deploys any SSH keys found.
|
||||
func (f *Fetcher) FetchSSHKeys(ctx context.Context) ([]ssh.UserKey, error) {
|
||||
self, err := f.metaAPI.Self(ctx)
|
||||
|
|
|
@ -73,6 +73,50 @@ func TestDiscoverDebugIPs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDiscoverLoadbalancerIP(t *testing.T) {
|
||||
ip := "192.0.2.1"
|
||||
endpoint := ip + ":1234"
|
||||
someErr := errors.New("failed")
|
||||
|
||||
testCases := map[string]struct {
|
||||
metaAPI providerMetadata
|
||||
wantIP string
|
||||
wantErr bool
|
||||
}{
|
||||
"discovery works": {
|
||||
metaAPI: &stubMetadata{getLBEndpointRes: endpoint},
|
||||
wantIP: ip,
|
||||
},
|
||||
"get endpoint fails": {
|
||||
metaAPI: &stubMetadata{getLBEndpointErr: someErr},
|
||||
wantErr: true,
|
||||
},
|
||||
"invalid endpoint": {
|
||||
metaAPI: &stubMetadata{getLBEndpointRes: "invalid"},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
fetcher := &Fetcher{
|
||||
metaAPI: tc.metaAPI,
|
||||
}
|
||||
|
||||
ip, err := fetcher.DiscoverLoadbalancerIP(context.Background())
|
||||
|
||||
if tc.wantErr {
|
||||
assert.Error(err)
|
||||
} else {
|
||||
assert.NoError(err)
|
||||
assert.Equal(tc.wantIP, ip)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchSSHKeys(t *testing.T) {
|
||||
err := errors.New("some err")
|
||||
|
||||
|
@ -125,13 +169,15 @@ func TestFetchSSHKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
type stubMetadata struct {
|
||||
listRes []metadata.InstanceMetadata
|
||||
listErr error
|
||||
selfRes metadata.InstanceMetadata
|
||||
selfErr error
|
||||
getInstanceRes metadata.InstanceMetadata
|
||||
getInstanceErr error
|
||||
supportedRes bool
|
||||
listRes []metadata.InstanceMetadata
|
||||
listErr error
|
||||
selfRes metadata.InstanceMetadata
|
||||
selfErr error
|
||||
getInstanceRes metadata.InstanceMetadata
|
||||
getInstanceErr error
|
||||
getLBEndpointRes string
|
||||
getLBEndpointErr error
|
||||
supportedRes bool
|
||||
}
|
||||
|
||||
func (m *stubMetadata) List(ctx context.Context) ([]metadata.InstanceMetadata, error) {
|
||||
|
@ -146,6 +192,10 @@ func (m *stubMetadata) GetInstance(ctx context.Context, providerID string) (meta
|
|||
return m.getInstanceRes, m.getInstanceErr
|
||||
}
|
||||
|
||||
func (m *stubMetadata) GetLoadBalancerEndpoint(ctx context.Context) (string, error) {
|
||||
return m.getLBEndpointRes, m.getLBEndpointErr
|
||||
}
|
||||
|
||||
func (m *stubMetadata) Supported() bool {
|
||||
return m.supportedRes
|
||||
}
|
||||
|
|
|
@ -14,6 +14,11 @@ func (f Fetcher) DiscoverDebugdIPs(ctx context.Context) ([]string, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (f Fetcher) DiscoverLoadbalancerIP(ctx context.Context) (string, error) {
|
||||
// Fallback fetcher does not try to discover loadbalancer IP
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (f Fetcher) FetchSSHKeys(ctx context.Context) ([]ssh.UserKey, error) {
|
||||
// Fallback fetcher does not try to fetch ssh keys
|
||||
return nil, nil
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
type Fetcher interface {
|
||||
DiscoverDebugdIPs(ctx context.Context) ([]string, error)
|
||||
FetchSSHKeys(ctx context.Context) ([]ssh.UserKey, error)
|
||||
DiscoverLoadbalancerIP(ctx context.Context) (string, error)
|
||||
}
|
||||
|
||||
// Scheduler schedules fetching of metadata using timers.
|
||||
|
|
|
@ -117,6 +117,10 @@ func (s *stubFetcher) FetchSSHKeys(ctx context.Context) ([]ssh.UserKey, error) {
|
|||
return s.keys, s.fetchSSHKeysErr
|
||||
}
|
||||
|
||||
func (s *stubFetcher) DiscoverLoadbalancerIP(ctx context.Context) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
type stubSSHDeployer struct {
|
||||
sshKeys []ssh.UserKey
|
||||
|
||||
|
|
|
@ -6,16 +6,20 @@ import (
|
|||
"fmt"
|
||||
"io/fs"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/edgelesssys/constellation/debugd/bootstrapper"
|
||||
"github.com/edgelesssys/constellation/debugd/debugd"
|
||||
"github.com/edgelesssys/constellation/debugd/debugd/deploy"
|
||||
pb "github.com/edgelesssys/constellation/debugd/service"
|
||||
"github.com/edgelesssys/constellation/internal/constants"
|
||||
"github.com/edgelesssys/constellation/internal/deploy/ssh"
|
||||
"github.com/edgelesssys/constellation/internal/logger"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
type debugdServer struct {
|
||||
|
@ -113,9 +117,13 @@ func Start(log *logger.Logger, wg *sync.WaitGroup, serv pb.DebugdServer) {
|
|||
grpcLog := log.Named("gRPC")
|
||||
grpcLog.WithIncreasedLevel(zap.WarnLevel).ReplaceGRPCLogger()
|
||||
|
||||
grpcServer := grpc.NewServer(grpcLog.GetServerStreamInterceptor(), grpcLog.GetServerUnaryInterceptor())
|
||||
grpcServer := grpc.NewServer(
|
||||
grpcLog.GetServerStreamInterceptor(),
|
||||
grpcLog.GetServerUnaryInterceptor(),
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{Time: 15 * time.Second}),
|
||||
)
|
||||
pb.RegisterDebugdServer(grpcServer, serv)
|
||||
lis, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", debugd.DebugdPort))
|
||||
lis, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", strconv.Itoa(constants.DebugdPort)))
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Fatalf("Listening failed")
|
||||
}
|
||||
|
|
|
@ -6,11 +6,13 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/edgelesssys/constellation/debugd/bootstrapper"
|
||||
"github.com/edgelesssys/constellation/debugd/debugd/deploy"
|
||||
pb "github.com/edgelesssys/constellation/debugd/service"
|
||||
"github.com/edgelesssys/constellation/internal/constants"
|
||||
"github.com/edgelesssys/constellation/internal/deploy/ssh"
|
||||
"github.com/edgelesssys/constellation/internal/grpc/testdialer"
|
||||
"github.com/edgelesssys/constellation/internal/logger"
|
||||
|
@ -26,7 +28,7 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func TestUploadAuthorizedKeys(t *testing.T) {
|
||||
endpoint := "192.0.2.1:4000"
|
||||
endpoint := "192.0.2.1:" + strconv.Itoa(constants.DebugdPort)
|
||||
|
||||
testCases := map[string]struct {
|
||||
ssh stubSSHDeployer
|
||||
|
@ -105,7 +107,7 @@ func TestUploadAuthorizedKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUploadBootstrapper(t *testing.T) {
|
||||
endpoint := "192.0.2.1:4000"
|
||||
endpoint := "192.0.2.1:" + strconv.Itoa(constants.DebugdPort)
|
||||
|
||||
testCases := map[string]struct {
|
||||
ssh stubSSHDeployer
|
||||
|
@ -190,7 +192,8 @@ func TestUploadBootstrapper(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestDownloadBootstrapper(t *testing.T) {
|
||||
endpoint := "192.0.2.1:4000"
|
||||
endpoint := "192.0.2.1:" + strconv.Itoa(constants.DebugdPort)
|
||||
|
||||
testCases := map[string]struct {
|
||||
ssh stubSSHDeployer
|
||||
serviceManager stubServiceManager
|
||||
|
@ -253,7 +256,8 @@ func TestDownloadBootstrapper(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUploadSystemServiceUnits(t *testing.T) {
|
||||
endpoint := "192.0.2.1:4000"
|
||||
endpoint := "192.0.2.1:" + strconv.Itoa(constants.DebugdPort)
|
||||
|
||||
testCases := map[string]struct {
|
||||
ssh stubSSHDeployer
|
||||
serviceManager stubServiceManager
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue