bazel: deps mirror (#1522)

bazel-deps-mirror is an internal tools used to upload external dependencies
that are referenced in the Bazel WORKSPACE to the Edgeless Systems' mirror.

It also normalizes deps rules.

* hack: add tool to mirror Bazel dependencies
* hack: bazel-deps-mirror tests
* bazel: add deps mirror commands
* ci: upload Bazel dependencies on renovate PRs
* update go mod
* run deps_mirror_upload


Signed-off-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>
Co-authored-by: Paul Meyer <49727155+katexochen@users.noreply.github.com>
This commit is contained in:
Malte Poll 2023-03-30 09:41:56 +02:00 committed by GitHub
parent d3e2f30f7b
commit 827c4f548d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 2698 additions and 529 deletions

View file

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//bazel/go:go_test.bzl", "go_test")
go_library(
name = "mirror",
srcs = ["mirror.go"],
importpath = "github.com/edgelesssys/constellation/v2/hack/bazel-deps-mirror/internal/mirror",
visibility = ["//hack/bazel-deps-mirror:__subpackages__"],
deps = [
"//internal/logger",
"@com_github_aws_aws_sdk_go_v2_config//:config",
"@com_github_aws_aws_sdk_go_v2_feature_s3_manager//:manager",
"@com_github_aws_aws_sdk_go_v2_service_s3//:s3",
"@com_github_aws_aws_sdk_go_v2_service_s3//types",
],
)
go_test(
name = "mirror_test",
srcs = ["mirror_test.go"],
embed = [":mirror"],
deps = [
"//internal/logger",
"@com_github_aws_aws_sdk_go_v2_feature_s3_manager//:manager",
"@com_github_aws_aws_sdk_go_v2_service_s3//:s3",
"@com_github_aws_aws_sdk_go_v2_service_s3//types",
"@com_github_stretchr_testify//assert",
"@org_uber_go_goleak//:goleak",
],
)

View file

@ -0,0 +1,270 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
// package mirror is used upload and download Bazel dependencies to and from a mirror.
package mirror
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/edgelesssys/constellation/v2/internal/logger"
)
// Maintainer can upload and download files to and from a CAS mirror.
type Maintainer struct {
objectStorageClient objectStorageClient
uploadClient uploadClient
httpClient httpClient
// bucket is the name of the S3 bucket to use.
bucket string
// mirrorBaseURL is the base URL of the public CAS http endpoint.
mirrorBaseURL string
unauthenticated bool
dryRun bool
log *logger.Logger
}
// NewUnauthenticated creates a new Maintainer that dose not require authentication can only download files from a CAS mirror.
func NewUnauthenticated(mirrorBaseURL string, dryRun bool, log *logger.Logger) *Maintainer {
return &Maintainer{
httpClient: http.DefaultClient,
mirrorBaseURL: mirrorBaseURL,
unauthenticated: true,
dryRun: dryRun,
log: log,
}
}
// New creates a new Maintainer that can upload and download files to and from a CAS mirror.
func New(ctx context.Context, region, bucket, mirrorBaseURL string, dryRun bool, log *logger.Logger) (*Maintainer, error) {
cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(region))
if err != nil {
return nil, err
}
s3C := s3.NewFromConfig(cfg)
uploadC := s3manager.NewUploader(s3C)
return &Maintainer{
objectStorageClient: s3C,
uploadClient: uploadC,
bucket: bucket,
mirrorBaseURL: mirrorBaseURL,
httpClient: http.DefaultClient,
dryRun: dryRun,
log: log,
}, nil
}
// MirrorURL returns the public URL of a file in the CAS mirror.
func (m *Maintainer) MirrorURL(hash string) (string, error) {
if _, err := hex.DecodeString(hash); err != nil {
return "", fmt.Errorf("invalid hash %q: %w", hash, err)
}
key := path.Join(keyBase, hash)
pubURL, err := url.Parse(m.mirrorBaseURL)
if err != nil {
return "", err
}
pubURL.Path = path.Join(pubURL.Path, key)
return pubURL.String(), nil
}
// Mirror downloads a file from one of the existing (non-mirror) urls and uploads it to the CAS mirror.
// It also calculates the hash of the file during streaming and checks if it matches the expected hash.
func (m *Maintainer) Mirror(ctx context.Context, hash string, urls []string) error {
if m.unauthenticated {
return errors.New("cannot upload in unauthenticated mode")
}
for _, url := range urls {
m.log.Debugf("Mirroring file with hash %v from %q", hash, url)
body, err := m.downloadFromUpstream(ctx, url)
if err != nil {
m.log.Debugf("Failed to download file from %q: %v", url, err)
continue
}
defer body.Close()
streamedHash := sha256.New()
tee := io.TeeReader(body, streamedHash)
if err := m.put(ctx, hash, tee); err != nil {
m.log.Warnf("Failed to stream file from upstream %q to mirror: %v.. Trying next url.", url, err)
continue
}
actualHash := hex.EncodeToString(streamedHash.Sum(nil))
if actualHash != hash {
return fmt.Errorf("hash mismatch while streaming file to mirror: expected %v, got %v", hash, actualHash)
}
pubURL, err := m.MirrorURL(hash)
if err != nil {
return err
}
m.log.Debugf("File uploaded successfully to mirror from %q as %q", url, pubURL)
return nil
}
return fmt.Errorf("failed to download / reupload file with hash %v from any of the urls: %v", hash, urls)
}
// Check checks if a file is present and has the correct hash in the CAS mirror.
func (m *Maintainer) Check(ctx context.Context, expectedHash string) error {
m.log.Debugf("Checking consistency of object with hash %v", expectedHash)
if m.unauthenticated {
return m.checkUnauthenticated(ctx, expectedHash)
}
return m.checkAuthenticated(ctx, expectedHash)
}
// checkReadonly checks if a file is present and has the correct hash in the CAS mirror.
// It uses the authenticated CAS s3 endpoint to download the file metadata.
func (m *Maintainer) checkAuthenticated(ctx context.Context, expectedHash string) error {
key := path.Join(keyBase, expectedHash)
m.log.Debugf("Check: s3 getObjectAttributes {Bucket: %v, Key: %v}", m.bucket, key)
attributes, err := m.objectStorageClient.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
Bucket: &m.bucket,
Key: &key,
ObjectAttributes: []s3types.ObjectAttributes{s3types.ObjectAttributesChecksum, s3types.ObjectAttributesObjectParts},
})
if err != nil {
return err
}
hasChecksum := attributes.Checksum != nil && attributes.Checksum.ChecksumSHA256 != nil && len(*attributes.Checksum.ChecksumSHA256) > 0
isSinglePart := attributes.ObjectParts == nil || attributes.ObjectParts.TotalPartsCount == 1
if !hasChecksum || !isSinglePart {
// checksums are not guaranteed to be present
// and if present, they are only meaningful for single part objects
// fallback if checksum cannot be verified from attributes
m.log.Debugf("S3 object attributes cannot be used to verify key %v. Falling back to download.", key)
return m.checkUnauthenticated(ctx, expectedHash)
}
actualHash, err := base64.StdEncoding.DecodeString(*attributes.Checksum.ChecksumSHA256)
if err != nil {
return err
}
return compareHashes(expectedHash, actualHash)
}
// checkReadonly checks if a file is present and has the correct hash in the CAS mirror.
// It uses the public CAS http endpoint to download the file.
func (m *Maintainer) checkUnauthenticated(ctx context.Context, expectedHash string) error {
pubURL, err := m.MirrorURL(expectedHash)
if err != nil {
return err
}
m.log.Debugf("Check: http get {Url: %v}", pubURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, pubURL, http.NoBody)
if err != nil {
return err
}
resp, err := m.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
actualHash := sha256.New()
if _, err := io.Copy(actualHash, resp.Body); err != nil {
return err
}
return compareHashes(expectedHash, actualHash.Sum(nil))
}
// put uploads a file to the CAS mirror.
func (m *Maintainer) put(ctx context.Context, hash string, data io.Reader) error {
if m.unauthenticated {
return errors.New("cannot upload in unauthenticated mode")
}
key := path.Join(keyBase, hash)
if m.dryRun {
m.log.Debugf("DryRun: s3 put object {Bucket: %v, Key: %v}", m.bucket, key)
return nil
}
m.log.Debugf("Uploading object with hash %v to s3://%v/%v", hash, m.bucket, key)
_, err := m.uploadClient.Upload(ctx, &s3.PutObjectInput{
Bucket: &m.bucket,
Key: &key,
Body: data,
ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
})
return err
}
// downloadFromUpstream downloads a file from one of the existing (non-mirror) urls.
func (m *Maintainer) downloadFromUpstream(ctx context.Context, url string) (body io.ReadCloser, retErr error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
return nil, err
}
resp, err := m.httpClient.Do(req)
if err != nil {
return nil, err
}
defer func() {
if retErr != nil {
resp.Body.Close()
}
}()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
return resp.Body, nil
}
func compareHashes(expectedHash string, actualHash []byte) error {
if len(actualHash) != sha256.Size {
return fmt.Errorf("actual hash should to be %v bytes, got %v", sha256.Size, len(actualHash))
}
if len(expectedHash) != hex.EncodedLen(sha256.Size) {
return fmt.Errorf("expected hash should be %v bytes, got %v", hex.EncodedLen(sha256.Size), len(expectedHash))
}
actualHashStr := hex.EncodeToString(actualHash)
if expectedHash != actualHashStr {
return fmt.Errorf("expected hash %v, mirror returned %v", expectedHash, actualHashStr)
}
return nil
}
type objectStorageClient interface {
GetObjectAttributes(ctx context.Context, params *s3.GetObjectAttributesInput, optFns ...func(*s3.Options)) (*s3.GetObjectAttributesOutput, error)
}
type uploadClient interface {
Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}
type httpClient interface {
Get(url string) (*http.Response, error)
Do(req *http.Request) (*http.Response, error)
}
const (
// DryRun is a flag to enable dry run mode.
DryRun = true
// Run is a flag to perform actual operations.
Run = false
keyBase = "constellation/cas/sha256"
)

View file

@ -0,0 +1,285 @@
/*
Copyright (c) Edgeless Systems GmbH
SPDX-License-Identifier: AGPL-3.0-only
*/
package mirror
import (
"bytes"
"context"
"io"
"log"
"net/http"
"testing"
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/edgelesssys/constellation/v2/internal/logger"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestMirrorURL(t *testing.T) {
testCases := map[string]struct {
hash string
wantURL string
wantErr bool
}{
"empty hash": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
wantURL: "https://example.com/constellation/cas/sha256/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
},
"other hash": {
hash: "0000000000000000000000000000000000000000000000000000000000000000",
wantURL: "https://example.com/constellation/cas/sha256/0000000000000000000000000000000000000000000000000000000000000000",
},
"invalid hash": {
hash: "\x00",
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
m := Maintainer{
mirrorBaseURL: "https://example.com/",
}
url, err := m.MirrorURL(tc.hash)
if tc.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tc.wantURL, url)
})
}
}
func TestMirror(t *testing.T) {
testCases := map[string]struct {
unauthenticated bool
hash string
data []byte
upstreamURL string
statusCode int
failUpload bool
wantErr bool
}{
"cannot upload in unauthenticated mode": {
unauthenticated: true,
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
data: []byte(""),
upstreamURL: "https://example.com/empty",
statusCode: http.StatusOK,
wantErr: true,
},
"http error": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
data: []byte(""),
upstreamURL: "https://example.com/empty",
statusCode: http.StatusNotFound,
wantErr: true,
},
"hash mismatch": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
data: []byte("evil"),
upstreamURL: "https://example.com/empty",
statusCode: http.StatusOK,
wantErr: true,
},
"upload error": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
data: []byte(""),
upstreamURL: "https://example.com/empty",
statusCode: http.StatusOK,
failUpload: true,
wantErr: true,
},
"success": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
data: []byte(""),
upstreamURL: "https://example.com/empty",
statusCode: http.StatusOK,
},
"success with different hash": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
data: []byte("foo"),
upstreamURL: "https://example.com/foo",
statusCode: http.StatusOK,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
m := Maintainer{
httpClient: &http.Client{
Transport: &stubUpstream{
statusCode: tc.statusCode,
body: tc.data,
},
},
uploadClient: &stubUploadClient{
uploadErr: func() error {
if tc.failUpload {
return assert.AnError
}
return nil
}(),
},
unauthenticated: tc.unauthenticated,
log: logger.NewTest(t),
}
err := m.Mirror(context.Background(), tc.hash, []string{tc.upstreamURL})
if tc.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
func TestCheck(t *testing.T) {
testCases := map[string]struct {
hash string
unauthenticatedResponse []byte
unauthenticatedStatusCode int
authenticatedResponse *s3.GetObjectAttributesOutput
authenticatedErr error
wantErr bool
}{
"unauthenticated mode, http error": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
unauthenticatedResponse: []byte("foo"), // ignored
unauthenticatedStatusCode: http.StatusNotFound,
wantErr: true,
},
"unauthenticated mode, hash mismatch": {
hash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
unauthenticatedResponse: []byte("foo"),
unauthenticatedStatusCode: http.StatusOK,
wantErr: true,
},
"unauthenticated mode, success": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
unauthenticatedResponse: []byte("foo"),
unauthenticatedStatusCode: http.StatusOK,
},
"authenticated mode, get attributes fails": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
authenticatedErr: assert.AnError,
wantErr: true,
},
"authenticated mode, hash mismatch": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
authenticatedResponse: &s3.GetObjectAttributesOutput{
Checksum: &types.Checksum{
ChecksumSHA256: toPtr("tcH7Lvxta0Z0wv3MSM4BtDo7fAN2PAwzVd4Ame4PjHM="),
},
ObjectParts: &types.GetObjectAttributesParts{
TotalPartsCount: 1,
},
},
wantErr: true,
},
"authenticated mode, success": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
authenticatedResponse: &s3.GetObjectAttributesOutput{
Checksum: &types.Checksum{
ChecksumSHA256: toPtr("LCa0a2j/xo/5m0U8HTBBNBNCLXBkg7+g+YpeiGJm564="),
},
ObjectParts: &types.GetObjectAttributesParts{
TotalPartsCount: 1,
},
},
},
"authenticated mode, fallback to unauthenticated": {
hash: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae",
authenticatedResponse: &s3.GetObjectAttributesOutput{
ObjectParts: &types.GetObjectAttributesParts{
TotalPartsCount: 2,
},
},
unauthenticatedResponse: []byte("foo"),
unauthenticatedStatusCode: http.StatusOK,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
m := Maintainer{
unauthenticated: (tc.authenticatedResponse == nil),
httpClient: &http.Client{
Transport: &stubUpstream{
statusCode: tc.unauthenticatedStatusCode,
body: tc.unauthenticatedResponse,
},
},
objectStorageClient: &stubObjectStorageClient{
response: tc.authenticatedResponse,
err: tc.authenticatedErr,
},
log: logger.NewTest(t),
}
err := m.Check(context.Background(), tc.hash)
if tc.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
// stubUpstream implements http.RoundTripper and returns a canned response.
type stubUpstream struct {
statusCode int
body []byte
}
func (s *stubUpstream) RoundTrip(req *http.Request) (*http.Response, error) {
log.Printf("stubUpstream: %s %s -> %q\n", req.Method, req.URL, string(s.body))
return &http.Response{
StatusCode: s.statusCode,
Body: io.NopCloser(bytes.NewReader(s.body)),
}, nil
}
type stubUploadClient struct {
uploadErr error
uploadedData []byte
}
func (s *stubUploadClient) Upload(
_ context.Context, input *s3.PutObjectInput,
_ ...func(*s3manager.Uploader),
) (*s3manager.UploadOutput, error) {
var err error
s.uploadedData, err = io.ReadAll(input.Body)
if err != nil {
panic(err)
}
return nil, s.uploadErr
}
func toPtr[T any](v T) *T {
return &v
}
type stubObjectStorageClient struct {
response *s3.GetObjectAttributesOutput
err error
}
func (s *stubObjectStorageClient) GetObjectAttributes(
_ context.Context, _ *s3.GetObjectAttributesInput, _ ...func(*s3.Options),
) (*s3.GetObjectAttributesOutput, error) {
return s.response, s.err
}