Simplify node lock and various small changes

Co-authored-by: Fabian Kammel <fabian@kammel.dev>
Co-authored-by: Daniel Weiße <66256922+daniel-weisse@users.noreply.github.com>
Malte Poll 2022-07-14 15:45:04 +02:00 committed by Paul Meyer
parent 2bcf001d52
commit cce2611e2a
31 changed files with 530 additions and 229 deletions

View file

@@ -0,0 +1,64 @@
package clean
import (
"sync"
)
type cleaner struct {
stoppers []stopper
stopC chan struct{}
startOnce sync.Once
wg sync.WaitGroup
}
// New creates a new cleaner.
func New(stoppers ...stopper) *cleaner {
res := &cleaner{
stoppers: stoppers,
stopC: make(chan struct{}, 1),
}
res.wg.Add(1) // for the Start goroutine
return res
}
// With adds a new stopper to the cleaner.
func (c *cleaner) With(stopper stopper) *cleaner {
c.stoppers = append(c.stoppers, stopper)
return c
}
// Start blocks until it receives a stop message, initiates a graceful stop of all services, and returns.
func (c *cleaner) Start() {
c.startOnce.Do(func() {
defer c.wg.Done()
// wait for the stop message
<-c.stopC
c.wg.Add(len(c.stoppers))
for _, stopItem := range c.stoppers {
go func(stopItem stopper) {
defer c.wg.Done()
stopItem.Stop()
}(stopItem)
}
})
}
// Clean initiates the cleanup but does not wait for it to complete.
func (c *cleaner) Clean() {
// try to enqueue the stop message once
// if the channel is full, the message is dropped
select {
case c.stopC <- struct{}{}:
default:
}
}
// Done waits for the cleanup to complete.
func (c *cleaner) Done() {
c.wg.Wait()
}
type stopper interface {
Stop()
}
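
For orientation, a minimal usage sketch of the new cleaner API, assuming a hypothetical service type and import path (neither is part of this commit):

package main

import (
	"os"
	"os/signal"

	"example.com/bootstrapper/internal/clean" // illustrative import path, not from this commit
)

// service is a stand-in for anything that satisfies the stopper interface.
type service struct{}

func (s *service) Stop() { /* shut down gracefully */ }

func main() {
	cleaner := clean.New(&service{})
	go cleaner.Start() // blocks until Clean enqueues the stop message, then fans out Stop calls

	// Trigger cleanup on SIGINT; Clean is non-blocking and safe to call more than once.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	<-sigs

	cleaner.Clean()
	cleaner.Done() // returns once every registered stopper has finished
}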

View file

@@ -0,0 +1,98 @@
package clean
import (
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestNew(t *testing.T) {
assert := assert.New(t)
cleaner := New(&spyStopper{})
assert.NotNil(cleaner)
assert.NotEmpty(cleaner.stoppers)
}
func TestWith(t *testing.T) {
assert := assert.New(t)
cleaner := New().With(&spyStopper{})
assert.NotEmpty(cleaner.stoppers)
}
func TestClean(t *testing.T) {
assert := assert.New(t)
stopper := &spyStopper{}
cleaner := New(stopper)
go cleaner.Start()
cleaner.Clean()
cleaner.Done()
assert.Equal(int64(1), atomic.LoadInt64(&stopper.stopped))
// call again to make sure it doesn't panic, block, or clean up a second time
cleaner.Clean()
assert.Equal(int64(1), atomic.LoadInt64(&stopper.stopped))
}
func TestCleanBeforeStart(t *testing.T) {
assert := assert.New(t)
// calling Clean before Start should work
stopper := &spyStopper{}
cleaner := New(stopper)
cleaner.Clean()
cleaner.Start()
cleaner.Done()
assert.Equal(int64(1), atomic.LoadInt64(&stopper.stopped))
}
func TestConcurrent(t *testing.T) {
assert := assert.New(t)
// calling Clean concurrently should call Stop exactly once
stopper := &spyStopper{}
cleaner := New(stopper)
parallelism := 10
wg := sync.WaitGroup{}
start := func() {
defer wg.Done()
cleaner.Start()
}
clean := func() {
defer wg.Done()
cleaner.Clean()
}
done := func() {
defer wg.Done()
cleaner.Done()
}
wg.Add(3 * parallelism)
for i := 0; i < parallelism; i++ {
go start()
go clean()
go done()
}
wg.Wait()
cleaner.Done()
assert.Equal(int64(1), atomic.LoadInt64(&stopper.stopped))
}
type spyStopper struct {
stopped int64
}
func (s *spyStopper) Stop() {
atomic.AddInt64(&s.stopped, 1)
}

View file

@@ -1,50 +0,0 @@
package exit
import (
"sync"
)
type cleaner struct {
stoppers []stopper
cleanupDone bool
wg sync.WaitGroup
mux sync.Mutex
}
// New creates a new cleaner.
func New(stoppers ...stopper) *cleaner {
return &cleaner{
stoppers: stoppers,
}
}
// With adds a new stopper to the cleaner.
func (c *cleaner) With(stopper stopper) *cleaner {
c.stoppers = append(c.stoppers, stopper)
return c
}
// Clean stops all services gracefully.
func (c *cleaner) Clean() {
// only cleanup once
c.mux.Lock()
defer c.mux.Unlock()
if c.cleanupDone {
return
}
c.wg.Add(len(c.stoppers))
for _, stopItem := range c.stoppers {
go func(stopItem stopper) {
stopItem.Stop()
c.wg.Done()
}(stopItem)
}
c.wg.Wait()
c.cleanupDone = true
}
type stopper interface {
Stop()
}

View file

@@ -1,47 +0,0 @@
package exit
import (
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestNew(t *testing.T) {
assert := assert.New(t)
cleaner := New(&spyStopper{})
assert.NotNil(cleaner)
assert.NotEmpty(cleaner.stoppers)
}
func TestWith(t *testing.T) {
assert := assert.New(t)
cleaner := New().With(&spyStopper{})
assert.NotEmpty(cleaner.stoppers)
}
func TestClean(t *testing.T) {
assert := assert.New(t)
stopper := &spyStopper{}
cleaner := New(stopper)
cleaner.Clean()
assert.True(stopper.stopped)
// call again to make sure it doesn't panic or block
cleaner.Clean()
}
type spyStopper struct {
stopped bool
}
func (s *spyStopper) Stop() {
s.stopped = true
}

View file

@@ -35,6 +35,7 @@ type Server struct {
disk encryptedDisk
fileHandler file.Handler
grpcServer serveStopper
cleaner cleaner
logger *zap.Logger
@@ -70,18 +71,17 @@ func New(lock locker, kube ClusterInitializer, issuer atls.Issuer, fh file.Handl
// Serve starts the initialization server.
func (s *Server) Serve(ip, port string, cleaner cleaner) error {
s.cleaner = cleaner
lis, err := net.Listen("tcp", net.JoinHostPort(ip, port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
err = s.grpcServer.Serve(lis)
cleaner.Clean()
return err
return s.grpcServer.Serve(lis)
}
// Init initializes the cluster.
func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initproto.InitResponse, error) {
defer s.cleaner.Clean()
s.logger.Info("Init called")
id, err := s.deriveAttestationID(req.MasterSecret)
@@ -99,7 +99,6 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
// init does not make sense, so we just stop.
//
// The server stops itself after the current call is done.
go s.grpcServer.GracefulStop()
s.logger.Info("node is already in a join process")
return nil, status.Error(codes.FailedPrecondition, "node is already being activated")
}
@@ -137,7 +136,6 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
}
s.logger.Info("Init succeeded")
go s.grpcServer.GracefulStop()
return &initproto.InitResponse{
Kubeconfig: kubeconfig,
OwnerId: id.Owner,

View file

@@ -124,6 +124,7 @@ func TestInit(t *testing.T) {
fileHandler: tc.fileHandler,
logger: zaptest.NewLogger(t),
grpcServer: serveStopper,
cleaner: &fakeCleaner{serveStopper: serveStopper},
}
kubeconfig, err := server.Init(context.Background(), tc.req)
@@ -253,3 +254,11 @@ func newFakeLock() *fakeLock {
func (l *fakeLock) TryLockOnce(_, _ []byte) (bool, error) {
return l.state.TryLock(), nil
}
type fakeCleaner struct {
serveStopper
}
func (f *fakeCleaner) Clean() {
go f.serveStopper.GracefulStop() // this is not the correct way to do this, but it's fine for testing
}

View file

@@ -96,6 +96,7 @@ func (c *JoinClient) Start(cleaner cleaner) {
defer ticker.Stop()
defer func() { c.stopDone <- struct{}{} }()
defer c.log.Info("Client stopped")
defer cleaner.Clean()
diskUUID, err := c.getDiskUUID()
if err != nil {
@@ -124,7 +125,6 @@ func (c *JoinClient) Start(cleaner cleaner) {
err := c.tryJoinWithAvailableServices()
if err == nil {
c.log.Info("Joined successfully. Client is shut down.")
go cleaner.Clean()
return
} else if isUnrecoverable(err) {
c.log.Error("Unrecoverable error occurred", zap.Error(err))

View file

@@ -11,6 +11,9 @@ type gcpGuestAgentDaemonset struct {
DaemonSet apps.DaemonSet
}
// NewGCPGuestAgentDaemonset creates a new GCP Guest Agent Daemonset.
// The GCP guest agent is built in a separate repository: https://github.com/edgelesssys/gcp-guest-agent
// It is used to automatically add load balancer IPs to the local routing table of GCP instances.
func NewGCPGuestAgentDaemonset() *gcpGuestAgentDaemonset {
return &gcpGuestAgentDaemonset{
DaemonSet: apps.DaemonSet{
@@ -61,7 +64,7 @@ func NewGCPGuestAgentDaemonset() *gcpGuestAgentDaemonset {
Containers: []k8s.Container{
{
Name: "gcp-guest-agent",
Image: gcpGuestImage,
Image: gcpGuestImage, // built from https://github.com/edgelesssys/gcp-guest-agent
SecurityContext: &k8s.SecurityContext{
Privileged: func(b bool) *bool { return &b }(true),
Capabilities: &k8s.Capabilities{

View file

@@ -338,8 +338,8 @@ func manuallySetLoadbalancerIP(ctx context.Context, ip string) error {
if !strings.Contains(ip, "/") {
ip = ip + "/32"
}
args := fmt.Sprintf("route add to local %s scope host dev ens3 proto 66", ip)
_, err := exec.CommandContext(ctx, "ip", strings.Split(args, " ")...).Output()
args := []string{"route", "add", "to", "local", ip, "scope", "host", "dev", "ens3", "proto", "66"}
_, err := exec.CommandContext(ctx, "ip", args...).Output()
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {

View file

@@ -14,39 +14,23 @@ import (
// There is no way to unlock, so the state changes only once from unlock to
// locked.
type Lock struct {
tpm vtpm.TPMOpenFunc
locked bool
state *sync.Mutex
mux *sync.RWMutex
tpm vtpm.TPMOpenFunc
mux *sync.Mutex
}
// New creates a new NodeLock, which is unlocked.
func New(tpm vtpm.TPMOpenFunc) *Lock {
return &Lock{
tpm: tpm,
state: &sync.Mutex{},
mux: &sync.RWMutex{},
tpm: tpm,
mux: &sync.Mutex{},
}
}
// TryLockOnce tries to lock the node. If the node is already locked, it
// returns false. If the node is unlocked, it locks it and returns true.
func (l *Lock) TryLockOnce(ownerID, clusterID []byte) (bool, error) {
success := l.state.TryLock()
if success {
l.mux.Lock()
defer l.mux.Unlock()
l.locked = true
if err := vtpm.MarkNodeAsBootstrapped(l.tpm, ownerID, clusterID); err != nil {
return success, err
}
if !l.mux.TryLock() {
return false, nil
}
return success, nil
}
// Locked returns true if the node is locked.
func (l *Lock) Locked() bool {
l.mux.RLock()
defer l.mux.RUnlock()
return l.locked
return true, vtpm.MarkNodeAsBootstrapped(l.tpm, ownerID, clusterID)
}
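
For context, a hedged sketch of how the simplified lock is meant to be consumed. Because the sync.Mutex is never unlocked, TryLock (available since Go 1.18) succeeds exactly once, so only the first caller reaches MarkNodeAsBootstrapped; later callers get (false, nil) without touching the TPM. The import paths, the package name nodelock, and the bootstrap helper are illustrative assumptions, not part of this commit:

package example

import (
	"errors"
	"fmt"

	"example.com/internal/attestation/vtpm" // illustrative import paths
	"example.com/internal/nodelock"         // assumes the file above lives in a package named nodelock
)

// newNodeLock creates the node's single Lock; the same instance must be shared
// by every activation attempt, otherwise each attempt would see a fresh mutex.
func newNodeLock(openTPM vtpm.TPMOpenFunc) *nodelock.Lock {
	return nodelock.New(openTPM)
}

// bootstrap lets the first caller win: it locks the node and marks it as
// bootstrapped in the TPM; later callers get (false, nil) and are rejected
// without touching the TPM again.
func bootstrap(lock *nodelock.Lock, ownerID, clusterID []byte) error {
	free, err := lock.TryLockOnce(ownerID, clusterID)
	if err != nil {
		return fmt.Errorf("marking node as bootstrapped: %w", err)
	}
	if !free {
		return errors.New("node is already being activated")
	}
	return nil
}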