From 4b1d4b71c6d7cee7f740d08844c39e49a36adc45 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Sat, 17 Feb 2024 15:02:16 -0600 Subject: [PATCH 1/4] Add a rearchive API to the archiver --- archiver/cmd/main.go | 11 ++- archiver/metrics/metrics.go | 5 +- archiver/service/api.go | 90 +++++++++++++++++++-- archiver/service/api_test.go | 83 +++++++++++++++++++- archiver/service/archiver.go | 126 ++++++++++++------------------ archiver/service/archiver_test.go | 83 ++++++++++++++++++-- archiver/service/service.go | 89 +++++++++++++++++++++ common/beacon/beacontest/stub.go | 30 +++++-- common/blobtest/helpers.go | 2 + 9 files changed, 414 insertions(+), 105 deletions(-) create mode 100644 archiver/service/service.go diff --git a/archiver/cmd/main.go b/archiver/cmd/main.go index 00535f6..5b8b19a 100644 --- a/archiver/cmd/main.go +++ b/archiver/cmd/main.go @@ -66,9 +66,14 @@ func Main() cliapp.LifecycleAction { return nil, err } - api := service.NewAPI(m, l) - l.Info("Initializing Archiver Service") - return service.NewService(l, cfg, api, storageClient, beaconClient, m) + archiver, err := service.NewArchiver(l, cfg, storageClient, beaconClient, m) + if err != nil { + return nil, fmt.Errorf("failed to initialize archiver: %w", err) + } + + api := service.NewAPI(m, l, archiver) + + return service.NewService(l, cfg, api, archiver, m) } } diff --git a/archiver/metrics/metrics.go b/archiver/metrics/metrics.go index af14c75..b5e56f4 100644 --- a/archiver/metrics/metrics.go +++ b/archiver/metrics/metrics.go @@ -11,8 +11,9 @@ type BlockSource string var ( MetricsNamespace = "blob_archiver" - BlockSourceBackfill BlockSource = "backfill" - BlockSourceLive BlockSource = "live" + BlockSourceBackfill BlockSource = "backfill" + BlockSourceLive BlockSource = "live" + BlockSourceRearchive BlockSource = "rearchive" ) type Metricer interface { diff --git a/archiver/service/api.go b/archiver/service/api.go index 59e64d9..663489f 100644 --- a/archiver/service/api.go +++ b/archiver/service/api.go @@ -1,7 +1,10 @@ package service import ( + "encoding/json" + "fmt" "net/http" + "strconv" "time" m "github.com/base-org/blob-archiver/archiver/metrics" @@ -16,17 +19,19 @@ const ( ) type API struct { - router *chi.Mux - logger log.Logger - metrics m.Metricer + router *chi.Mux + logger log.Logger + metrics m.Metricer + archiver *Archiver } // NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver. 
-func NewAPI(metrics m.Metricer, logger log.Logger) *API { +func NewAPI(metrics m.Metricer, logger log.Logger, archiver *Archiver) *API { result := &API{ - router: chi.NewRouter(), - logger: logger, - metrics: metrics, + router: chi.NewRouter(), + archiver: archiver, + logger: logger, + metrics: metrics, } r := result.router @@ -41,6 +46,77 @@ func NewAPI(metrics m.Metricer, logger log.Logger) *API { }) r.Get("/", http.NotFound) + r.Post("/reindex", result.reindexBlocks) return result } + +type reindexResponse struct { + Error string `json:"error,omitempty"` + BlockStart uint64 `json:"blockStart"` + BlockEnd uint64 `json:"blockEnd"` +} + +func toSlot(input string) (uint64, error) { + if input == "" { + return 0, fmt.Errorf("must provide param") + } + res, err := strconv.ParseUint(input, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid slot: \"%s\"", input) + } + return res, nil +} + +func (a *API) reindexBlocks(w http.ResponseWriter, r *http.Request) { + from, err := toSlot(r.URL.Query().Get("from")) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(reindexResponse{ + Error: fmt.Sprintf("invalid from param: %v", err), + }) + return + } + + to, err := toSlot(r.URL.Query().Get("to")) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(reindexResponse{ + Error: fmt.Sprintf("invalid to param: %v", err), + }) + return + } + + if from > to { + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(reindexResponse{ + Error: fmt.Sprintf("invalid range: from %d to %d", from, to), + }) + return + } + + blockStart, blockEnd, err := a.archiver.rearchiveRange(from, to) + if err != nil { + a.logger.Error("Failed to reindex blocks", "err", err) + + w.WriteHeader(http.StatusInternalServerError) + err = json.NewEncoder(w).Encode(reindexResponse{ + Error: err.Error(), + BlockStart: blockStart, + BlockEnd: blockEnd, + }) + } else { + a.logger.Info("Reindexing blocks complete") + w.WriteHeader(http.StatusOK) + + err = json.NewEncoder(w).Encode(reindexResponse{ + BlockStart: blockStart, + BlockEnd: blockEnd, + }) + } + + if err != nil { + a.logger.Error("Failed to write response", "err", err) + w.WriteHeader(http.StatusInternalServerError) + } +} diff --git a/archiver/service/api_test.go b/archiver/service/api_test.go index 8f2af61..faed0bb 100644 --- a/archiver/service/api_test.go +++ b/archiver/service/api_test.go @@ -1,23 +1,32 @@ package service import ( + "encoding/json" "net/http/httptest" "testing" + "time" + "github.com/base-org/blob-archiver/archiver/flags" "github.com/base-org/blob-archiver/archiver/metrics" + "github.com/base-org/blob-archiver/common/storage/storagetest" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) -func setupAPI(t *testing.T) *API { +func setupAPI(t *testing.T) (*API, *storagetest.TestFileStorage) { logger := testlog.Logger(t, log.LvlInfo) m := metrics.NewMetrics() - return NewAPI(m, logger) + fs := storagetest.NewTestFileStorage(t, logger) + archiver, err := NewArchiver(logger, flags.ArchiverConfig{ + PollInterval: 10 * time.Second, + }, fs, nil, m) + require.NoError(t, err) + return NewAPI(m, logger, archiver), fs } func TestHealthHandler(t *testing.T) { - a := setupAPI(t) + a, _ := setupAPI(t) request := httptest.NewRequest("GET", "/healthz", nil) response := httptest.NewRecorder() @@ -26,3 +35,71 @@ func TestHealthHandler(t *testing.T) { require.Equal(t, 200, response.Code) } + +func 
TestReindexHandler(t *testing.T) {
+	a, _ := setupAPI(t)
+
+	tests := []struct {
+		name           string
+		path           string
+		expectedStatus int
+		error          string
+	}{
+		{
+			name:           "should fail with no params",
+			path:           "/reindex",
+			expectedStatus: 400,
+			error:          "invalid from param: must provide param",
+		},
+		{
+			name:           "should fail with missing to param",
+			path:           "/reindex?from=1",
+			expectedStatus: 400,
+			error:          "invalid to param: must provide param",
+		},
+		{
+			name:           "should fail with missing from param",
+			path:           "/reindex?to=1",
+			expectedStatus: 400,
+			error:          "invalid from param: must provide param",
+		},
+		{
+			name:           "should fail with invalid from param",
+			path:           "/reindex?from=blah&to=1",
+			expectedStatus: 400,
+			error:          "invalid from param: invalid slot: \"blah\"",
+		},
+		{
+			name:           "should fail with invalid to param",
+			path:           "/reindex?from=1&to=blah",
+			expectedStatus: 400,
+			error:          "invalid to param: invalid slot: \"blah\"",
+		},
+		{
+			name:           "should fail when from is greater than to",
+			path:           "/reindex?from=2&to=1",
+			expectedStatus: 400,
+			error:          "invalid range: from 2 to 1",
+		},
+	}
+
+	for _, tt := range tests {
+		test := tt
+		t.Run(test.name, func(t *testing.T) {
+			request := httptest.NewRequest("POST", test.path, nil)
+			response := httptest.NewRecorder()
+
+			a.router.ServeHTTP(response, request)
+
+			require.Equal(t, test.expectedStatus, response.Code)
+
+			var errResponse reindexResponse
+			err := json.NewDecoder(response.Body).Decode(&errResponse)
+			require.NoError(t, err)
+
+			if test.error != "" {
+				require.Equal(t, test.error, errResponse.Error)
+			}
+		})
+	}
+}
diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go
index 1ad6c1b..0e5a49e 100644
--- a/archiver/service/archiver.go
+++ b/archiver/service/archiver.go
@@ -2,84 +2,53 @@ package service
 
 import (
 	"context"
-	"errors"
-	"fmt"
-	"sync/atomic"
+	"strconv"
 	"time"
 
-	client "github.com/attestantio/go-eth2-client"
 	"github.com/attestantio/go-eth2-client/api"
 	v1 "github.com/attestantio/go-eth2-client/api/v1"
 	"github.com/base-org/blob-archiver/archiver/flags"
 	"github.com/base-org/blob-archiver/archiver/metrics"
 	"github.com/base-org/blob-archiver/common/storage"
-	"github.com/ethereum-optimism/optimism/op-service/httputil"
-	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 )
 
-const liveFetchBlobMaximumRetries = 10
-const startupFetchBlobMaximumRetries = 3
-const backfillErrorRetryInterval = 5 * time.Second
-
-var ErrAlreadyStopped = errors.New("already stopped")
-
-type BeaconClient interface {
-	client.BlobSidecarsProvider
-	client.BeaconBlockHeadersProvider
-}
+const (
+	liveFetchBlobMaximumRetries    = 10
+	startupFetchBlobMaximumRetries = 3
+	rearchiveMaximumRetries        = 3
+	backfillErrorRetryInterval     = 5 * time.Second
+)
 
-func NewService(l log.Logger, cfg flags.ArchiverConfig, api *API, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*ArchiverService, error) {
-	return &ArchiverService{
+func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*Archiver, error) {
+	return &Archiver{
 		log:             l,
 		cfg:             cfg,
 		dataStoreClient: dataStoreClient,
 		metrics:         m,
-		stopCh:          make(chan struct{}),
 		beaconClient:    client,
-		api:             api,
+		stopCh:          make(chan struct{}),
 	}, nil
 }
 
-type ArchiverService struct {
-	stopped         atomic.Bool
-	stopCh          chan struct{}
+type Archiver struct {
 	log             log.Logger
+	
cfg flags.ArchiverConfig dataStoreClient storage.DataStore beaconClient BeaconClient - metricsServer *httputil.HTTPServer - cfg flags.ArchiverConfig metrics metrics.Metricer - api *API + stopCh chan struct{} } // Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for // them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head // to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be // filled in. -func (a *ArchiverService) Start(ctx context.Context) error { - if a.cfg.MetricsConfig.Enabled { - a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort) - srv, err := opmetrics.StartServer(a.metrics.Registry(), a.cfg.MetricsConfig.ListenAddr, a.cfg.MetricsConfig.ListenPort) - if err != nil { - return err - } - - a.log.Info("started metrics server", "addr", srv.Addr()) - a.metricsServer = srv - } - - srv, err := httputil.StartHTTPServer(a.cfg.ListenAddr, a.api.router) - if err != nil { - return fmt.Errorf("failed to start Archiver API server: %w", err) - } - - a.log.Info("Archiver API server started", "address", srv.Addr().String()) - +func (a *Archiver) Start(ctx context.Context) error { currentBlob, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { - return a.persistBlobsForBlockToS3(ctx, "head") + return a.persistBlobsForBlockToS3(ctx, "head", false) }) if err != nil { @@ -92,12 +61,18 @@ func (a *ArchiverService) Start(ctx context.Context) error { return a.trackLatestBlocks(ctx) } +// Stops the archiver service. +func (a *Archiver) Stop(ctx context.Context) error { + close(a.stopCh) + return nil +} + // persistBlobsForBlockToS3 fetches the blobs for a given block and persists them to S3. It returns the block header // and a boolean indicating whether the blobs already existed in S3 and any errors that occur. // If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not // perform any validation of the blobs, it assumes a trusted beacon node. See: // https://github.com/base-org/blob-archiver/issues/4. -func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string) (*v1.BeaconBlockHeader, bool, error) { +func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string, overwrite bool) (*v1.BeaconBlockHeader, bool, error) { currentHeader, err := a.beaconClient.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{ Block: blockIdentifier, }) @@ -113,7 +88,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde return nil, false, err } - if exists { + if exists && !overwrite { a.log.Debug("blob already exists", "hash", currentHeader.Data.Root) return currentHeader.Data, true, nil } @@ -146,36 +121,13 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde a.metrics.RecordStoredBlobs(len(blobSidecars.Data)) - return currentHeader.Data, false, nil -} - -// Stops the archiver service. 
-func (a *ArchiverService) Stop(ctx context.Context) error { - if a.stopped.Load() { - return ErrAlreadyStopped - } - a.log.Info("Stopping Archiver") - a.stopped.Store(true) - - close(a.stopCh) - - if a.metricsServer != nil { - if err := a.metricsServer.Stop(ctx); err != nil { - return err - } - } - - return nil -} - -func (a *ArchiverService) Stopped() bool { - return a.stopped.Load() + return currentHeader.Data, exists, nil } // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled. // If an error is encountered persisting a block, it will retry after waiting for a period of time. -func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) { +func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) { current, alreadyExists, err := latest, false, error(nil) for !alreadyExists { @@ -185,7 +137,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl } previous := current - current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String()) + current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false) if err != nil { a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String()) // Revert back to block we failed to fetch @@ -203,7 +155,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl } // trackLatestBlocks will poll the beacon node for the latest blocks and persist blobs for them. -func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error { +func (a *Archiver) trackLatestBlocks(ctx context.Context) error { t := time.NewTicker(a.cfg.PollInterval) defer t.Stop() @@ -222,7 +174,7 @@ func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error { // processBlocksUntilKnownBlock will fetch and persist blobs for blocks until it finds a block that has been stored before. // In the case of a reorg, it will fetch the new head and then walk back the chain, storing all blobs until it finds a // known block -- that already exists in the archivers' storage. 
-func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) { +func (a *Archiver) processBlocksUntilKnownBlock(ctx context.Context) { a.log.Debug("refreshing live data") var start *v1.BeaconBlockHeader @@ -230,7 +182,7 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) { for { current, alreadyExisted, err := retry.Do2(ctx, liveFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { - return a.persistBlobsForBlockToS3(ctx, currentBlockId) + return a.persistBlobsForBlockToS3(ctx, currentBlockId, false) }) if err != nil { @@ -254,3 +206,23 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) { a.log.Info("live data refreshed", "startHash", start.Root.String(), "endHash", currentBlockId) } + +func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error) { + for i := from; i <= to; i++ { + id := strconv.FormatUint(i, 10) + + a.log.Debug("rearchiving block", "blockId", id) + + _, _, err := retry.Do2(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { + return a.persistBlobsForBlockToS3(context.Background(), id, true) + }) + + if err != nil { + return from, i, err + } + + a.metrics.RecordProcessedBlock(metrics.BlockSourceRearchive) + } + + return from, to, nil +} diff --git a/archiver/service/archiver_test.go b/archiver/service/archiver_test.go index 25a537b..7733649 100644 --- a/archiver/service/archiver_test.go +++ b/archiver/service/archiver_test.go @@ -17,15 +17,15 @@ import ( "github.com/stretchr/testify/require" ) -func setup(t *testing.T, beacon *beacontest.StubBeaconClient) (*ArchiverService, *storagetest.TestFileStorage) { +func setup(t *testing.T, beacon *beacontest.StubBeaconClient) (*Archiver, *storagetest.TestFileStorage) { l := testlog.Logger(t, log.LvlInfo) fs := storagetest.NewTestFileStorage(t, l) m := metrics.NewMetrics() - svc, err := NewService(l, flags.ArchiverConfig{ + svc, err := NewArchiver(l, flags.ArchiverConfig{ PollInterval: 5 * time.Second, OriginBlock: blobtest.OriginBlock, - }, NewAPI(m, l), fs, beacon, m) + }, fs, beacon, m) require.NoError(t, err) return svc, fs } @@ -35,7 +35,7 @@ func TestArchiver_FetchAndPersist(t *testing.T) { fs.CheckNotExistsOrFail(t, blobtest.OriginBlock) - header, alreadyExists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String()) + header, alreadyExists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String(), false) require.False(t, alreadyExists) require.NoError(t, err) require.NotNil(t, header) @@ -43,7 +43,7 @@ func TestArchiver_FetchAndPersist(t *testing.T) { fs.CheckExistsOrFail(t, blobtest.OriginBlock) - header, alreadyExists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String()) + header, alreadyExists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.OriginBlock.String(), false) require.True(t, alreadyExists) require.NoError(t, err) require.NotNil(t, header) @@ -52,6 +52,38 @@ func TestArchiver_FetchAndPersist(t *testing.T) { fs.CheckExistsOrFail(t, blobtest.OriginBlock) } +func TestArchiver_FetchAndPersistOverwriting(t *testing.T) { + beacon := beacontest.NewDefaultStubBeaconClient(t) + svc, fs := setup(t, beacon) + + // Blob 5 already exists + fs.WriteOrFail(t, storage.BlobData{ + Header: storage.Header{ + BeaconBlockHash: blobtest.Five, + }, + BlobSidecars: storage.BlobSidecars{ + Data: beacon.Blobs[blobtest.Five.String()], 
+		},
+	})
+
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Five).BlobSidecars.Data, beacon.Blobs[blobtest.Five.String()])
+
+	// change the blob data -- this isn't possible without changing the hash, but it allows us to test the overwrite
+	beacon.Blobs[blobtest.Five.String()] = blobtest.NewBlobSidecars(t, 6)
+
+	_, exists, err := svc.persistBlobsForBlockToS3(context.Background(), blobtest.Five.String(), true)
+	require.NoError(t, err)
+	require.True(t, exists)
+
+	// It should have overwritten the blob data
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Five).BlobSidecars.Data, beacon.Blobs[blobtest.Five.String()])
+
+	// Overwriting a non-existent blob should return exists=false
+	_, exists, err = svc.persistBlobsForBlockToS3(context.Background(), blobtest.Four.String(), true)
+	require.NoError(t, err)
+	require.False(t, exists)
+}
+
 func TestArchiver_BackfillToOrigin(t *testing.T) {
 	beacon := beacontest.NewDefaultStubBeaconClient(t)
 	svc, fs := setup(t, beacon)
@@ -303,3 +335,44 @@ func TestArchiver_LatestHaltsOnPersistentError(t *testing.T) {
 	fs.CheckNotExistsOrFail(t, blobtest.Four)
 	fs.CheckExistsOrFail(t, blobtest.Three)
 }
+
+func TestArchiver_RearchiveRange(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// three is pre-populated to show that rearchiving overwrites existing blobs rather than stopping at them
+	fs.WriteOrFail(t, storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+
+	// startSlot+1 == One
+	fs.CheckNotExistsOrFail(t, blobtest.One)
+	fs.CheckNotExistsOrFail(t, blobtest.Two)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+	fs.CheckNotExistsOrFail(t, blobtest.Four)
+
+	// this modifies the blobs at 3, purely to test that the blob is rearchived
+	beacon.Blobs[blobtest.Three.String()] = blobtest.NewBlobSidecars(t, 6)
+
+	from, to := blobtest.StartSlot+1, blobtest.StartSlot+4
+
+	actualFrom, actualTo, err := svc.rearchiveRange(from, to)
+	// Should rearchive the whole range
+	require.NoError(t, err)
+	require.Equal(t, from, actualFrom)
+	require.Equal(t, to, actualTo)
+
+	// Should have written all the blobs
+	fs.CheckExistsOrFail(t, blobtest.One)
+	fs.CheckExistsOrFail(t, blobtest.Two)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+	fs.CheckExistsOrFail(t, blobtest.Four)
+
+	// Should have overwritten any existing blobs
+	require.Equal(t, fs.ReadOrFail(t, blobtest.Three).BlobSidecars.Data, beacon.Blobs[blobtest.Three.String()])
+}
diff --git a/archiver/service/service.go b/archiver/service/service.go
new file mode 100644
index 0000000..6322a7f
--- /dev/null
+++ b/archiver/service/service.go
@@ -0,0 +1,89 @@
+package service
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+
+	client "github.com/attestantio/go-eth2-client"
+	"github.com/base-org/blob-archiver/archiver/flags"
+	"github.com/base-org/blob-archiver/archiver/metrics"
+	"github.com/ethereum-optimism/optimism/op-service/httputil"
+	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+var ErrAlreadyStopped = errors.New("already stopped")
+
+type BeaconClient interface {
+	client.BlobSidecarsProvider
+	client.BeaconBlockHeadersProvider
+}
+
+func NewService(l log.Logger, cfg flags.ArchiverConfig, api *API, archiver *Archiver, m metrics.Metricer) (*ArchiverService, error) {
+	return &ArchiverService{
+		log:      l,
+		cfg:      cfg,
+		archiver: archiver,
+		metrics:  m,
+		api:      api,
+	}, nil
+}
+
+type ArchiverService 
struct { + stopped atomic.Bool + log log.Logger + metricsServer *httputil.HTTPServer + cfg flags.ArchiverConfig + metrics metrics.Metricer + api *API + archiver *Archiver +} + +// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for +// them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head +// to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be +// filled in. +func (a *ArchiverService) Start(ctx context.Context) error { + if a.cfg.MetricsConfig.Enabled { + a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort) + srv, err := opmetrics.StartServer(a.metrics.Registry(), a.cfg.MetricsConfig.ListenAddr, a.cfg.MetricsConfig.ListenPort) + if err != nil { + return err + } + + a.log.Info("started metrics server", "addr", srv.Addr()) + a.metricsServer = srv + } + + srv, err := httputil.StartHTTPServer(a.cfg.ListenAddr, a.api.router) + if err != nil { + return fmt.Errorf("failed to start Archiver API server: %w", err) + } + + a.log.Info("Archiver API server started", "address", srv.Addr().String()) + + return a.archiver.Start(ctx) +} + +// Stops the archiver service. +func (a *ArchiverService) Stop(ctx context.Context) error { + if a.stopped.Load() { + return ErrAlreadyStopped + } + a.log.Info("Stopping Archiver") + a.stopped.Store(true) + + if a.metricsServer != nil { + if err := a.metricsServer.Stop(ctx); err != nil { + return err + } + } + + return a.archiver.Stop(ctx) +} + +func (a *ArchiverService) Stopped() bool { + return a.stopped.Load() +} diff --git a/common/beacon/beacontest/stub.go b/common/beacon/beacontest/stub.go index 7e48440..9e1211a 100644 --- a/common/beacon/beacontest/stub.go +++ b/common/beacon/beacontest/stub.go @@ -3,6 +3,7 @@ package beacontest import ( "context" "fmt" + "strconv" "testing" "github.com/attestantio/go-eth2-client/api" @@ -61,16 +62,29 @@ func NewDefaultStubBeaconClient(t *testing.T) *StubBeaconClient { headBlobs := blobtest.NewBlobSidecars(t, 6) finalizedBlobs := blobtest.NewBlobSidecars(t, 4) + startSlot := blobtest.StartSlot + return &StubBeaconClient{ Headers: map[string]*v1.BeaconBlockHeader{ - blobtest.OriginBlock.String(): makeHeader(10, blobtest.OriginBlock, common.Hash{9, 9, 9}), - blobtest.One.String(): makeHeader(11, blobtest.One, blobtest.OriginBlock), - blobtest.Two.String(): makeHeader(12, blobtest.Two, blobtest.One), - blobtest.Three.String(): makeHeader(13, blobtest.Three, blobtest.Two), - blobtest.Four.String(): makeHeader(14, blobtest.Four, blobtest.Three), - blobtest.Five.String(): makeHeader(15, blobtest.Five, blobtest.Four), - "head": makeHeader(15, blobtest.Five, blobtest.Four), - "finalized": makeHeader(13, blobtest.Three, blobtest.Two), + // Lookup by hash + blobtest.OriginBlock.String(): makeHeader(startSlot, blobtest.OriginBlock, common.Hash{9, 9, 9}), + blobtest.One.String(): makeHeader(startSlot+1, blobtest.One, blobtest.OriginBlock), + blobtest.Two.String(): makeHeader(startSlot+2, blobtest.Two, blobtest.One), + blobtest.Three.String(): makeHeader(startSlot+3, blobtest.Three, blobtest.Two), + blobtest.Four.String(): makeHeader(startSlot+4, blobtest.Four, blobtest.Three), + blobtest.Five.String(): makeHeader(startSlot+5, blobtest.Five, blobtest.Four), + + // Lookup by identifier + "head": makeHeader(startSlot+5, blobtest.Five, blobtest.Four), + "finalized": makeHeader(startSlot+3, 
blobtest.Three, blobtest.Two), + + // Lookup by slot + strconv.FormatUint(startSlot, 10): makeHeader(startSlot, blobtest.OriginBlock, common.Hash{9, 9, 9}), + strconv.FormatUint(startSlot+1, 10): makeHeader(startSlot+1, blobtest.One, blobtest.OriginBlock), + strconv.FormatUint(startSlot+2, 10): makeHeader(startSlot+2, blobtest.Two, blobtest.One), + strconv.FormatUint(startSlot+3, 10): makeHeader(startSlot+3, blobtest.Three, blobtest.Two), + strconv.FormatUint(startSlot+4, 10): makeHeader(startSlot+4, blobtest.Four, blobtest.Three), + strconv.FormatUint(startSlot+5, 10): makeHeader(startSlot+5, blobtest.Five, blobtest.Four), }, Blobs: map[string][]*deneb.BlobSidecar{ blobtest.OriginBlock.String(): blobtest.NewBlobSidecars(t, 1), diff --git a/common/blobtest/helpers.go b/common/blobtest/helpers.go index fcb7e24..a37d354 100644 --- a/common/blobtest/helpers.go +++ b/common/blobtest/helpers.go @@ -17,6 +17,8 @@ var ( Three = common.Hash{3} Four = common.Hash{4} Five = common.Hash{5} + + StartSlot = uint64(10) ) func RandBytes(t *testing.T, size uint) []byte { From 734ec1428a62f084eb51c2154735b1dcdbb40ae6 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Sun, 18 Feb 2024 18:26:00 -0600 Subject: [PATCH 2/4] Handle skipped slots --- archiver/service/archiver.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go index 0e5a49e..505111e 100644 --- a/archiver/service/archiver.go +++ b/archiver/service/archiver.go @@ -2,6 +2,7 @@ package service import ( "context" + "errors" "strconv" "time" @@ -47,7 +48,7 @@ type Archiver struct { // to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be // filled in. 
func (a *Archiver) Start(ctx context.Context) error { - currentBlob, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { + _, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { return a.persistBlobsForBlockToS3(ctx, "head", false) }) @@ -56,7 +57,7 @@ func (a *Archiver) Start(ctx context.Context) error { return err } - go a.backfillBlobs(ctx, currentBlob) + //go a.backfillBlobs(ctx, currentBlob) return a.trackLatestBlocks(ctx) } @@ -211,16 +212,34 @@ func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error for i := from; i <= to; i++ { id := strconv.FormatUint(i, 10) - a.log.Debug("rearchiving block", "blockId", id) + l := a.log.New("slot", id) - _, _, err := retry.Do2(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) { - return a.persistBlobsForBlockToS3(context.Background(), id, true) + l.Info("rearchiving block") + + reindexed, err := retry.Do(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (bool, error) { + _, _, e := a.persistBlobsForBlockToS3(context.Background(), id, true) + + // If the block is not found, we can assume that the slot has been skipped + if e != nil { + var apiErr *api.Error + if errors.As(e, &apiErr) && apiErr.StatusCode == 404 { + return false, nil + } + + return false, e + } + + return true, nil }) if err != nil { return from, i, err } + if !reindexed { + l.Info("block not found during reindexing", "slot", id) + } + a.metrics.RecordProcessedBlock(metrics.BlockSourceRearchive) } From 044d543d0fe896d144df2f29f1e644789fdd60b0 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Sun, 18 Feb 2024 19:22:07 -0600 Subject: [PATCH 3/4] reindex => rearchive --- archiver/service/api.go | 22 ++++++++++++---------- archiver/service/api_test.go | 16 ++++++++-------- archiver/service/archiver.go | 12 ++++++------ archiver/service/service.go | 5 +---- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/archiver/service/api.go b/archiver/service/api.go index 663489f..71b5918 100644 --- a/archiver/service/api.go +++ b/archiver/service/api.go @@ -46,12 +46,12 @@ func NewAPI(metrics m.Metricer, logger log.Logger, archiver *Archiver) *API { }) r.Get("/", http.NotFound) - r.Post("/reindex", result.reindexBlocks) + r.Post("/rearchive", result.rearchiveBlocks) return result } -type reindexResponse struct { +type rearchiveResponse struct { Error string `json:"error,omitempty"` BlockStart uint64 `json:"blockStart"` BlockEnd uint64 `json:"blockEnd"` @@ -68,11 +68,13 @@ func toSlot(input string) (uint64, error) { return res, nil } -func (a *API) reindexBlocks(w http.ResponseWriter, r *http.Request) { +// rearchiveBlocks rearchives blobs from blocks between the given from and to slots. +// If any blocks are already archived, they will be overwritten with data from the beacon node. 
+func (a *API) rearchiveBlocks(w http.ResponseWriter, r *http.Request) {
 	from, err := toSlot(r.URL.Query().Get("from"))
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
-		_ = json.NewEncoder(w).Encode(reindexResponse{
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
 			Error: fmt.Sprintf("invalid from param: %v", err),
 		})
 		return
@@ -81,7 +83,7 @@ func (a *API) reindexBlocks(w http.ResponseWriter, r *http.Request) {
 	to, err := toSlot(r.URL.Query().Get("to"))
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
-		_ = json.NewEncoder(w).Encode(reindexResponse{
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
 			Error: fmt.Sprintf("invalid to param: %v", err),
 		})
 		return
@@ -89,7 +91,7 @@ func (a *API) reindexBlocks(w http.ResponseWriter, r *http.Request) {
 
 	if from > to {
 		w.WriteHeader(http.StatusBadRequest)
-		_ = json.NewEncoder(w).Encode(reindexResponse{
+		_ = json.NewEncoder(w).Encode(rearchiveResponse{
 			Error: fmt.Sprintf("invalid range: from %d to %d", from, to),
 		})
 		return
@@ -97,19 +99,19 @@ func (a *API) reindexBlocks(w http.ResponseWriter, r *http.Request) {
 
 	blockStart, blockEnd, err := a.archiver.rearchiveRange(from, to)
 	if err != nil {
-		a.logger.Error("Failed to reindex blocks", "err", err)
+		a.logger.Error("Failed to rearchive blocks", "err", err)
 
 		w.WriteHeader(http.StatusInternalServerError)
-		err = json.NewEncoder(w).Encode(reindexResponse{
+		err = json.NewEncoder(w).Encode(rearchiveResponse{
 			Error:      err.Error(),
 			BlockStart: blockStart,
 			BlockEnd:   blockEnd,
 		})
 	} else {
-		a.logger.Info("Reindexing blocks complete")
+		a.logger.Info("Rearchiving blocks complete")
 		w.WriteHeader(http.StatusOK)
 
-		err = json.NewEncoder(w).Encode(reindexResponse{
+		err = json.NewEncoder(w).Encode(rearchiveResponse{
 			BlockStart: blockStart,
 			BlockEnd:   blockEnd,
 		})
diff --git a/archiver/service/api_test.go b/archiver/service/api_test.go
index faed0bb..f26f919 100644
--- a/archiver/service/api_test.go
+++ b/archiver/service/api_test.go
@@ -36,7 +36,7 @@ func TestHealthHandler(t *testing.T) {
 	require.Equal(t, 200, response.Code)
 }
 
-func TestReindexHandler(t *testing.T) {
+func TestRearchiveHandler(t *testing.T) {
 	a, _ := setupAPI(t)
 
 	tests := []struct {
@@ -47,37 +47,37 @@ func TestReindexHandler(t *testing.T) {
 	}{
 		{
 			name:           "should fail with no params",
-			path:           "/reindex",
+			path:           "/rearchive",
 			expectedStatus: 400,
 			error:          "invalid from param: must provide param",
 		},
 		{
 			name:           "should fail with missing to param",
-			path:           "/reindex?from=1",
+			path:           "/rearchive?from=1",
 			expectedStatus: 400,
 			error:          "invalid to param: must provide param",
 		},
 		{
 			name:           "should fail with missing from param",
-			path:           "/reindex?to=1",
+			path:           "/rearchive?to=1",
 			expectedStatus: 400,
 			error:          "invalid from param: must provide param",
 		},
 		{
 			name:           "should fail with invalid from param",
-			path:           "/reindex?from=blah&to=1",
+			path:           "/rearchive?from=blah&to=1",
 			expectedStatus: 400,
 			error:          "invalid from param: invalid slot: \"blah\"",
 		},
 		{
 			name:           "should fail with invalid to param",
-			path:           "/reindex?from=1&to=blah",
+			path:           "/rearchive?from=1&to=blah",
 			expectedStatus: 400,
 			error:          "invalid to param: invalid slot: \"blah\"",
 		},
 		{
 			name:           "should fail when from is greater than to",
-			path:           "/reindex?from=2&to=1",
+			path:           "/rearchive?from=2&to=1",
 			expectedStatus: 400,
 			error:          "invalid range: from 2 to 1",
 		},
 	}
@@ -93,7 +93,7 @@ func TestReindexHandler(t *testing.T) {
 
 			require.Equal(t, test.expectedStatus, response.Code)
 
-			var errResponse reindexResponse
+			var errResponse rearchiveResponse
 			err := json.NewDecoder(response.Body).Decode(&errResponse)
 			require.NoError(t, err)
 
diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go
index 505111e..db3c490 100644
--- a/archiver/service/archiver.go
+++ b/archiver/service/archiver.go
@@ -43,12 +43,12 @@ type Archiver struct {
 	stopCh          chan struct{}
 }
 
-// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for
+// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
 // them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head
 // to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be
 // filled in.
 func (a *Archiver) Start(ctx context.Context) error {
-	_, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
+	currentBlock, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
 		return a.persistBlobsForBlockToS3(ctx, "head", false)
 	})
 
@@ -57,7 +57,7 @@ func (a *Archiver) Start(ctx context.Context) error {
 		return err
 	}
 
-	//go a.backfillBlobs(ctx, currentBlob)
+	go a.backfillBlobs(ctx, currentBlock)
 
 	return a.trackLatestBlocks(ctx)
 }
@@ -216,7 +216,7 @@ func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error
 
 		l.Info("rearchiving block")
 
-		reindexed, err := retry.Do(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (bool, error) {
+		rewritten, err := retry.Do(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (bool, error) {
 			_, _, e := a.persistBlobsForBlockToS3(context.Background(), id, true)
 
@@ -236,8 +236,8 @@ func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error
 		return from, i, err
 	}
 
-	if !reindexed {
-		l.Info("block not found during reindexing", "slot", id)
+	if !rewritten {
+		l.Info("block not found during rearchiving", "slot", id)
 	}
 
 	a.metrics.RecordProcessedBlock(metrics.BlockSourceRearchive)
diff --git a/archiver/service/service.go b/archiver/service/service.go
index 6322a7f..33c1bca 100644
--- a/archiver/service/service.go
+++ b/archiver/service/service.go
@@ -41,10 +41,7 @@ type ArchiverService struct {
 	archiver *Archiver
 }
 
-// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for
-// them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head
-// to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be
-// filled in.
+// Start starts the archiver service. It'll start the API server as well as the archiving process.
func (a *ArchiverService) Start(ctx context.Context) error { if a.cfg.MetricsConfig.Enabled { a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort) From d99f0f48a2a1c7991a92e55371b4cf2301ac55f3 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Tue, 20 Feb 2024 09:50:11 -0600 Subject: [PATCH 4/4] minor tweaks --- archiver/service/archiver.go | 9 +++++++++ archiver/service/service.go | 6 ------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go index db3c490..fcdbc98 100644 --- a/archiver/service/archiver.go +++ b/archiver/service/archiver.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/base-org/blob-archiver/archiver/flags" @@ -23,6 +24,11 @@ const ( backfillErrorRetryInterval = 5 * time.Second ) +type BeaconClient interface { + client.BlobSidecarsProvider + client.BeaconBlockHeadersProvider +} + func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*Archiver, error) { return &Archiver{ log: l, @@ -208,6 +214,9 @@ func (a *Archiver) processBlocksUntilKnownBlock(ctx context.Context) { a.log.Info("live data refreshed", "startHash", start.Root.String(), "endHash", currentBlockId) } +// rearchiveRange will rearchive all blocks in the range from the given start to end. It returns the start and end of the +// range that was successfully rearchived. On any persistent errors, it will halt archiving and return the range of blocks +// that were rearchived and the error that halted the process. func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error) { for i := from; i <= to; i++ { id := strconv.FormatUint(i, 10) diff --git a/archiver/service/service.go b/archiver/service/service.go index 33c1bca..b4f265d 100644 --- a/archiver/service/service.go +++ b/archiver/service/service.go @@ -6,7 +6,6 @@ import ( "fmt" "sync/atomic" - client "github.com/attestantio/go-eth2-client" "github.com/base-org/blob-archiver/archiver/flags" "github.com/base-org/blob-archiver/archiver/metrics" "github.com/ethereum-optimism/optimism/op-service/httputil" @@ -16,11 +15,6 @@ import ( var ErrAlreadyStopped = errors.New("already stopped") -type BeaconClient interface { - client.BlobSidecarsProvider - client.BeaconBlockHeadersProvider -} - func NewService(l log.Logger, cfg flags.ArchiverConfig, api *API, archiver *Archiver, m metrics.Metricer) (*ArchiverService, error) { return &ArchiverService{ log: l,
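Taken together, these patches expose the new admin endpoint as POST /rearchive?from=<slot>&to=<slot>, answering with the rearchiveResponse JSON defined in api.go: 200 with blockStart/blockEnd on success, 400 on bad params, and 500 with an error field if archiving halts partway. A minimal client sketch, not part of the patches — the localhost:8000 address is an assumption standing in for whatever the archiver's ListenAddr is configured to:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Ask the archiver to re-fetch and overwrite blobs for slots 100 through 200.
	// The base URL is hypothetical; substitute the configured API listen address.
	resp, err := http.Post("http://localhost:8000/rearchive?from=100&to=200", "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// Expect 200 with {"blockStart":100,"blockEnd":200} on success; on a persistent
	// failure the handler returns 500 and blockEnd reports the slot that failed.
	fmt.Println(resp.StatusCode, string(body))
}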