[REVIEW] Expose the HOST_BUFFER parquet sink to python #5094

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -141,6 +141,7 @@
- PR #5045 Remove call to `unique()` in concat when `axis=1`
- PR #5085 Print more precise numerical strings in unit tests
- PR #5028 Add Docker 19 support to local gpuci build
- PR #5094 Expose the HOST_BUFFER parquet sink to python

## Bug Fixes

65 changes: 64 additions & 1 deletion python/cudf/cudf/_lib/io/utils.pyx
@@ -5,6 +5,7 @@ from cpython.memoryview cimport PyMemoryView_FromMemory
from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.vector cimport vector
from cudf._lib.cpp.io.types cimport source_info, sink_info, data_sink, io_type

import errno
@@ -21,6 +22,9 @@ cdef source_info make_source_info(src) except*:
            buf = src
        else:
            empty_buffer = True
    elif isinstance(src, HostBuffer):
        return source_info((<HostBuffer>src).buf.data(),
                           (<HostBuffer>src).buf.size())
    elif isinstance(src, io.BytesIO):
        buf = src.getbuffer()
    # Otherwise src is expected to be a numeric fd, string path, or PathLike.
@@ -42,6 +46,8 @@

# Converts the Python sink input to libcudf++ IO sink_info.
cdef sink_info make_sink_info(src, unique_ptr[data_sink] * data) except*:
    if isinstance(src, HostBuffer):
        return sink_info(&(<HostBuffer>src).buf)
    if isinstance(src, io.IOBase):
        data.reset(new iobase_data_sink(src))
        return sink_info(data.get())
@@ -50,7 +56,6 @@ cdef sink_info make_sink_info(src, unique_ptr[data_sink] * data) except*:
    else:
        raise TypeError("Unrecognized input type: {}".format(type(src)))


# Adapts a python io.IOBase object as a libcudf++ IO data_sink. This lets you
# write from cudf to any python file-like object (File/BytesIO/SocketIO etc)
cdef cppclass iobase_data_sink(data_sink):
@@ -67,3 +72,61 @@ cdef cppclass iobase_data_sink(data_sink):

    size_t bytes_written() with gil:
        return buf.tell()


cdef class HostBuffer:
Collaborator: Do we want to consider exposing buffer protocol for this class?

Member: Should we use this name here? Thought this was going to be reserved for the pinned memory allocation made by RMM (once that arrives). (rapidsai/rmm#260)

Collaborator: Ah good call, yes we shouldn't use HostBuffer here.

""" HostBuffer lets you spill cudf DataFrames from device memory to host
memory. Once in host memory the dataframe can either be re-loaded back
into gpu memory, or spilled to disk. This is designed to reduce the amount
of unnecessary host memory copies.

Examples
--------
.. code-block:: python
import shutil
import cudf

# read cudf DataFrame into buffer on host
df = cudf.DataFrame({'a': [1, 1, 2], 'b': [1, 2, 3]})
buffer = cudf.io.HostBuffer()
df.to_parquet(buffer)

# Copy HostBuffer back to DataFrame
cudf.read_parquet(df)

# Write HostBuffer to disk
shutil.copyfileobj(buffer, open("output.parquet", "wb"))
"""
    cdef vector[char] buf
Collaborator: Just a note: RMM will have the ability to allocate pinned host memory in the future which will be faster for moving device <--> host as well as allow things to remain asynchronous.

    cdef size_t pos

    def __cinit__(self, int initial_capacity=0):
        self.pos = 0
        if initial_capacity:
            self.buf.reserve(initial_capacity)
Comment on lines +105 to +106

Collaborator: Don't need to check it here, passing 0 should have no effect:

Suggested change:
-        if initial_capacity:
-            self.buf.reserve(initial_capacity)
+        self.buf.reserve(initial_capacity)


    def __len__(self):
        return self.buf.size()

    def seek(self, size_t pos):
        self.pos = pos

    def tell(self):
        return self.pos

    def readall(self):
        return self.read(-1)

    def read(self, int n=-1):
        if self.pos >= self.buf.size():
            return b""
Collaborator: It's a little bit awkward that this returns an empty bytes object while below returns a MemoryView. Can we return an empty memoryview?
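One way to do that, sketched here as a possibility rather than the change merged in this PR, would be to return a zero-length memoryview in the exhausted case so read() always yields the same type:

        # Hypothetical tweak, not part of this diff: keep the return type
        # consistent by returning an empty memoryview once the buffer is
        # exhausted.
        if self.pos >= self.buf.size():
            return memoryview(b"")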


        cdef size_t count = n
        if ((n < 0) or (n > self.buf.size() - self.pos)):
            count = self.buf.size() - self.pos

        cdef size_t start = self.pos
        self.pos += count

        return PyMemoryView_FromMemory(self.buf.data() + start, count,
                                       PyBUF_READ)
Member: Seems better to support the buffer protocol on this class instead. Otherwise this could refer to an invalid memory segment that Python has already garbage collected.

Member (Author): Sounds good.

We have code already that adapts a C++ byte vector to the buffer protocol here:
cdef class BufferArrayFromVector:
    cdef Py_ssize_t length
    cdef unique_ptr[vector[uint8_t]] in_vec

    # these two things declare part of the buffer interface
    cdef Py_ssize_t shape[1]
    cdef Py_ssize_t strides[1]

    @staticmethod
    cdef BufferArrayFromVector from_unique_ptr(
        unique_ptr[vector[uint8_t]] in_vec
    ):
        cdef BufferArrayFromVector buf = BufferArrayFromVector()
        buf.in_vec = move(in_vec)
        buf.length = dereference(buf.in_vec).size()
        return buf

    def __getbuffer__(self, Py_buffer *buffer, int flags):
        cdef Py_ssize_t itemsize = sizeof(uint8_t)

        self.shape[0] = self.length
        self.strides[0] = 1

        buffer.buf = dereference(self.in_vec).data()
        buffer.format = NULL  # byte
        buffer.internal = NULL
        buffer.itemsize = itemsize
        buffer.len = self.length * itemsize  # product(shape) * itemsize
        buffer.ndim = 1
        buffer.obj = self
        buffer.readonly = 0
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = NULL

    def __releasebuffer__(self, Py_buffer *buffer):
        pass

Should we use that class instead of creating a new one? The only change really is between the vector<char> expected by the HOST_BUFFER sink and the vector<uint8_t> in this class (and I think we could either cast the C++ vector or use Cython fused types to get around that).

Collaborator: I think we actually have 2 instances of classes like this one where ideally we could standardize them.
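For reference, a minimal sketch of what exposing the buffer protocol directly on HostBuffer could look like, reusing the same pattern over its vector[char] member. This is an assumption for illustration, not code from this PR, and it presumes shape/strides arrays are added to the class:

cdef class HostBuffer:
    # ...existing members from the diff above...
    # Hypothetical additions for the buffer protocol (not part of this PR)
    cdef Py_ssize_t shape[1]
    cdef Py_ssize_t strides[1]

    def __getbuffer__(self, Py_buffer *buffer, int flags):
        self.shape[0] = self.buf.size()
        self.strides[0] = 1

        buffer.buf = self.buf.data()
        buffer.format = NULL              # raw bytes
        buffer.internal = NULL
        buffer.itemsize = 1
        buffer.len = self.buf.size()      # product(shape) * itemsize
        buffer.ndim = 1
        buffer.obj = self
        buffer.readonly = 1               # consumers should not mutate the sink
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = NULL

    def __releasebuffer__(self, Py_buffer *buffer):
        pass

With something like this in place, callers could do memoryview(buffer) or bytes(buffer) instead of read(), and the returned view keeps the HostBuffer alive rather than pointing at memory that may have been freed.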

3 changes: 3 additions & 0 deletions python/cudf/cudf/io/__init__.py
@@ -1,5 +1,6 @@
# Copyright (c) 2018, NVIDIA CORPORATION.

import cudf._lib.io.utils
from cudf.io.avro import read_avro
from cudf.io.csv import read_csv, to_csv
from cudf.io.dlpack import from_dlpack
@@ -13,3 +14,5 @@
    read_parquet_metadata,
    write_to_dataset,
)

HostBuffer = cudf._lib.io.utils.HostBuffer
Collaborator: Could we just do from cudf._lib.io.utils import HostBuffer?

Member (Author): I could go from cudf._lib.io.utils import HostBuffer  # noqa, but I think we'd need the noqa tag, otherwise flake8 will complain about unused imports. (I was trying to be consistent here with #4870 (comment).)
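For reference, the alternative being discussed would look roughly like this (a sketch, not what this diff does):

# Hypothetical alternative to the attribute assignment above; the noqa marker
# keeps flake8 from flagging the re-export as an unused import.
from cudf._lib.io.utils import HostBuffer  # noqa: F401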

12 changes: 12 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -609,6 +609,18 @@ def test_parquet_writer_bytes_io(simple_gdf):
    assert_eq(cudf.read_parquet(output), cudf.concat([simple_gdf, simple_gdf]))


def test_parquet_writer_host_buffer(tmpdir, simple_gdf):
    buffer = cudf.io.HostBuffer()
    simple_gdf.to_parquet(buffer)

    assert_eq(cudf.read_parquet(buffer), simple_gdf)

    gdf_fname = tmpdir.join("gdf.parquet")
    with open(gdf_fname, "wb") as o:
        o.write(buffer.read())
    assert_eq(cudf.read_parquet(gdf_fname), simple_gdf)
Comment on lines +612 to +621

Collaborator: Could we also add a test reserving the size of the HostBuffer() up front, just to make sure that doesn't break things? :)
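A rough sketch of such a test (hypothetical, not part of this diff), assuming the same simple_gdf fixture and the initial_capacity argument added in __cinit__:

def test_parquet_writer_host_buffer_reserved(simple_gdf):
    # Hypothetical test sketch: reserve capacity up front and check the
    # round trip still works when the reservation exceeds the output size.
    buffer = cudf.io.HostBuffer(1 << 20)
    simple_gdf.to_parquet(buffer)
    assert_eq(cudf.read_parquet(buffer), simple_gdf)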



@pytest.mark.parametrize("cols", [["b"], ["c", "b"]])
def test_parquet_write_partitioned(tmpdir_factory, cols):
    # Checks that write_to_dataset is wrapping to_parquet