
[Benchmark] Add parquet read benchmark #1371

Merged
13 commits merged on Aug 30, 2024
1 change: 1 addition & 0 deletions dask_cuda/benchmarks/custom/__init__.py
@@ -0,0 +1 @@
from .parquet import read_parquet as custom_read_parquet
169 changes: 169 additions & 0 deletions dask_cuda/benchmarks/custom/parquet.py
@@ -0,0 +1,169 @@
import math
import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import dataset

import dask
import dask.dataframe as dd
from dask.base import apply, tokenize
from dask.distributed import get_worker
from dask.utils import parse_bytes

# NOTE: The pyarrow component of this code was mostly copied
# from dask-expr (dask_expr/io/parquet.py)


_CPU_COUNT_SET = False


def _maybe_adjust_cpu_count():
global _CPU_COUNT_SET
if not _CPU_COUNT_SET:
# Set the number of threads to the number of cores
# This is a default for pyarrow, but it's not set by default in
# dask/distributed
pa.set_cpu_count(os.cpu_count())
_CPU_COUNT_SET = True


def fragment_to_table(fragment, filters=None, columns=None, schema=None):
_maybe_adjust_cpu_count()

if isinstance(filters, list):
filters = pq.filters_to_expression(filters)

return fragment.to_table(
schema=schema,
columns=columns,
filter=filters,
# Batch size determines how many rows are read at once, and the
# underlying array will be split into chunks of (at most) this size.
# We'd like to avoid fragmentation as much as possible, and would
# ideally set this to something like infinity, but a finite positive
# number is required.
# In the presence of row groups, the underlying array will still be
# chunked per row group.
batch_size=10_000_000,
fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
pre_buffer=True,
cache_options=pa.CacheOptions(
hole_size_limit=parse_bytes("4 MiB"),
range_size_limit=parse_bytes("32.00 MiB"),
),
),
use_threads=True,
)
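
For reference, a minimal sketch of exercising fragment_to_table on its own (the "example.parquet" path and the "id" column are placeholders, not part of this benchmark):

from pyarrow import dataset as pa_ds

# Build a pyarrow dataset and take its first fragment (one parquet file).
example_ds = pa_ds.dataset("example.parquet", format="parquet")
example_fragment = next(iter(example_ds.get_fragments()))

# Read the fragment into a single pyarrow.Table, selecting one column.
example_table = fragment_to_table(example_fragment, columns=["id"])
print(example_table.num_rows)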


def tables_to_frame(tables):
import cudf

return cudf.DataFrame.from_arrow(
pa.concat_tables(tables) if len(tables) > 1 else tables[0]
)


def read_parquet_fragments(
fragments,
columns=None,
filters=None,
fragment_parallelism=None,
):

kwargs = {"columns": columns, "filters": filters}
if not isinstance(fragments, list):
fragments = [fragments]

if len(fragments) > 1:
# Read multiple fragments
token = tokenize(fragments, columns, filters)
chunk_name = f"read-chunk-{token}"
dsk = {
(chunk_name, i): (apply, fragment_to_table, [fragment], kwargs)
for i, fragment in enumerate(fragments)
}
dsk[chunk_name] = (tables_to_frame, list(dsk.keys()))

try:
worker = get_worker()
except ValueError:
return dask.threaded.get(dsk, chunk_name)

if not hasattr(worker, "_rapids_executor"):
fragment_parallelism = fragment_parallelism or 8
num_threads = min(
fragment_parallelism,
len(os.sched_getaffinity(0)),
)
worker._rapids_executor = ThreadPoolExecutor(num_threads)
with dask.config.set(pool=worker._rapids_executor):
return dask.threaded.get(dsk, chunk_name)

else:
# Read single fragment
return tables_to_frame([fragment_to_table(fragments[0], **kwargs)])
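
A sketch of the multi-fragment path, which turns a list of fragments into one cuDF partition (requires cudf and a GPU; the dataset path and column are again placeholders):

from pyarrow import dataset as pa_ds

example_ds = pa_ds.dataset("example.parquet", format="parquet")  # placeholder path
fragments = list(example_ds.get_fragments())

# Read the first two fragments as a single cuDF DataFrame. The
# fragment_parallelism argument caps the per-worker thread pool when this
# runs on a Dask worker; outside a worker the default threaded scheduler
# is used.
partition = read_parquet_fragments(
    fragments[:2],
    columns=["id"],  # placeholder column
    fragment_parallelism=4,
)
print(len(partition))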


def mean_file_size(fragments, n=10):
n_frags = len(fragments)
if n < n_frags:
indices = np.random.choice(np.arange(n_frags), size=n, replace=False)
else:
indices = np.arange(n_frags)

sizes = []
for f in indices:
size = 0
frag = fragments[f]
for row_group in frag.row_groups:
size += row_group.total_byte_size
sizes.append(size)

return np.mean(sizes)


def aggregate_fragments(fragments, blocksize):
size = mean_file_size(fragments)
blocksize = parse_bytes(blocksize)
stride = int(math.floor(blocksize / size))

if stride < 1:
# Not implemented yet: splitting a large fragment across multiple
# partitions. For now, fall back to one fragment per partition.
pass

stride = max(stride, 1)
return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]
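
A worked example of the stride arithmetic (sizes are made up): with the default blocksize of "256MB" and sampled fragments averaging roughly 64 MB, four consecutive fragments are grouped per partition; when the mean fragment size exceeds the blocksize, the stride clamps to 1 and fragments are left one per partition.

import math

from dask.utils import parse_bytes

mean_size = 64_000_000  # pretend the sampled fragments average ~64 MB
stride = int(math.floor(parse_bytes("256MB") / mean_size))
assert stride == 4  # -> groups of four fragments per output partition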


def read_parquet(
urlpath,
columns=None,
filters=None,
blocksize="256MB",
fragment_parallelism=None,
):

# Use pyarrow dataset API to get fragments and meta
ds = dataset.dataset(urlpath, format="parquet")
meta = tables_to_frame([ds.schema.empty_table()])
if columns is not None:
meta = meta[columns]
fragments = list(ds.get_fragments())

# Aggregate fragments together if necessary
if blocksize:
fragments = aggregate_fragments(fragments, blocksize)

# Construct collection
return dd.from_map(
read_parquet_fragments,
fragments,
columns=columns,
filters=filters,
fragment_parallelism=fragment_parallelism,
meta=meta,
enforce_metadata=False,
)
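
A hedged end-to-end sketch of using this reader outside the benchmark script (the S3 path and column name are placeholders, and a GPU worker via LocalCUDACluster is assumed):

from distributed import Client

from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    with LocalCUDACluster() as cluster, Client(cluster) as client:
        ddf = read_parquet(
            "s3://my-bucket/my-dataset/",  # placeholder path
            columns=["id"],  # placeholder column
            blocksize="512MB",
        )
        print(ddf.memory_usage().compute().sum())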
187 changes: 187 additions & 0 deletions dask_cuda/benchmarks/remote_parquet.py
@@ -0,0 +1,187 @@
import contextlib
from collections import ChainMap
from time import perf_counter as clock

import pandas as pd

import dask
import dask.dataframe as dd
from dask.distributed import performance_report
from dask.utils import format_bytes, parse_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.custom import custom_read_parquet
from dask_cuda.benchmarks.utils import (
parse_benchmark_args,
print_key_value,
print_separator,
print_throughput_bandwidth,
)

DEFAULT_DATASET_PATH = "s3://dask-cudf-parquet-testing/dedup_parquet"
DEFAULT_COLUMNS = ["text", "id"]
DEFAULT_STORAGE_SIZE = 2_843_373_145 # Compressed byte size


def read_data(
backend,
filesystem,
aggregate_files,
blocksize,
):
path = DEFAULT_DATASET_PATH
columns = DEFAULT_COLUMNS
with dask.config.set({"dataframe.backend": backend}):
if filesystem == "arrow" and backend == "cudf":
df = custom_read_parquet(
path,
columns=columns,
blocksize=blocksize,
)
else:
if filesystem == "arrow":
# TODO: Warn user that blocksize and aggregate_files
# are ignored when `filesystem == "arrow"`
_blocksize = {}
_aggregate_files = {}
else:
_blocksize = {"blocksize": blocksize}
_aggregate_files = {"aggregate_files": aggregate_files}

df = dd.read_parquet(
path,
columns=columns,
filesystem=filesystem,
**_blocksize,
**_aggregate_files,
)
return df.memory_usage().compute().sum()


def bench_once(client, args, write_profile=None):

if write_profile is None:
ctx = contextlib.nullcontext()
else:
ctx = performance_report(filename=args.profile)

with ctx:
t1 = clock()
output_size = read_data(
backend="cudf" if args.type == "gpu" else "pandas",
filesystem=args.filesystem,
aggregate_files=args.aggregate_files,
blocksize=args.blocksize,
)
t2 = clock()

return (DEFAULT_STORAGE_SIZE, output_size, t2 - t1)


def pretty_print_results(args, address_to_index, p2p_bw, results):
if args.markdown:
print("```")
print("Remote Parquet benchmark")
print_separator(separator="-")
backend = "cudf" if args.type == "gpu" else "pandas"
print_key_value(key="Backend", value=f"{backend}")
print_key_value(key="Filesystem", value=f"{args.filesystem}")
print_key_value(key="Blocksize", value=f"{args.blocksize}")
print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
if args.markdown:
print("\n```")
data_processed, output_size, durations = zip(*results)
print_throughput_bandwidth(
args, durations, data_processed, p2p_bw, address_to_index
)


def create_tidy_results(args, p2p_bw, results):
configuration = {
"backend": "cudf" if args.type == "gpu" else "pandas",
"filesystem": args.filesystem,
"blocksize": args.blocksize,
"aggregate_files": args.aggregate_files,
}
timing_data = pd.DataFrame(
[
pd.Series(
data=ChainMap(
configuration,
{
"wallclock": duration,
"data_processed": data_processed,
"output_size": output_size,
},
)
)
for data_processed, output_size, duration in results
]
)
return timing_data, p2p_bw


def parse_args():
special_args = [
{
"name": "--blocksize",
"default": "256MB",
"type": str,
"help": "How to set the blocksize option",
},
{
"name": "--aggregate-files",
"default": False,
"action": "store_true",
"help": "How to set the aggregate_files option",
},
{
"name": [
"-t",
"--type",
],
"choices": ["cpu", "gpu"],
"default": "gpu",
"type": str,
"help": "Use GPU or CPU dataframes (default 'gpu')",
},
{
"name": "--filesystem",
"choices": ["arrow", "fsspec"],
"default": "fsspec",
"type": str,
"help": "Filesystem backend",
},
{
"name": "--runs",
"default": 3,
"type": int,
"help": "Number of runs",
},
# NOTE: The following args are not relevant to this benchmark
{
"name": "--ignore-size",
"default": "1 MiB",
"metavar": "nbytes",
"type": parse_bytes,
"help": "Ignore messages smaller than this (default '1 MB')",
},
]

return parse_benchmark_args(
description="Remote Parquet (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


if __name__ == "__main__":
execute_benchmark(
Config(
args=parse_args(),
bench_once=bench_once,
create_tidy_results=create_tidy_results,
pretty_print_results=pretty_print_results,
)
)
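
For instance, the benchmark might be launched as follows (flag values are illustrative; additional cluster-related flags from parse_benchmark_args, such as device selection, may be needed for a given setup):

python dask_cuda/benchmarks/remote_parquet.py --filesystem arrow --type gpu --blocksize 512MB --runs 3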