Commit
Added logging and built pipeline.py for speed testing ingest
wpfl-dbt committed Dec 17, 2024
1 parent 28665c1 commit 985efb8
Showing 5 changed files with 111 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ scratch/
 .ruff_cache
 notebooks/
 *.parquet
+data/

 # DuckDB
 *.duckdb
1 change: 1 addition & 0 deletions pyproject.toml
@@ -21,6 +21,7 @@ dependencies = [
"rustworkx>=0.15.1",
"splink>=4.0.5,<4.1.0",
"sqlalchemy>=2.0.35",
"rich>=13.9.4",
]

[project.optional-dependencies]
80 changes: 56 additions & 24 deletions src/matchbox/common/results.py
@@ -13,6 +13,15 @@
 from dotenv import find_dotenv, load_dotenv
 from pandas import DataFrame
 from pydantic import BaseModel, ConfigDict, field_validator, model_validator
+from rich.console import Console
+from rich.progress import (
+    BarColumn,
+    Progress,
+    SpinnerColumn,
+    TextColumn,
+    TimeElapsedColumn,
+    TimeRemainingColumn,
+)
 from sqlalchemy import Table

 from matchbox.common.db import Cluster, Probability
@@ -574,26 +583,43 @@ def to_hierarchical_clusters(
     Returns:
         Arrow table with columns ['parent', 'child', 'probability']
     """
+    console = Console()
+    progress_columns = [
+        SpinnerColumn(),
+        TextColumn("[progress.description]{task.description}"),
+        BarColumn(),
+        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
+        TimeElapsedColumn(),
+        TimeRemainingColumn(),
+    ]
+
     probabilities = probabilities.sort_by(
         [("component", "ascending"), ("probability", "descending")]
     )
     components = pc.unique(probabilities["component"])
     n_cores = multiprocessing.cpu_count()
     n_components = len(components)

-    logic_logger.info(f"Processing {n_components} components using {n_cores} workers")
+    logic_logger.info(f"Processing {n_components:,} components using {n_cores} workers")

     # Split table into separate component tables
     component_col = probabilities["component"]
     indices = []
     start_idx = 0

-    for i in range(1, len(component_col)):
-        if component_col[i] != component_col[i - 1]:
-            indices.append((start_idx, i))
-            start_idx = i
+    with Progress(*progress_columns, console=console) as progress:
+        split_task = progress.add_task(
+            "[cyan]Splitting tables...", total=len(component_col)
+        )
+
+        for i in range(1, len(component_col)):
+            if component_col[i] != component_col[i - 1]:
+                indices.append((start_idx, i))
+                start_idx = i
+            progress.update(split_task, advance=1)

-    indices.append((start_idx, len(component_col)))
+        indices.append((start_idx, len(component_col)))
+        progress.update(split_task, completed=len(component_col))

     component_tables = []
     for start, end in indices:
@@ -602,26 +628,32 @@ def to_hierarchical_clusters(

     # Process components in parallel
     results = []
-    with ProcessPoolExecutor(max_workers=n_cores) as executor:
-        futures = [
-            executor.submit(proc_func, component_table, dtype)
-            for component_table in component_tables
-        ]
+    with Progress(*progress_columns, console=console) as progress:
+        process_task = progress.add_task(
+            "[green]Processing components...", total=len(component_tables)
+        )

-        for future in futures:
-            try:
-                result = future.result(timeout=timeout)
-                results.append(result)
-            except TimeoutError:
-                logic_logger.error(
-                    f"Component processing timed out after {timeout} seconds"
-                )
-                raise
-            except Exception as e:
-                logic_logger.error(f"Error processing component: {str(e)}")
-                raise
+        with ProcessPoolExecutor(max_workers=n_cores) as executor:
+            futures = [
+                executor.submit(proc_func, component_table, dtype)
+                for component_table in component_tables
+            ]
+
+            for future in futures:
+                try:
+                    result = future.result(timeout=timeout)
+                    results.append(result)
+                    progress.update(process_task, advance=1)
+                except TimeoutError:
+                    logic_logger.error(
+                        f"Component processing timed out after {timeout} seconds"
+                    )
+                    raise
+                except Exception as e:
+                    logic_logger.error(f"Error processing component: {str(e)}")
+                    raise

-    logic_logger.info(f"Completed processing {len(results)} components successfully")
+    logic_logger.info(f"Completed processing {len(results):,} components successfully")

     # Create empty table if no results
     if not results:
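For context, the pattern these changes introduce in to_hierarchical_clusters — a Rich Progress display wrapped around a ProcessPoolExecutor, with the bar advanced as each future resolves inside a per-future timeout — can be sketched standalone. The following is an illustrative sketch, not part of the commit: slow_square and the inputs are hypothetical stand-ins for proc_func and the component tables.

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from rich.console import Console
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)


def slow_square(x: int) -> int:
    """Stand-in for proc_func: any picklable function works here."""
    return x * x


if __name__ == "__main__":
    console = Console()
    progress_columns = [
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ]
    inputs = list(range(100))
    results = []

    with Progress(*progress_columns, console=console) as progress:
        task = progress.add_task("[green]Processing...", total=len(inputs))
        with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
            futures = [executor.submit(slow_square, x) for x in inputs]
            for future in futures:
                # result(timeout=...) bounds each wait, mirroring the diff's
                # TimeoutError handling; iterating in submission order keeps
                # results aligned with inputs.
                results.append(future.result(timeout=30))
                progress.update(task, advance=1)

One consequence of iterating futures in submission order is that a slow early task holds the bar back even when later tasks have finished; concurrent.futures.as_completed would update the bar as results arrive, but its timeout bounds the whole iteration rather than each future.
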
51 changes: 51 additions & 0 deletions test/pipeline.py
@@ -0,0 +1,51 @@
+import logging
+import time
+from contextlib import contextmanager
+from pathlib import Path
+
+import pyarrow.parquet as pq
+from rich.logging import RichHandler
+
+from matchbox.common.results import (
+    attach_components_to_probabilities,
+    to_hierarchical_clusters,
+)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(message)s",
+    handlers=[RichHandler(rich_tracebacks=True)],
+)
+pipeline_logger = logging.getLogger("mb_pipeline")
+
+ROOT = Path(__file__).parent.parent
+
+
+@contextmanager
+def timer(description: str):
+    start = time.time()
+    yield
+    elapsed = time.time() - start
+
+    if elapsed >= 60:
+        minutes = int(elapsed // 60)
+        seconds = elapsed % 60
+        time_str = f"{minutes} min {seconds:.1f} sec"
+    else:
+        time_str = f"{elapsed:.2f} seconds"
+
+    pipeline_logger.info(f"{description} in {time_str}")
+
+
+if __name__ == "__main__":
+    with timer("Full pipeline completed"):
+        with timer("Read table"):
+            table = pq.read_table(Path.cwd() / "data/hierarchical_cc20k.parquet")
+
+        pipeline_logger.info(f"Processing {len(table):,} records")
+
+        with timer("Added components"):
+            cc = attach_components_to_probabilities(table)
+
+        with timer("Built hierarchical clusters"):
+            out = to_hierarchical_clusters(cc)
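
A natural extension of this harness (a sketch, not in the commit) would be to log the shape of the final table, using the output schema documented in to_hierarchical_clusters (columns ['parent', 'child', 'probability']):

# Hypothetical additions at the end of the __main__ block above
pipeline_logger.info(f"Built {len(out):,} hierarchical edges")
pipeline_logger.info(f"Columns: {out.column_names}")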
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default.
