From f13a1957a232a9830e29f02baf8f1b44ef57e908 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Thu, 14 Mar 2024 16:37:55 -0500 Subject: [PATCH 1/3] gzip files stored in s3 --- common/flags/config.go | 2 ++ common/flags/flags.go | 7 ++++++ common/storage/s3.go | 50 +++++++++++++++++++++++++++++++-------- common/storage/storage.go | 2 ++ 4 files changed, 51 insertions(+), 10 deletions(-) diff --git a/common/flags/config.go b/common/flags/config.go index 1e6dfb3..1b0ede1 100644 --- a/common/flags/config.go +++ b/common/flags/config.go @@ -28,6 +28,7 @@ type S3Config struct { S3CredentialType S3CredentialType AccessKey string SecretAccessKey string + Compress bool } func (c S3Config) check() error { @@ -104,6 +105,7 @@ func readS3Config(ctx *cli.Context) S3Config { UseHttps: ctx.Bool(S3EndpointHttpsFlagName), Bucket: ctx.String(S3BucketFlagName), S3CredentialType: toS3CredentialType(ctx.String(S3CredentialTypeFlagName)), + Compress: ctx.Bool(S3CompressFlagName), } } diff --git a/common/flags/flags.go b/common/flags/flags.go index 1bd30b7..1945953 100644 --- a/common/flags/flags.go +++ b/common/flags/flags.go @@ -12,6 +12,7 @@ const ( S3CredentialTypeFlagName = "s3-credential-type" S3EndpointFlagName = "s3-endpoint" S3EndpointHttpsFlagName = "s3-endpoint-https" + S3CompressFlagName = "s3-compress" S3AccessKeyFlagName = "s3-access-key" S3SecretAccessKeyFlagName = "s3-secret-access-key" S3BucketFlagName = "s3-bucket" @@ -51,6 +52,12 @@ func CLIFlags(envPrefix string) []cli.Flag { Value: true, EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_ENDPOINT_HTTPS"), }, + &cli.BoolFlag{ + Name: S3CompressFlagName, + Usage: "Whether to compress data before storing in S3", + Value: false, + EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_COMPRESS"), + }, &cli.StringFlag{ Name: S3AccessKeyFlagName, Usage: "The S3 access key for the bucket", diff --git a/common/storage/s3.go b/common/storage/s3.go index 91981a5..dd6cdd9 100644 --- a/common/storage/s3.go +++ b/common/storage/s3.go @@ -2,9 +2,9 @@ package storage import ( "bytes" + "compress/gzip" "context" "encoding/json" - "github.com/base-org/blob-archiver/common/flags" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -13,9 +13,10 @@ import ( ) type S3Storage struct { - s3 *minio.Client - bucket string - log log.Logger + s3 *minio.Client + bucket string + log log.Logger + compress bool } func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) { @@ -36,9 +37,10 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) { } return &S3Storage{ - s3: client, - bucket: cfg.Bucket, - log: l, + s3: client, + bucket: cfg.Bucket, + log: l, + compress: cfg.Compress, }, nil } @@ -75,6 +77,8 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error } } + // TODO: We may need to decode if it's gzipped + var data BlobData err = json.NewDecoder(res).Decode(&data) if err != nil { @@ -92,10 +96,22 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error { return ErrMarshaling } - reader := bytes.NewReader(b) - _, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), minio.PutObjectOptions{ + options := minio.PutObjectOptions{ ContentType: "application/json", - }) + } + + if s.compress { + b, err = compress(b) + if err != nil { + s.log.Warn("error compressing blob", "err", err) + return ErrCompress + } + options.ContentEncoding = "gzip" + } + + reader := bytes.NewReader(b) + + _, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), options) if err != nil { s.log.Warn("error writing blob", "err", err) @@ -105,3 +121,17 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error { s.log.Info("wrote blob", "hash", data.Header.BeaconBlockHash.String()) return nil } + +func compress(in []byte) ([]byte, error) { + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, err := gz.Write(in) + if err != nil { + return nil, err + } + err = gz.Close() + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} diff --git a/common/storage/storage.go b/common/storage/storage.go index cdad499..57d2468 100644 --- a/common/storage/storage.go +++ b/common/storage/storage.go @@ -21,6 +21,8 @@ var ( ErrStorage = errors.New("error accessing storage") // ErrMarshaling is returned when there is an error in (un)marshaling the blob ErrMarshaling = errors.New("error encoding/decoding blob") + // ErrCompress is returned when there is an error gzipping the data + ErrCompress = errors.New("error compressing blob") ) type Header struct { From 16d5431d62286f1c594bd5b47d18f9130c889942 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Thu, 14 Mar 2024 20:58:17 -0600 Subject: [PATCH 2/3] Reading gzipped files --- common/storage/s3.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/common/storage/s3.go b/common/storage/s3.go index dd6cdd9..b718447 100644 --- a/common/storage/s3.go +++ b/common/storage/s3.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "io" ) type S3Storage struct { @@ -65,7 +66,7 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error return BlobData{}, ErrStorage } defer res.Close() - _, err = res.Stat() + stat, err := res.Stat() if err != nil { errResponse := minio.ToErrorResponse(err) if errResponse.Code == "NoSuchKey" { @@ -77,10 +78,19 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error } } - // TODO: We may need to decode if it's gzipped + var reader io.ReadCloser = res + defer reader.Close() + + if stat.Metadata.Get("Content-Encoding") == "gzip" { + reader, err = gzip.NewReader(reader) + if err != nil { + s.log.Warn("error creating gzip reader", "hash", hash.String(), "err", err) + return BlobData{}, ErrMarshaling + } + } var data BlobData - err = json.NewDecoder(res).Decode(&data) + err = json.NewDecoder(reader).Decode(&data) if err != nil { s.log.Warn("error decoding blob", "hash", hash.String(), "err", err) return BlobData{}, ErrMarshaling From 9c9974de2f92e249e2a56c6554cd641a48d18be6 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Fri, 15 Mar 2024 11:05:29 -0600 Subject: [PATCH 3/3] Fix: Provide blob url to validator --- common/storage/s3.go | 3 ++- validator/cmd/main.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/common/storage/s3.go b/common/storage/s3.go index b718447..6be01fc 100644 --- a/common/storage/s3.go +++ b/common/storage/s3.go @@ -5,12 +5,13 @@ import ( "compress/gzip" "context" "encoding/json" + "io" + "github.com/base-org/blob-archiver/common/flags" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - "io" ) type S3Storage struct { diff --git a/validator/cmd/main.go b/validator/cmd/main.go index c2efe0d..cf47bcd 100644 --- a/validator/cmd/main.go +++ b/validator/cmd/main.go @@ -57,7 +57,7 @@ func Main() cliapp.LifecycleAction { } beaconClient := service.NewBlobSidecarClient(cfg.BeaconConfig.BeaconURL) - blobClient := service.NewBlobSidecarClient(cfg.BeaconConfig.BeaconURL) + blobClient := service.NewBlobSidecarClient(cfg.BlobConfig.BeaconURL) return service.NewValidator(l, headerClient, beaconClient, blobClient, closeApp), nil }