From 35b8b0f2210a83646770500636731d3c8b1a5393 Mon Sep 17 00:00:00 2001 From: Ashlie Martinez Date: Tue, 3 Sep 2024 16:35:23 -0700 Subject: [PATCH 1/2] Allow persisting multiple content pieces When compacting manifest data, allow persisting multiple content pieces. This should be alright because flushes on the index are already disabled, which means all changes should appear atomically. Uses a max number of manifests per content piece as that's easily available and can be used as a proxy for total content size to some extent. --- repo/manifest/committed_manifest_manager.go | 79 +++++++++++++++++---- 1 file changed, 64 insertions(+), 15 deletions(-) diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index f6782058edf..9c0b4ca1be6 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -18,6 +18,8 @@ import ( "github.com/kopia/kopia/repo/content/index" ) +const maxManifestsPerContent = 1000000 + // committedManifestManager manages committed manifest entries stored in 'm' contents. type committedManifestManager struct { b contentManager @@ -87,7 +89,25 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma m.lock() defer m.unlock() - return m.writeEntriesLocked(ctx, entries) + return m.writeEntriesLocked(ctx, entries, false) +} + +func (m *committedManifestManager) writeContentChunk( + ctx context.Context, + data any, + buf *gather.WriteBuffer, +) (content.ID, error) { + gz := gzip.NewWriter(buf) + mustSucceed(json.NewEncoder(gz).Encode(data)) + mustSucceed(gz.Flush()) + mustSucceed(gz.Close()) + + contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression) + if err != nil { + return content.EmptyID, errors.Wrap(err, "unable to write content") + } + + return contentID, nil } // writeEntriesLocked writes entries in the provided map as manifest contents @@ -98,38 +118,67 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma // the lock via commitEntries()) and to compact existing committed entries during compaction // where the lock is already being held. // +checklocks:m.cmmu -func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) writeEntriesLocked( + ctx context.Context, + entries map[ID]*manifestEntry, + isCompaction bool, +) (map[content.ID]bool, error) { if len(entries) == 0 { return nil, nil } - man := manifest{} + var ( + buf gather.WriteBuffer + man manifest + newlyCommitted = map[content.ID]bool{} + ) + + defer buf.Close() for _, e := range entries { man.Entries = append(man.Entries, e) + + // Still write all entries out in a single content piece if we're not + // compacting things. + if !isCompaction || len(man.Entries) < maxManifestsPerContent { + continue + } + + contentID, err := m.writeContentChunk(ctx, man, &buf) + if err != nil { + return nil, errors.Wrap(err, "writing manifest data") + } + + man.Entries = man.Entries[:0] + newlyCommitted[contentID] = true + + buf.Reset() } - var buf gather.WriteBuffer - defer buf.Close() + // Write out any remaining manifest entries. + if len(man.Entries) > 0 { + contentID, err := m.writeContentChunk(ctx, man, &buf) + if err != nil { + return nil, errors.Wrap(err, "writing final manifest data") + } - gz := gzip.NewWriter(&buf) - mustSucceed(json.NewEncoder(gz).Encode(man)) - mustSucceed(gz.Flush()) - mustSucceed(gz.Close()) + man.Entries = man.Entries[:0] + newlyCommitted[contentID] = true - contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression) - if err != nil { - return nil, errors.Wrap(err, "unable to write content") + buf.Reset() } + // Only update internal data if we successfully wrote all manifest contents. for _, e := range entries { m.committedEntries[e.ID] = e delete(entries, e.ID) } - m.committedContentIDs[contentID] = true + for contentID := range newlyCommitted { + m.committedContentIDs[contentID] = true + } - return map[content.ID]bool{contentID: true}, nil + return newlyCommitted, nil } // +checklocks:m.cmmu @@ -260,7 +309,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error { tmp[k] = v } - written, err := m.writeEntriesLocked(ctx, tmp) + written, err := m.writeEntriesLocked(ctx, tmp, true) if err != nil { return err } From 3f0966d23f37bdb4340e41d4f68e433e1aea3e25 Mon Sep 17 00:00:00 2001 From: Ashlie Martinez Date: Tue, 3 Sep 2024 16:36:58 -0700 Subject: [PATCH 2/2] Add basic tests Basic tests that check lookups and number of content pieces after various operations. --- repo/manifest/manifest_manager_test.go | 68 ++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 2f02c2285e4..73f84c60a00 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -495,3 +495,71 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in return foundContents } + +func TestWriteManyManifests(t *testing.T) { + ctx := testlogging.Context(t) + data := blobtesting.DataMap{} + item1 := map[string]int{"foo": 1, "bar": 2} + labels1 := map[string]string{"type": "item", "color": "red"} + numManifests := maxManifestsPerContent + 5 + + mgr := newManagerForTesting(ctx, t, data, ManagerOptions{}) + + for i := 0; i < numManifests; i++ { + addAndVerify(ctx, t, mgr, labels1, item1) + } + + require.NoError(t, mgr.Flush(ctx)) + require.NoError(t, mgr.b.Flush(ctx)) + + // Should only have a single content piece since this wasn't compaction. + foundContents := getManifestContentCount(ctx, t, mgr) + assert.Equal(t, 1, foundContents) + + mans, err := mgr.Find(ctx, map[string]string{"color": "red"}) + assert.NoError(t, err) + assert.Len(t, mans, numManifests) +} + +func TestCompactManyManifests(t *testing.T) { + ctx := testlogging.Context(t) + data := blobtesting.DataMap{} + item1 := map[string]int{"foo": 1, "bar": 2} + labels1 := map[string]string{"type": "item", "color": "red"} + + mgr := newManagerForTesting(ctx, t, data, ManagerOptions{}) + + for i := 0; i < maxManifestsPerContent-1; i++ { + addAndVerify(ctx, t, mgr, labels1, item1) + } + + require.NoError(t, mgr.Flush(ctx)) + require.NoError(t, mgr.b.Flush(ctx)) + + // Should only have a single content piece since this wasn't compaction. + foundContents := getManifestContentCount(ctx, t, mgr) + assert.Equal(t, 1, foundContents) + + // Add individually so we can tell that compaction deleted the old content + // pieces. + for i := 0; i < 6; i++ { + addAndVerify(ctx, t, mgr, labels1, item1) + + require.NoError(t, mgr.Flush(ctx)) + require.NoError(t, mgr.b.Flush(ctx)) + } + + foundContents = getManifestContentCount(ctx, t, mgr) + assert.Equal(t, 7, foundContents) + + // Run compaction which should result in multiple content pieces. + err := mgr.Compact(ctx) + require.NoError(t, err, "compacting manifests") + + foundContents = getManifestContentCount(ctx, t, mgr) + assert.Equal(t, 2, foundContents) + + mans, err := mgr.Find(ctx, map[string]string{"color": "red"}) + assert.NoError(t, err) + assert.Len(t, mans, maxManifestsPerContent+5) +}