Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,37 @@ cdef class FlightCallOptions(_Weakrefable):
return &((<FlightCallOptions> obj).options)
raise TypeError(f"Expected a FlightCallOptions object, not '{type(obj)}'")

@property
def timeout(self):
"""Get timeout for the call (in seconds)
"""
return self.options.timeout.count()

@property
def headers(self):
"""Get list of headers (key, value tuples) for client's context
"""
return self.options.headers

@property
def read_options(self):
"""Get serialization options for reading IPC format
"""
return wrap_ipc_read_options(self.options.read_options)

@property
def write_options(self):
"""Get IPC write options
"""
return wrap_ipc_write_options(self.options.write_options)

def __repr__(self):
return (f"<pyarrow.flight.FlightCallOptions "
f"timeout={self.timeout} "
f"headers={self.headers}\n"
f" read_options={self.read_options}\n"
f" write_options={self.write_options}\n>")


_CertKeyPair = collections.namedtuple('_CertKeyPair', ['cert', 'key'])

Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_flight.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:

cdef cppclass CTimeoutDuration" arrow::flight::TimeoutDuration":
CTimeoutDuration(double)
double count()

cdef cppclass CFlightCallOptions" arrow::flight::FlightCallOptions":
CFlightCallOptions()
Expand Down
63 changes: 59 additions & 4 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,34 @@ cdef class IpcReadOptions(_Weakrefable):
def included_fields(self, list value not None):
self.c_options.included_fields = value

def __repr__(self):
alignment = Alignment(self.ensure_alignment).name

return (f"<pyarrow.ipc.IpcReadOptions "
f"ensure_native_endian={self.ensure_native_endian} "
f"ensure_alignment={alignment} "
f"use_threads={self.use_threads} "
f"included_fields={self.included_fields}>")


cdef IpcReadOptions wrap_ipc_read_options(CIpcReadOptions c):
"""Get Python's IpcReadOptions from C++'s IpcReadOptions
"""

return IpcReadOptions(
ensure_native_endian=c.ensure_native_endian,
ensure_alignment=c.ensure_alignment,
use_threads=c.use_threads,
included_fields=c.included_fields,
)


cdef object _get_compression_from_codec(shared_ptr[CCodec] codec):
if codec == nullptr:
return None
else:
return frombytes(codec.get().name())


cdef class IpcWriteOptions(_Weakrefable):
"""
Expand Down Expand Up @@ -277,10 +305,7 @@ cdef class IpcWriteOptions(_Weakrefable):

@property
def compression(self):
if self.c_options.codec == nullptr:
return None
else:
return frombytes(self.c_options.codec.get().name())
return _get_compression_from_codec(self.c_options.codec)

@compression.setter
def compression(self, value):
Expand Down Expand Up @@ -324,6 +349,36 @@ cdef class IpcWriteOptions(_Weakrefable):
def unify_dictionaries(self, bint value):
self.c_options.unify_dictionaries = value

def __repr__(self):
compression_repr = f"compression=\"{self.compression}\" " \
if self.compression is not None else ""

metadata_version = MetadataVersion(self.metadata_version).name

return (f"<pyarrow.ipc.IpcWriteOptions "
f"allow_64bit={self.allow_64bit} "
f"use_legacy_format={self.use_legacy_format} "
f"metadata_version={metadata_version} "
f"{compression_repr}"
f"use_threads={self.use_threads} "
f"emit_dictionary_deltas={self.emit_dictionary_deltas} "
f"unify_dictionaries={self.unify_dictionaries}>")


cdef IpcWriteOptions wrap_ipc_write_options(CIpcWriteOptions c):
"""Get Python's IpcWriteOptions from C++'s IpcWriteOptions
"""

return IpcWriteOptions(
metadata_version=c.metadata_version,
allow_64bit=c.allow_64bit,
use_legacy_format=c.write_legacy_ipc_format,
compression=_get_compression_from_codec(c.codec),
use_threads=c.use_threads,
emit_dictionary_deltas=c.emit_dictionary_deltas,
unify_dictionaries=c.unify_dictionaries,
)


cdef class Message(_Weakrefable):
"""
Expand Down
6 changes: 6 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ cdef class IpcWriteOptions(_Weakrefable):
CIpcWriteOptions c_options


cdef IpcWriteOptions wrap_ipc_write_options(CIpcWriteOptions c)


cdef class IpcReadOptions(_Weakrefable):
cdef:
CIpcReadOptions c_options


cdef IpcReadOptions wrap_ipc_read_options(CIpcReadOptions c)


cdef _wrap_read_stats(CIpcReadStats c)


Expand Down
40 changes: 40 additions & 0 deletions python/pyarrow/tests/test_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
ServerAuthHandler, ClientAuthHandler,
ServerMiddleware, ServerMiddlewareFactory,
ClientMiddleware, ClientMiddlewareFactory,
FlightCallOptions,
)
except ImportError:
flight = None
FlightClient, FlightServerBase = object, object
ServerAuthHandler, ClientAuthHandler = object, object
ServerMiddleware, ServerMiddlewareFactory = object, object
ClientMiddleware, ClientMiddlewareFactory = object, object
FlightCallOptions = object

# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not flight'
Expand Down Expand Up @@ -2618,3 +2620,41 @@ def do_exchange(self, context, descriptor, reader, writer):

assert received_table.equals(expected_table)
assert reader.stats == expected_stats[command]


@pytest.fixture
def call_options_args(request):
if request.param == "default":
return {
"timeout": 3,
"headers": None,
"write_options": None,
"read_options": None,
}
elif request.param == "all":
return {
"timeout": 7,
"headers": [(b"abc", b"def")],
"write_options": pa.ipc.IpcWriteOptions(compression="zstd"),
"read_options": pa.ipc.IpcReadOptions(
use_threads=False,
ensure_alignment=pa.ipc.Alignment.DataTypeSpecific,
),
}
else:
return {}


@pytest.mark.parametrize(
"call_options_args", ["default", "all"], indirect=True)
def test_call_options_repr(call_options_args):
# https://github.com/apache/arrow/issues/47358
call_options = FlightCallOptions(**call_options_args)
repr = call_options.__repr__()

for arg, val in call_options_args.items():
if val is None:
assert arg in repr
continue

assert f"{arg}={val}" in repr
79 changes: 79 additions & 0 deletions python/pyarrow/tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,3 +1338,82 @@ def test_record_batch_file_writer_with_empty_metadata():
buffer = sink.getvalue()
with pa.ipc.open_file(buffer) as r:
assert r.metadata is None


def check_ipc_options_repr(options_obj, options_args):
options = options_obj(**options_args)
repr = options.__repr__()

for arg, val in options_args.items():
if val is None:
continue

value = val if not isinstance(val, str) else f"\"{val}\""

if arg == "ensure_alignment":
value = pa.ipc.Alignment(val).name
elif arg == "metadata_version":
value = pa.ipc.MetadataVersion(val).name

assert f"{arg}={value}" in repr


@pytest.fixture
def write_options_args(request):
if request.param == "default":
return {
"allow_64bit": False,
"use_legacy_format": False,
"metadata_version": pa.ipc.MetadataVersion.V5,
"compression": None,
"use_threads": True,
"emit_dictionary_deltas": False,
"unify_dictionaries": False,
}
elif request.param == "all":
return {
"allow_64bit": True,
"use_legacy_format": True,
"metadata_version": pa.ipc.MetadataVersion.V4,
"compression": "zstd",
"use_threads": False,
"emit_dictionary_deltas": True,
"unify_dictionaries": True,
}
else:
return {}


@pytest.mark.zstd
@pytest.mark.parametrize(
"write_options_args", ["default", "all"], indirect=True)
def test_write_options_repr(write_options_args):
# https://github.com/apache/arrow/issues/47358
check_ipc_options_repr(pa.ipc.IpcWriteOptions, write_options_args)


@pytest.fixture
def read_options_args(request):
if request.param == "default":
return {
"ensure_native_endian": True,
"ensure_alignment": pa.ipc.Alignment.Any,
"use_threads": True,
"included_fields": None,
}
elif request.param == "all":
return {
"ensure_native_endian": False,
"ensure_alignment": pa.ipc.Alignment.DataTypeSpecific,
"use_threads": False,
"included_fields": [1, 2, 3],
}
else:
return {}


@pytest.mark.parametrize(
"read_options_args", ["default", "all"], indirect=True)
def test_read_options_repr(read_options_args):
# https://github.com/apache/arrow/issues/47358
check_ipc_options_repr(pa.ipc.IpcReadOptions, read_options_args)
Loading