-
Notifications
You must be signed in to change notification settings - Fork 918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[REVIEW] Expose the HOST_BUFFER parquet sink to python #5094
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ from cpython.memoryview cimport PyMemoryView_FromMemory | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from libcpp.memory cimport unique_ptr | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from libcpp.pair cimport pair | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from libcpp.string cimport string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from libcpp.vector cimport vector | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from cudf._lib.cpp.io.types cimport source_info, sink_info, data_sink, io_type | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import errno | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -21,6 +22,9 @@ cdef source_info make_source_info(src) except*: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
buf = src | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
empty_buffer = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif isinstance(src, HostBuffer): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return source_info((<HostBuffer>src).buf.data(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(<HostBuffer>src).buf.size()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif isinstance(src, io.BytesIO): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
buf = src.getbuffer() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Otherwise src is expected to be a numeric fd, string path, or PathLike. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -42,6 +46,8 @@ cdef source_info make_source_info(src) except*: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Converts the Python sink input to libcudf++ IO sink_info. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef sink_info make_sink_info(src, unique_ptr[data_sink] * data) except*: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if isinstance(src, HostBuffer): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return sink_info(&(<HostBuffer>src).buf) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if isinstance(src, io.IOBase): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
data.reset(new iobase_data_sink(src)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return sink_info(data.get()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -50,7 +56,6 @@ cdef sink_info make_sink_info(src, unique_ptr[data_sink] * data) except*: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise TypeError("Unrecognized input type: {}".format(type(src))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Adapts a python io.IOBase object as a libcudf++ IO data_sink. This lets you | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# write from cudf to any python file-like object (File/BytesIO/SocketIO etc) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef cppclass iobase_data_sink(data_sink): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -67,3 +72,61 @@ cdef cppclass iobase_data_sink(data_sink): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
size_t bytes_written() with gil: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return buf.tell() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef class HostBuffer: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" HostBuffer lets you spill cudf DataFrames from device memory to host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
memory. Once in host memory the dataframe can either be re-loaded back | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
into gpu memory, or spilled to disk. This is designed to reduce the amount | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
of unnecessary host memory copies. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Examples | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
-------- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.. code-block:: python | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import shutil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import cudf | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# read cudf DataFrame into buffer on host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
df = cudf.DataFrame({'a': [1, 1, 2], 'b': [1, 2, 3]}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
buffer = cudf.io.HostBuffer() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
df.to_parquet(buffer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Copy HostBuffer back to DataFrame | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cudf.read_parquet(df) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Write HostBuffer to disk | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutil.copyfileobj(buffer, open("output.parquet", "wb")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef vector[char] buf | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a note: RMM will have the ability to allocate pinned host memory in the future which will be faster for moving device <--> host as well as allow things to remain asynchronous. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef size_t pos | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __cinit__(self, int initial_capacity=0): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pos = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if initial_capacity: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.buf.reserve(initial_capacity) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+105
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need to check it here, passing 0 should have no affect:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __len__(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return self.buf.size() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def seek(self, size_t pos): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pos = pos | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def tell(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return self.pos | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def readall(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return self.read(-1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def read(self, int n=-1): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.pos >= self.buf.size(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return b"" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a little bit awkward that this returns an empty bytes object while below returns a MemoryView. Can we return an empty memoryview? |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef size_t count = n | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if ((n < 0) or (n > self.buf.size() - self.pos)): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
count = self.buf.size() - self.pos | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cdef size_t start = self.pos | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pos += count | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return PyMemoryView_FromMemory(self.buf.data() + start, count, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PyBUF_READ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems better to support the buffer protocol on this class instead. Otherwise this could refer to an invalid memory segment that Python has already garbage collected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. We have code already that adapts a c++ byte vector to the buffer protocol here: cudf/python/cudf/cudf/_lib/parquet.pyx Lines 49 to 86 in 85ce695
Should we use that class instead of creating a new one? The only change really is between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we actually have 2 instances of classes like this one where ideally we could standardize them. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
# Copyright (c) 2018, NVIDIA CORPORATION. | ||
|
||
import cudf._lib.io.utils | ||
from cudf.io.avro import read_avro | ||
from cudf.io.csv import read_csv, to_csv | ||
from cudf.io.dlpack import from_dlpack | ||
|
@@ -13,3 +14,5 @@ | |
read_parquet_metadata, | ||
write_to_dataset, | ||
) | ||
|
||
HostBuffer = cudf._lib.io.utils.HostBuffer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we just do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could go |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -609,6 +609,18 @@ def test_parquet_writer_bytes_io(simple_gdf): | |
assert_eq(cudf.read_parquet(output), cudf.concat([simple_gdf, simple_gdf])) | ||
|
||
|
||
def test_parquet_writer_host_buffer(tmpdir, simple_gdf): | ||
buffer = cudf.io.HostBuffer() | ||
simple_gdf.to_parquet(buffer) | ||
|
||
assert_eq(cudf.read_parquet(buffer), simple_gdf) | ||
|
||
gdf_fname = tmpdir.join("gdf.parquet") | ||
with open(gdf_fname, "wb") as o: | ||
o.write(buffer.read()) | ||
assert_eq(cudf.read_parquet(gdf_fname), simple_gdf) | ||
Comment on lines
+612
to
+621
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we also add a test in reserving the size of the |
||
|
||
|
||
@pytest.mark.parametrize("cols", [["b"], ["c", "b"]]) | ||
def test_parquet_write_partitioned(tmpdir_factory, cols): | ||
# Checks that write_to_dataset is wrapping to_parquet | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to consider exposing buffer protocol for this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use this name here? Thought this was going to be reserved for the pinned memory allocation made by RMM (once that arrives). ( rapidsai/rmm#260 )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good call, yes we shouldn't use
HostBuffer
here.