diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go
index f6782058edf..9c0b4ca1be6 100644
--- a/repo/manifest/committed_manifest_manager.go
+++ b/repo/manifest/committed_manifest_manager.go
@@ -18,6 +18,10 @@ import (
 	"github.com/kopia/kopia/repo/content/index"
 )
 
+// maxManifestsPerContent is the number of manifest entries at which compaction
+// closes the current content and starts a new one.
+const maxManifestsPerContent = 1000000
+
 // committedManifestManager manages committed manifest entries stored in 'm' contents.
 type committedManifestManager struct {
 	b contentManager
@@ -87,7 +91,28 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
 	m.lock()
 	defer m.unlock()
 
-	return m.writeEntriesLocked(ctx, entries)
+	return m.writeEntriesLocked(ctx, entries, false)
+}
+
+// writeContentChunk gzips the JSON encoding of data into buf and writes the
+// buffer contents as one content under ContentPrefix, returning the ID of the
+// written content. buf is not reset; callers reusing it must do so themselves.
+func (m *committedManifestManager) writeContentChunk(
+	ctx context.Context,
+	data any,
+	buf *gather.WriteBuffer,
+) (content.ID, error) {
+	gz := gzip.NewWriter(buf)
+	mustSucceed(json.NewEncoder(gz).Encode(data))
+	mustSucceed(gz.Flush())
+	mustSucceed(gz.Close())
+
+	contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
+	if err != nil {
+		return content.EmptyID, errors.Wrap(err, "unable to write content")
+	}
+
+	return contentID, nil
 }
 
 // writeEntriesLocked writes entries in the provided map as manifest contents
@@ -98,38 +123,69 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
 // the lock via commitEntries()) and to compact existing committed entries during compaction
 // where the lock is already being held.
+//
+// When isCompaction is true the entries are written as contents holding at
+// most maxManifestsPerContent entries each; otherwise all entries are written
+// as a single content.
 // +checklocks:m.cmmu
-func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
+func (m *committedManifestManager) writeEntriesLocked(
+	ctx context.Context,
+	entries map[ID]*manifestEntry,
+	isCompaction bool,
+) (map[content.ID]bool, error) {
 	if len(entries) == 0 {
 		return nil, nil
 	}
 
-	man := manifest{}
+	var (
+		buf            gather.WriteBuffer
+		man            manifest
+		newlyCommitted = map[content.ID]bool{}
+	)
+
+	defer buf.Close()
 
 	for _, e := range entries {
 		man.Entries = append(man.Entries, e)
+
+		// Still write all entries out in a single content piece if we're not
+		// compacting things.
+		if !isCompaction || len(man.Entries) < maxManifestsPerContent {
+			continue
+		}
+
+		contentID, err := m.writeContentChunk(ctx, man, &buf)
+		if err != nil {
+			return nil, errors.Wrap(err, "writing manifest data")
+		}
+
+		man.Entries = man.Entries[:0]
+		newlyCommitted[contentID] = true
+
+		buf.Reset()
 	}
 
-	var buf gather.WriteBuffer
-	defer buf.Close()
+	// Write out any remaining manifest entries. Neither man nor buf is reused
+	// after this point, so there is nothing to reset here.
+	if len(man.Entries) > 0 {
+		contentID, err := m.writeContentChunk(ctx, man, &buf)
+		if err != nil {
+			return nil, errors.Wrap(err, "writing final manifest data")
+		}
 
-	gz := gzip.NewWriter(&buf)
-	mustSucceed(json.NewEncoder(gz).Encode(man))
-	mustSucceed(gz.Flush())
-	mustSucceed(gz.Close())
-
-	contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
-	if err != nil {
-		return nil, errors.Wrap(err, "unable to write content")
+		newlyCommitted[contentID] = true
 	}
 
+	// Only update internal data if we successfully wrote all manifest contents.
 	for _, e := range entries {
 		m.committedEntries[e.ID] = e
 		delete(entries, e.ID)
 	}
 
-	m.committedContentIDs[contentID] = true
+	for contentID := range newlyCommitted {
+		m.committedContentIDs[contentID] = true
+	}
 
-	return map[content.ID]bool{contentID: true}, nil
+	return newlyCommitted, nil
 }
 
 // +checklocks:m.cmmu
@@ -260,7 +316,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error {
 		tmp[k] = v
 	}
 
-	written, err := m.writeEntriesLocked(ctx, tmp)
+	written, err := m.writeEntriesLocked(ctx, tmp, true)
 	if err != nil {
 		return err
 	}
diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go
index 2f02c2285e4..73f84c60a00 100644
--- a/repo/manifest/manifest_manager_test.go
+++ b/repo/manifest/manifest_manager_test.go
@@ -495,3 +495,75 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in
 
 	return foundContents
 }
+
+// TestWriteManyManifests checks that a regular flush keeps every manifest in
+// a single content, even when there are more than maxManifestsPerContent.
+func TestWriteManyManifests(t *testing.T) {
+	ctx := testlogging.Context(t)
+	data := blobtesting.DataMap{}
+	item1 := map[string]int{"foo": 1, "bar": 2}
+	labels1 := map[string]string{"type": "item", "color": "red"}
+	numManifests := maxManifestsPerContent + 5
+
+	mgr := newManagerForTesting(ctx, t, data, ManagerOptions{})
+
+	for i := 0; i < numManifests; i++ {
+		addAndVerify(ctx, t, mgr, labels1, item1)
+	}
+
+	require.NoError(t, mgr.Flush(ctx))
+	require.NoError(t, mgr.b.Flush(ctx))
+
+	// Should only have a single content piece since this wasn't compaction.
+	foundContents := getManifestContentCount(ctx, t, mgr)
+	assert.Equal(t, 1, foundContents)
+
+	mans, err := mgr.Find(ctx, map[string]string{"color": "red"})
+	require.NoError(t, err)
+	assert.Len(t, mans, numManifests)
+}
+
+// TestCompactManyManifests checks that compaction splits the manifests across
+// multiple contents once maxManifestsPerContent is reached.
+func TestCompactManyManifests(t *testing.T) {
+	ctx := testlogging.Context(t)
+	data := blobtesting.DataMap{}
+	item1 := map[string]int{"foo": 1, "bar": 2}
+	labels1 := map[string]string{"type": "item", "color": "red"}
+
+	mgr := newManagerForTesting(ctx, t, data, ManagerOptions{})
+
+	for i := 0; i < maxManifestsPerContent-1; i++ {
+		addAndVerify(ctx, t, mgr, labels1, item1)
+	}
+
+	require.NoError(t, mgr.Flush(ctx))
+	require.NoError(t, mgr.b.Flush(ctx))
+
+	// Should only have a single content piece since this wasn't compaction.
+	foundContents := getManifestContentCount(ctx, t, mgr)
+	assert.Equal(t, 1, foundContents)
+
+	// Add individually so we can tell that compaction deleted the old content
+	// pieces.
+	for i := 0; i < 6; i++ {
+		addAndVerify(ctx, t, mgr, labels1, item1)
+
+		require.NoError(t, mgr.Flush(ctx))
+		require.NoError(t, mgr.b.Flush(ctx))
+	}
+
+	foundContents = getManifestContentCount(ctx, t, mgr)
+	assert.Equal(t, 7, foundContents)
+
+	// Run compaction which should result in multiple content pieces.
+	err := mgr.Compact(ctx)
+	require.NoError(t, err, "compacting manifests")
+
+	foundContents = getManifestContentCount(ctx, t, mgr)
+	assert.Equal(t, 2, foundContents)
+
+	mans, err := mgr.Find(ctx, map[string]string{"color": "red"})
+	require.NoError(t, err)
+	assert.Len(t, mans, maxManifestsPerContent+5)
+}