Skip to content

Commit

Permalink
Make update object metadata API atomic (#8264)
Browse files Browse the repository at this point in the history
* Make update object metadata API atomic

Protect updating object metadata on an existing entry to be safe against
concurrent modifications, deletions, and commits and merges to the branch.
This is a bit tricky because it needs to work for both committed and staged
objects.

Fixes #8262 - a race that would give an odd result if update object metadata
managed to lose against concurrent delete and uncommitted GC, or potentially
merges into the branch.

* make gen

* Add missing method to FakeGraveler

Allow tests to pass.

(Also rebase on trunk latest)

* [CR] Simplify "use committed value" logic in code

* Rename UpdateObjectMetadata -> Update; fix some errors

* [CR] s/it it/it/ in comment
  • Loading branch information
arielshaqed authored Oct 13, 2024
1 parent 74ec82d commit aaf2190
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 47 deletions.
12 changes: 2 additions & 10 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4701,16 +4701,8 @@ func (c *Controller) UpdateObjectUserMetadata(w http.ResponseWriter, r *http.Req
ctx := r.Context()
c.LogAction(ctx, "update_object_user_metadata", r, repository, branch, "")

// read all the _other_ metadata. Does not require checking read
// permissions, as the caller will never see this.
entry, err := c.Catalog.GetEntry(ctx, repository, branch, params.Path, catalog.GetEntryParams{})
if c.handleAPIError(ctx, w, r, err) {
return
}

entry.Metadata = catalog.Metadata(body.Set.AdditionalProperties)

err = c.Catalog.CreateEntry(ctx, repository, branch, *entry)
newUserMetadata := body.Set.AdditionalProperties
err := c.Catalog.UpdateEntryUserMetadata(ctx, repository, branch, params.Path, newUserMetadata)
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down
96 changes: 61 additions & 35 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,47 +2299,73 @@ func TestController_UpdateObjectUserMetadataHander(t *testing.T) {
t.Fatal(err)
}

t.Run("update metadata", func(t *testing.T) {
const objPath = "foo/bar"
entry := catalog.DBEntry{
Path: objPath,
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
}
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry))
bools := []bool{false, true}

userMetadataMap := map[string]string{
"foo": "bar",
"baz": "quux",
for _, doCommit := range bools {
commitLabel := "no commit"
if doCommit {
commitLabel = "commit"
}
for _, doSetMetadata := range bools {
metadataLabel := "no metadata"
if doSetMetadata {
metadataLabel = "initial metadata"
}

body := apigen.UpdateObjectUserMetadataJSONRequestBody{
Set: apigen.ObjectUserMetadata{
AdditionalProperties: userMetadataMap,
},
}
label := fmt.Sprintf("%s, %s", commitLabel, metadataLabel)
t.Run(label, func(t *testing.T) {
const objPath = "foo/bar"
entry := catalog.DBEntry{
Path: objPath,
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
}
if doSetMetadata {
entry.Metadata = catalog.Metadata{
"old": "metadata",
}
}
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry))

resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main",
&apigen.UpdateObjectUserMetadataParams{Path: objPath},
body,
)
verifyResponseOK(t, resp, err)
if doCommit {
_, err := deps.catalog.Commit(ctx, repo, "main", "First commit!", t.Name(), nil, nil, nil, false)
testutil.MustDo(t, "Commit", err)
}

// Verify that it was set
statResp, err := clt.StatObjectWithResponse(ctx, repo, "main",
&apigen.StatObjectParams{
Path: objPath,
UserMetadata: swag.Bool(true),
},
)
verifyResponseOK(t, statResp, err)
objectStats := statResp.JSON200
if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil {
t.Errorf("did not get expected metadata, diffs %s", diffs)
userMetadataMap := map[string]string{
"foo": "bar",
"baz": "quux",
}

body := apigen.UpdateObjectUserMetadataJSONRequestBody{
Set: apigen.ObjectUserMetadata{
AdditionalProperties: userMetadataMap,
},
}

resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main",
&apigen.UpdateObjectUserMetadataParams{Path: objPath},
body,
)
verifyResponseOK(t, resp, err)

// Verify that it was set
statResp, err := clt.StatObjectWithResponse(ctx, repo, "main",
&apigen.StatObjectParams{
Path: objPath,
UserMetadata: swag.Bool(true),
},
)
verifyResponseOK(t, statResp, err)
objectStats := statResp.JSON200
if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil {
t.Errorf("did not get expected metadata, diffs %s", diffs)
}
})
}
})
}

t.Run("update metadata not found", func(t *testing.T) {
const objPath = "foo/not/found/bar"
Expand Down
33 changes: 33 additions & 0 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,39 @@ func (c *Catalog) GetEntry(ctx context.Context, repositoryID string, reference s
return &catalogEntry, nil
}

// UpdateEntryUserMetadata updates user metadata for the current entry for a
// path in repository branch reference.
func (c *Catalog) UpdateEntryUserMetadata(ctx context.Context, repositoryID, branch, path string, newUserMetadata map[string]string) error {
branchID := graveler.BranchID(branch)
if err := validator.Validate([]validator.ValidateArg{
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
{Name: "path", Value: Path(path), Fn: ValidatePath},
}); err != nil {
return err
}

repository, err := c.getRepository(ctx, repositoryID)
if err != nil {
return nil
}

key := graveler.Key(path)
updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) {
if value == nil {
return nil, fmt.Errorf("update user metadata on %s/%s/%s: %w",
repositoryID, branchID, path, graveler.ErrNotFound)
}
entry, err := ValueToEntry(value)
if err != nil {
return nil, err
}
entry.Metadata = newUserMetadata
return EntryToValue(entry)
})
return c.Store.Update(ctx, repository, branchID, key, updater)
}

func newEntryFromCatalogEntry(entry DBEntry) *Entry {
ent := &Entry{
Address: entry.PhysicalAddress,
Expand Down
13 changes: 13 additions & 0 deletions pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ func (g *FakeGraveler) Set(_ context.Context, repository *graveler.RepositoryRec
return nil
}

func (g *FakeGraveler) Update(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, update graveler.ValueUpdateFunc, opts ...graveler.SetOptionsFunc) error {
if g.Err != nil {
return g.Err
}
k := fakeGravelerBuildKey(repository.RepositoryID, graveler.Ref(branchID.String()), key)
value, err := update(g.KeyValue[k])
if err != nil {
return err
}
g.KeyValue[k] = value
return nil
}

func (g *FakeGraveler) Delete(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, _ ...graveler.SetOptionsFunc) error {
return nil
}
Expand Down
48 changes: 46 additions & 2 deletions pkg/graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ type KeyValueStore interface {
// Set stores value on repository / branch by key. nil value is a valid value for tombstone
Set(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, value Value, opts ...SetOptionsFunc) error

// Update atomically runs update on repository / branch by key. (Of course, if entry
// is only on committed, the updated entry will still be created (atomically) on staging.)
Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error

// Delete value from repository / branch by key
Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error

Expand Down Expand Up @@ -1793,8 +1797,7 @@ func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branch
}

// safeBranchWrite repeatedly attempts to perform stagingOperation, retrying
// if the staging token changes during the write. It never backs off. It
// returns the number of times it tried -- between 1 and options.MaxTries.
// if the staging token changes during the write. It never backs off.
func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repository *RepositoryRecord, branchID BranchID,
options safeBranchWriteOptions, stagingOperation func(branch *Branch) error, operation string,
) error {
Expand Down Expand Up @@ -1841,6 +1844,47 @@ func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repo
return nil
}

func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error {
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE)
if err != nil {
return err
}
if isProtected {
return ErrWriteToProtectedBranch
}

options := NewSetOptions(opts)
if repository.ReadOnly && !options.Force {
return ErrReadOnlyRepository
}

log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"})

// committedValue, if non-nil is a value read from either uncommitted or committed. Usually
// it is read from committed. If there is a value on staging, that entry will be modified
// and committedValue will never be read.
var committedValue *Value

err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error {
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) {
if currentValue == nil {
// Object not on staging: need to update committed value.
if committedValue == nil {
committedValue, err = g.Get(ctx, repository, Ref(branchID), key)
if err != nil {
// (Includes ErrNotFound)
return nil, fmt.Errorf("read from committed: %w", err)
}
}
// Get always returns a non-nil value or an error.
currentValue = committedValue
}
return update(currentValue)
})
}, "update_metadata")
return err
}

func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error {
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/graveler/mock/graveler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit aaf2190

Please sign in to comment.