Skip to content

Commit

Permalink
Less select for updates (#2278)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Jul 5, 2024
1 parent d27a24e commit 5eaeb5f
Show file tree
Hide file tree
Showing 25 changed files with 83 additions and 69 deletions.
2 changes: 1 addition & 1 deletion nucliadb/src/migrations/0003_allfields_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
logger.warning(f"kb={kbid} rid={resource_id}: resource not found. Skipping...")
continue

all_fields: Optional[AllFieldIDs] = await resource.get_all_field_ids()
all_fields: Optional[AllFieldIDs] = await resource.get_all_field_ids(for_update=True)
if all_fields is not None:
logger.warning(f"kb={kbid} rid={resource_id}: already has all fields key. Skipping...")
continue
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:

async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool:
async with context.kv_driver.transaction(read_only=True) as txn:
shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False)
if not shards_object:
raise ShardsObjectNotFound()
for shard in shards_object.shards:
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/migrations/0017_multiple_writable_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def migrate(context: ExecutionContext) -> None: ...

async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
async with context.kv_driver.transaction() as txn:
shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=True)
if shards is None:
logger.error("KB without shards", extra={"kbid": kbid})
return
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/migrations/0018_purge_orphan_kbslugs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def migrate(context: ExecutionContext) -> None:
async with context.kv_driver.transaction() as txn:
async for key in txn.keys(KB_SLUGS_BASE, count=-1):
slug = key.replace(KB_SLUGS_BASE, "")
value = await txn.get(key)
value = await txn.get(key, for_update=False)
if value is None:
# KB with slug but without uuid? Seems wrong, let's remove it too
logger.info("Removing /kbslugs with empty value", extra={"maindb_key": key})
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/nucliadb/common/cluster/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def apply_for_all_shards(
async def get_current_active_shard(
self, txn: Transaction, kbid: str
) -> Optional[writer_pb2.ShardObject]:
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False)
if kb_shards is None:
return None

Expand All @@ -194,7 +194,7 @@ async def create_shard_by_kbid(
)
raise

kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=True)
if kb_shards is None:
msg = ("Attempting to create a shard for a KB when it has no stored shards in maindb",)
logger.error(msg, extra={"kbid": kbid})
Expand Down
10 changes: 6 additions & 4 deletions nucliadb/src/nucliadb/common/cluster/rollover.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ async def cutover_shards(app_context: ApplicationContext, kbid: str) -> None:
async with datamanagers.with_transaction() as txn:
sm = app_context.shard_manager

previously_active_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
previously_active_shards = await datamanagers.cluster.get_kb_shards(
txn, kbid=kbid, for_update=True
)
rollover_shards = await datamanagers.rollover.get_kb_rollover_shards(txn, kbid=kbid)
if previously_active_shards is None or rollover_shards is None:
raise UnexpectedRolloverError("Shards for kb not found")
Expand All @@ -298,7 +300,7 @@ async def validate_indexed_data(app_context: ApplicationContext, kbid: str) -> l
If a resource was removed during the rollover, it will be removed as well.
"""

async with datamanagers.with_transaction() as txn:
async with datamanagers.with_ro_transaction() as txn:
rolled_over_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
if rolled_over_shards is None:
raise UnexpectedRolloverError(f"No rollover shards found for KB {kbid}")
Expand All @@ -311,7 +313,7 @@ async def validate_indexed_data(app_context: ApplicationContext, kbid: str) -> l

repaired_resources = []
async for resource_id in datamanagers.resources.iterate_resource_ids(kbid=kbid):
async with datamanagers.with_transaction() as txn:
async with datamanagers.with_ro_transaction() as txn:
indexed_data = await datamanagers.rollover.get_indexed_data(
txn, kbid=kbid, resource_id=resource_id
)
Expand Down Expand Up @@ -418,7 +420,7 @@ async def clean_indexed_data(app_context: ApplicationContext, kbid: str) -> None

async def clean_rollover_status(app_context: ApplicationContext, kbid: str) -> None:
async with datamanagers.with_transaction() as txn:
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=True)
if kb_shards is None:
logger.warning(
"No shards found for KB, skipping clean rollover status",
Expand Down
6 changes: 4 additions & 2 deletions nucliadb/src/nucliadb/common/datamanagers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
KB_SHARDS = "/kbs/{kbid}/shards"


async def get_kb_shards(
    txn: Transaction, *, kbid: str, for_update: bool = False
) -> Optional[writer_pb2.Shards]:
    """Load the Shards protobuf stored for a KB, or None when the key is absent.

    for_update: when True the underlying driver may lock the row for the
    transaction (presumably SELECT ... FOR UPDATE on the pg driver — confirm).
    """
    return await get_kv_pb(
        txn, KB_SHARDS.format(kbid=kbid), writer_pb2.Shards, for_update=for_update
    )


async def update_kb_shards(txn: Transaction, *, kbid: str, shards: writer_pb2.Shards) -> None:
Expand Down
12 changes: 7 additions & 5 deletions nucliadb/src/nucliadb/common/datamanagers/kb.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,22 @@ async def get_kbs(txn: Transaction, *, prefix: str = "") -> AsyncIterator[tuple[


async def exists_kb(txn: Transaction, *, kbid: str) -> bool:
    """Return True when a config object is stored for *kbid* (read-only lookup)."""
    config = await get_config(txn, kbid=kbid, for_update=False)
    return config is not None


async def get_kb_uuid(txn: Transaction, *, slug: str) -> Optional[str]:
    """Resolve a KB slug to its uuid; None when the slug is not mapped."""
    raw = await txn.get(KB_SLUGS.format(slug=slug), for_update=False)
    return None if raw is None else raw.decode()


async def get_config(txn: Transaction, *, kbid: str) -> Optional[knowledgebox_pb2.KnowledgeBoxConfig]:
async def get_config(
txn: Transaction, *, kbid: str, for_update: bool = False
) -> Optional[knowledgebox_pb2.KnowledgeBoxConfig]:
key = KB_UUID.format(kbid=kbid)
payload = await txn.get(key)
payload = await txn.get(key, for_update=for_update)
if payload is None:
return None
response = knowledgebox_pb2.KnowledgeBoxConfig()
Expand All @@ -71,7 +73,7 @@ async def set_config(txn: Transaction, *, kbid: str, config: knowledgebox_pb2.Kn


async def get_model_metadata(txn: Transaction, *, kbid: str) -> knowledgebox_pb2.SemanticModelMetadata:
shards_obj = await cluster.get_kb_shards(txn, kbid=kbid)
shards_obj = await cluster.get_kb_shards(txn, kbid=kbid, for_update=False)
if shards_obj is None:
raise KnowledgeBoxNotFound(kbid)
if shards_obj.HasField("model"):
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/common/datamanagers/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def _deprecated_scan_labelset_ids(txn: Transaction, *, kbid: str) -> list[

async def _get_labelset_ids(txn: Transaction, *, kbid: str) -> Optional[list[str]]:
    """Return the stored labelset-id list for a KB, or None when unset.

    Reads with for_update=True — callers are expected to modify and write the
    list back in the same transaction.
    """
    raw = await txn.get(KB_LABELSET_IDS.format(kbid=kbid), for_update=True)
    if not raw:
        return None
    return orjson.loads(raw)
Expand Down
6 changes: 3 additions & 3 deletions nucliadb/src/nucliadb/common/datamanagers/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ async def get_number_of_resources(txn: Transaction, *, kbid: str) -> int:
"""
Return cached number of resources in a knowledgebox.
"""
raw_value = await txn.get(KB_MATERIALIZED_RESOURCES_COUNT.format(kbid=kbid))
raw_value = await txn.get(KB_MATERIALIZED_RESOURCES_COUNT.format(kbid=kbid), for_update=False)
if raw_value is None:
return -1
return int(raw_value)
Expand All @@ -275,10 +275,10 @@ async def set_number_of_resources(txn: Transaction, kbid: str, value: int) -> No


async def get_all_field_ids(
    txn: Transaction, *, kbid: str, rid: str, for_update: bool = False
) -> Optional[resources_pb2.AllFieldIDs]:
    """Load the AllFieldIDs protobuf for one resource, or None when missing.

    for_update: pass True when the caller intends to rewrite the key in the
    same transaction, so the driver can lock the row.
    """
    return await get_kv_pb(
        txn,
        KB_RESOURCE_ALL_FIELDS.format(kbid=kbid, uuid=rid),
        resources_pb2.AllFieldIDs,
        for_update=for_update,
    )


async def set_all_field_ids(
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/common/datamanagers/synonyms.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

async def get(txn: Transaction, *, kbid: str) -> Optional[knowledgebox_pb2.Synonyms]:
    """Read the KB synonyms protobuf without requesting a row lock."""
    return await get_kv_pb(
        txn, KB_SYNONYMS.format(kbid=kbid), knowledgebox_pb2.Synonyms, for_update=False
    )


async def set(txn: Transaction, *, kbid: str, synonyms: knowledgebox_pb2.Synonyms):
Expand Down
6 changes: 4 additions & 2 deletions nucliadb/src/nucliadb/common/datamanagers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
PB_TYPE = TypeVar("PB_TYPE", bound=Message)


async def get_kv_pb(txn: Transaction, key: str, pb_type: Type[PB_TYPE]) -> Optional[PB_TYPE]:
serialized: Optional[bytes] = await txn.get(key)
async def get_kv_pb(
txn: Transaction, key: str, pb_type: Type[PB_TYPE], for_update: bool = True
) -> Optional[PB_TYPE]:
serialized: Optional[bytes] = await txn.get(key, for_update=for_update)
if serialized is None:
return None
pb = pb_type()
Expand Down
19 changes: 12 additions & 7 deletions nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ async def initialize(txn: Transaction, *, kbid: str):
async def get(
    txn: Transaction, *, kbid: str, vectorset_id: str
) -> Optional[knowledgebox_pb2.VectorSetConfig]:
    """Return the config for one vectorset, or None when it is not defined."""
    configs = await _get_or_default(txn, kbid=kbid, for_update=False)
    idx = _find_vectorset(configs, vectorset_id)
    return None if idx is None else configs.vectorsets[idx]


async def exists(txn, *, kbid: str, vectorset_id: str) -> bool:
    """True when the given vectorset id is configured for the KB (read-only)."""
    configs = await _get_or_default(txn, kbid=kbid, for_update=False)
    return _find_vectorset(configs, vectorset_id) is not None


async def iter(txn: Transaction, *, kbid: str) -> AsyncIterator[knowledgebox_pb2.VectorSetConfig]:
    """Yield every vectorset config stored for the KB (read-only)."""
    configs = await _get_or_default(txn, kbid=kbid, for_update=False)
    for vectorset_config in configs.vectorsets:
        yield vectorset_config


async def set(txn: Transaction, *, kbid: str, config: knowledgebox_pb2.VectorSetConfig):
"""Create or update a vectorset configuration"""
kb_vectorsets = await _get_or_default(txn, kbid=kbid)
kb_vectorsets = await _get_or_default(txn, kbid=kbid, for_update=True)
index = _find_vectorset(kb_vectorsets, config.vectorset_id)
if index is None:
# adding a new vectorset
Expand All @@ -68,7 +68,7 @@ async def set(txn: Transaction, *, kbid: str, config: knowledgebox_pb2.VectorSet


async def delete(txn: Transaction, *, kbid: str, vectorset_id: str):
kb_vectorsets = await _get_or_default(txn, kbid=kbid)
kb_vectorsets = await _get_or_default(txn, kbid=kbid, for_update=True)
index = _find_vectorset(kb_vectorsets, vectorset_id)
if index is None:
# already deleted
Expand All @@ -82,10 +82,15 @@ async def delete(txn: Transaction, *, kbid: str, vectorset_id: str):
# XXX At some point in the vectorset epic, we should make this key mandatory and
# fail instead of providing a default
async def _get_or_default(
    txn: Transaction,
    *,
    kbid: str,
    for_update: bool = True,
) -> knowledgebox_pb2.KnowledgeBoxVectorSetsConfig:
    """Read the KB's vectorsets config, falling back to an empty default.

    XXX: at some point in the vectorset epic this key should become mandatory
    and this helper should fail instead of providing a default.
    """
    stored = await get_kv_pb(
        txn,
        KB_VECTORSETS.format(kbid=kbid),
        knowledgebox_pb2.KnowledgeBoxVectorSetsConfig,
        for_update=for_update,
    )
    # NOTE: keeps the original truthiness fallback (`stored or default`).
    return stored or knowledgebox_pb2.KnowledgeBoxVectorSetsConfig()


Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/common/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def __aenter__(self) -> "_Lock":
return self

async def get_lock_data(self, txn: Transaction) -> Optional[LockValue]:
existing_data = await txn.get(self.key)
existing_data = await txn.get(self.key, for_update=True)
if existing_data is None:
return None
else:
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/nucliadb/common/maindb/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ async def abort(self):
async def commit(self):
raise NotImplementedError()

async def batch_get(self, keys: list[str], for_update: bool = False) -> list[Optional[bytes]]:
    """Abstract multi-key read; concrete drivers must override.

    Returns values aligned with *keys* (None for misses); for_update lets an
    implementation lock the rows for the transaction.
    """
    raise NotImplementedError()

async def get(self, key: str, for_update: bool = False) -> Optional[bytes]:
    """Abstract single-key read; concrete drivers must override.

    for_update lets an implementation lock the row for the transaction.
    """
    raise NotImplementedError()

async def set(self, key: str, value: bytes):
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/nucliadb/common/maindb/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def commit(self):
self.clean()
self.open = False

async def batch_get(self, keys: list[str]) -> list[Optional[bytes]]:
async def batch_get(self, keys: list[str], for_update: bool = False) -> list[Optional[bytes]]:
results: list[Optional[bytes]] = []
for key in keys:
obj = await self.get(key)
Expand All @@ -124,7 +124,7 @@ async def batch_get(self, keys: list[str]) -> list[Optional[bytes]]:

return results

async def get(self, key: str) -> Optional[bytes]:
async def get(self, key: str, for_update: bool = False) -> Optional[bytes]:
if key in self.deleted_keys:
raise KeyError(f"Not found {key}")

Expand Down
16 changes: 8 additions & 8 deletions nucliadb/src/nucliadb/common/maindb/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ async def commit(self):
self.open = False
await self.connection.close()

async def batch_get(self, keys: list[str], for_update: bool = True):
    """Fetch several keys in one query, locking the rows unless for_update=False."""
    return await self.data_layer.batch_get(keys, select_for_update=for_update)

async def get(self, key: str, for_update: bool = True) -> Optional[bytes]:
    """Fetch a single key, locking its row unless for_update=False."""
    return await self.data_layer.get(key, select_for_update=for_update)

async def set(self, key: str, value: bytes):
await self.data_layer.set(key, value)
Expand Down Expand Up @@ -206,14 +206,14 @@ async def commit(self):
raise Exception("Cannot commit transaction in read only mode")

@backoff.on_exception(backoff.expo, RETRIABLE_EXCEPTIONS, jitter=backoff.random_jitter, max_tries=3)
async def batch_get(self, keys: list[str], for_update: bool = False):
    """Read-only multi-get: never locks rows; retried on transient PG errors.

    for_update is accepted for interface compatibility but ignored — this is a
    read-only transaction and locking would be meaningless.
    """
    async with self.driver._get_connection() as conn:
        layer = DataLayer(conn)
        return await layer.batch_get(keys, select_for_update=False)

@backoff.on_exception(backoff.expo, RETRIABLE_EXCEPTIONS, jitter=backoff.random_jitter, max_tries=3)
async def get(self, key: str, for_update: bool = False) -> Optional[bytes]:
    """Read-only single get: never locks the row; retried on transient PG errors.

    for_update is accepted for interface compatibility but ignored — this is a
    read-only transaction and locking would be meaningless.
    """
    async with self.driver._get_connection() as conn:
        layer = DataLayer(conn)
        return await layer.get(key, select_for_update=False)

async def set(self, key: str, value: bytes):
raise Exception("Cannot set in read only transaction")
Expand Down
10 changes: 5 additions & 5 deletions nucliadb/src/nucliadb/ingest/orm/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ async def create(
) -> str:
release_channel = cast(ReleaseChannel.ValueType, release_channel_for_kb(slug, release_channel))

exist = await datamanagers.kb.get_kb_uuid(txn, slug=slug)
if exist:
exists = await datamanagers.kb.get_kb_uuid(txn, slug=slug)
if exists:
raise KnowledgeBoxConflict()
if uuid is None or uuid == "":
uuid = str(uuid4())
Expand Down Expand Up @@ -161,7 +161,7 @@ async def update(
slug: Optional[str] = None,
config: Optional[KnowledgeBoxConfig] = None,
) -> str:
exist = await datamanagers.kb.get_config(txn, kbid=uuid)
exist = await datamanagers.kb.get_config(txn, kbid=uuid, for_update=True)
if not exist:
raise datamanagers.exceptions.KnowledgeBoxNotFound()

Expand Down Expand Up @@ -233,7 +233,7 @@ async def purge(cls, driver: Driver, kbid: str):

# Delete KB Shards
shards_match = datamanagers.cluster.KB_SHARDS.format(kbid=kbid)
payload = await txn.get(shards_match)
payload = await txn.get(shards_match, for_update=False)

if payload is None:
logger.warning(f"Shards not found for kbid={kbid}")
Expand Down Expand Up @@ -331,7 +331,7 @@ async def get_unique_slug(self, uuid: str, slug: str) -> str:
key = KB_RESOURCE_SLUG.format(kbid=self.kbid, slug=slug)
key_ok = False
while key_ok is False:
found = await self.txn.get(key)
found = await self.txn.get(key, for_update=False)
if found and found.decode() != uuid:
slug += ".c"
key = KB_RESOURCE_SLUG.format(kbid=self.kbid, slug=slug)
Expand Down
3 changes: 1 addition & 2 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ async def index_resource(
if shard is None:
# It's a new resource, get current active shard to place
# new resource on
async with datamanagers.with_ro_transaction() as ro_txn:
shard = await self.shard_manager.get_current_active_shard(ro_txn, kbid)
shard = await self.shard_manager.get_current_active_shard(txn, kbid)
if shard is None:
# no shard available, create a new one
shard = await self.shard_manager.create_shard_by_kbid(txn, kbid)
Expand Down
Loading

2 comments on commit 5eaeb5f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 5eaeb5f Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 2318.3363577120554 iter/sec (stddev: 0.0000024342882992697187) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 1.23

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 5eaeb5f Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 2277.4160878608404 iter/sec (stddev: 0.000002728128495110599) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 1.25

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.