From dfa81d58a48e5157c402ce13e2a76f02628d3482 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Sat, 23 Aug 2025 01:51:49 +0200 Subject: [PATCH 01/10] feat: Add __repr__ for Flight and Ipc Options --- python/pyarrow/_flight.pyx | 7 +++++++ python/pyarrow/ipc.pxi | 16 ++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index fe2e1b3d67405..01a7daa12a64b 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -142,6 +142,13 @@ cdef class FlightCallOptions(_Weakrefable): return &(( obj).options) raise TypeError(f"Expected a FlightCallOptions object, not '{type(obj)}'") + def __repr__(self): + return (f"") + _CertKeyPair = collections.namedtuple('_CertKeyPair', ['cert', 'key']) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index d57a899b58d22..3984a8c3f3c15 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -198,6 +198,13 @@ cdef class IpcReadOptions(_Weakrefable): def included_fields(self, list value not None): self.c_options.included_fields = value + def __repr__(self): + return (f"") + cdef class IpcWriteOptions(_Weakrefable): """ @@ -325,6 +332,15 @@ cdef class IpcWriteOptions(_Weakrefable): def unify_dictionaries(self, bint value): self.c_options.unify_dictionaries = value + def __repr__(self): + return (f"") + cdef class Message(_Weakrefable): """ From 9b89704bd6e30c70e3da616a08463d234abbb564 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Fri, 29 Aug 2025 02:42:42 +0200 Subject: [PATCH 02/10] Add CTimeoutDuration conversion --- python/pyarrow/_flight.pyx | 24 +++++++++++++++++---- python/pyarrow/includes/libarrow_flight.pxd | 1 + 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 01a7daa12a64b..c2402b5b70721 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -142,12 +142,28 @@ cdef class FlightCallOptions(_Weakrefable): return &(( obj).options) raise TypeError(f"Expected a FlightCallOptions object, not '{type(obj)}'") + @property + def timeout(self): + if not hasattr(self, "timeout"): + raise ValueError(f"FlightCallOptions.timeout was not set") + return self.options.timeout.count() + + @property + def headers(self): + if not hasattr(self, "headers"): + raise ValueError(f"FlightCallOptions.headers was not set") + return self.options.headers + + @property + def read_options(self): ... + + @property + def write_options(self): ... + def __repr__(self): return (f"") + f"timeout={self.timeout} " + f"headers={self.headers}>") _CertKeyPair = collections.namedtuple('_CertKeyPair', ['cert', 'key']) diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd index b1af6bcb4fa36..f2b15fb70642b 100644 --- a/python/pyarrow/includes/libarrow_flight.pxd +++ b/python/pyarrow/includes/libarrow_flight.pxd @@ -272,6 +272,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: cdef cppclass CTimeoutDuration" arrow::flight::TimeoutDuration": CTimeoutDuration(double) + double count() cdef cppclass CFlightCallOptions" arrow::flight::FlightCallOptions": CFlightCallOptions() From fd08474df755d07b5d4ded28e47883b2b488d6d7 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Sat, 6 Sep 2025 01:27:12 +0200 Subject: [PATCH 03/10] Add Ipc Options to FlightCallOptions repr --- python/pyarrow/_flight.pyx | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index c2402b5b70721..8e1a65a8ec2c7 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -103,6 +103,9 @@ cdef class FlightCallOptions(_Weakrefable): cdef: CFlightCallOptions options + IpcReadOptions _py_read_options + IpcWriteOptions _py_write_options + def __init__(self, timeout=None, write_options=None, headers=None, IpcReadOptions read_options=None): """Create call options. @@ -122,6 +125,9 @@ cdef class FlightCallOptions(_Weakrefable): """ cdef IpcWriteOptions c_write_options + self._py_write_options = write_options + self._py_read_options = read_options + if timeout is not None: self.options.timeout = CTimeoutDuration(timeout) if write_options is not None: @@ -144,27 +150,36 @@ cdef class FlightCallOptions(_Weakrefable): @property def timeout(self): - if not hasattr(self, "timeout"): - raise ValueError(f"FlightCallOptions.timeout was not set") return self.options.timeout.count() @property def headers(self): - if not hasattr(self, "headers"): - raise ValueError(f"FlightCallOptions.headers was not set") return self.options.headers @property - def read_options(self): ... + def read_options(self): + return self._py_read_options @property - def write_options(self): ... + def write_options(self): + return self._py_write_options def __repr__(self): - return (f"") + f"headers={self.headers}") + + parts = [base] + + if self.read_options is not None: + parts.append(f" {self.read_options}") + + if self.write_options is not None: + parts.append(f" {self.write_options}") + + parts.append(">") + return "\n".join(parts) if len(parts) > 2 else "".join(parts) _CertKeyPair = collections.namedtuple('_CertKeyPair', ['cert', 'key']) From b64d121e67fbeca0f231173b24c006f26c00ca88 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Sat, 13 Sep 2025 01:43:30 +0200 Subject: [PATCH 04/10] Complete IPC options' __repr__ --- python/pyarrow/ipc.pxi | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 3984a8c3f3c15..cb6bc8f61c789 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -199,9 +199,11 @@ cdef class IpcReadOptions(_Weakrefable): self.c_options.included_fields = value def __repr__(self): + alignment = Alignment(self.ensure_alignment).name + return (f"") @@ -333,10 +335,16 @@ cdef class IpcWriteOptions(_Weakrefable): self.c_options.unify_dictionaries = value def __repr__(self): + compression_repr = f"compression=\"{self.compression}\" " \ + if self.compression is not None else "" + + metadata_version = MetadataVersion(self.metadata_version).name + return (f"") From 80ab8faf528ebfed8da579eb0bda1d2963ebf91d Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Sat, 13 Sep 2025 01:44:24 +0200 Subject: [PATCH 05/10] Cover the repr dunders with tests --- python/pyarrow/tests/test_flight.py | 40 +++++++++++++++ python/pyarrow/tests/test_ipc.py | 76 +++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index e9e99d8eb839f..63bad3d7a8f8a 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -47,6 +47,7 @@ ServerAuthHandler, ClientAuthHandler, ServerMiddleware, ServerMiddlewareFactory, ClientMiddleware, ClientMiddlewareFactory, + FlightCallOptions, ) except ImportError: flight = None @@ -54,6 +55,7 @@ ServerAuthHandler, ClientAuthHandler = object, object ServerMiddleware, ServerMiddlewareFactory = object, object ClientMiddleware, ClientMiddlewareFactory = object, object + FlightCallOptions = object # Marks all of the tests in this module # Ignore these with pytest ... -m 'not flight' @@ -2536,3 +2538,41 @@ def received_headers(self, headers): assert ("x-header-bin", b"header\x01value") in factory.headers assert ("x-trailer", "trailer-value") in factory.headers assert ("x-trailer-bin", b"trailer\x01value") in factory.headers + + +@pytest.fixture +def call_options_args(request): + match request.param: + case "default": + return { + "timeout": 3, + "headers": None, + "write_options": None, + "read_options": None, + } + case "all": + return { + "timeout": 7, + "headers": [(b"abc", b"def")], + "write_options": pa.ipc.IpcWriteOptions(compression="zstd"), + "read_options": pa.ipc.IpcReadOptions(use_threads=False), + } + + +@pytest.mark.parametrize( + "call_options_args", ["default", "all"], indirect=True) +def test_call_options_repr(call_options_args): + call_options = FlightCallOptions(**call_options_args) + repr = call_options.__repr__() + + for arg, val in call_options_args.items(): + if val is None: + continue + + if arg not in ("read_options", "write_options"): + assert f"{arg}={val}" in repr + continue + + val_repr = val.__repr__() + assert val_repr in repr + diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index b3b3367223dc1..a5960f67dd386 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -1338,3 +1338,79 @@ def test_record_batch_file_writer_with_empty_metadata(): buffer = sink.getvalue() with pa.ipc.open_file(buffer) as r: assert r.metadata is None + + +def check_ipc_options_repr(options_obj, options_args): + options = options_obj(**options_args) + repr = options.__repr__() + + for arg, val in options_args.items(): + if val is None: + continue + + value = val if not isinstance(val, str) else f"\"{val}\"" + + if arg == "ensure_alignment": + value = pa.ipc.Alignment(val).name + elif arg == "metadata_version": + value = pa.ipc.MetadataVersion(val).name + + assert f"{arg}={value}" in repr + + +@pytest.fixture +def write_options_args(request): + match request.param: + case "default": + return { + "allow_64bit": False, + "use_legacy_format": False, + "metadata_version": pa.ipc.MetadataVersion.V5, + "compression": None, + "use_threads": True, + "emit_dictionary_deltas": False, + "unify_dictionaries": False, + } + case "all": + return { + "allow_64bit": True, + "use_legacy_format": True, + "metadata_version": pa.ipc.MetadataVersion.V4, + "compression": "zstd", + "use_threads": False, + "emit_dictionary_deltas": True, + "unify_dictionaries": True, + } + + +@pytest.mark.parametrize( + "write_options_args", ["default", "all"], indirect=True) +def test_write_options_repr(write_options_args): + # https://github.com/apache/arrow/issues/47358 + check_ipc_options_repr(pa.ipc.IpcWriteOptions, write_options_args) + + +@pytest.fixture +def read_options_args(request): + match request.param: + case "default": + return { + "ensure_native_endian": True, + "ensure_alignment": pa.ipc.Alignment.Any, + "use_threads": True, + "included_fields": None, + } + case "all": + return { + "ensure_native_endian": False, + "ensure_alignment": pa.ipc.Alignment.DataTypeSpecific, + "use_threads": False, + "included_fields": [1, 2, 3], + } + + +@pytest.mark.parametrize( + "read_options_args", ["default", "all"], indirect=True) +def test_read_options_repr(read_options_args): + # https://github.com/apache/arrow/issues/47358 + check_ipc_options_repr(pa.ipc.IpcReadOptions, read_options_args) From dac2f14f2fb0739d43b90457361cbe14f3cc8d85 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Tue, 16 Sep 2025 01:10:21 +0200 Subject: [PATCH 06/10] Add C++ to Py wrapper functions for IPC Read/Write Options --- python/pyarrow/ipc.pxi | 41 ++++++++++++++++++++++++++++++++++++----- python/pyarrow/lib.pxd | 6 ++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index cb6bc8f61c789..f1a5142d78b2d 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -208,6 +208,25 @@ cdef class IpcReadOptions(_Weakrefable): f"included_fields={self.included_fields}>") +cdef IpcReadOptions wrap_ipc_read_options(CIpcReadOptions c): + """Get Python's IpcReadOptions from C++'s IpcReadOptions + """ + + return IpcReadOptions( + ensure_native_endian=c.ensure_native_endian, + ensure_alignment=c.ensure_alignment, + use_threads=c.use_threads, + included_fields=c.included_fields, + ) + + +cdef object _get_compression_from_codec(shared_ptr[CCodec] codec): + if codec == nullptr: + return None + else: + return frombytes(codec.get().name()) + + cdef class IpcWriteOptions(_Weakrefable): """ Serialization options for the IPC format. @@ -287,10 +306,7 @@ cdef class IpcWriteOptions(_Weakrefable): @property def compression(self): - if self.c_options.codec == nullptr: - return None - else: - return frombytes(self.c_options.codec.get().name()) + return _get_compression_from_codec(self.c_options.codec) @compression.setter def compression(self, value): @@ -336,7 +352,7 @@ cdef class IpcWriteOptions(_Weakrefable): def __repr__(self): compression_repr = f"compression=\"{self.compression}\" " \ - if self.compression is not None else "" + if self.compression is not None else "" metadata_version = MetadataVersion(self.metadata_version).name @@ -350,6 +366,21 @@ cdef class IpcWriteOptions(_Weakrefable): f"unify_dictionaries={self.unify_dictionaries}>") +cdef IpcWriteOptions wrap_ipc_write_options(CIpcWriteOptions c): + """Get Python's IpcWriteOptions from C++'s IpcWriteOptions + """ + + return IpcWriteOptions( + metadata_version=c.metadata_version, + allow_64bit=c.allow_64bit, + use_legacy_format=c.write_legacy_ipc_format, + compression=_get_compression_from_codec(c.codec), + use_threads=c.use_threads, + emit_dictionary_deltas=c.emit_dictionary_deltas, + unify_dictionaries=c.unify_dictionaries, + ) + + cdef class Message(_Weakrefable): """ Container for an Arrow IPC message with metadata and optional body diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index cbdac7f102052..a64d678f1ec9c 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -43,11 +43,17 @@ cdef class IpcWriteOptions(_Weakrefable): CIpcWriteOptions c_options +cdef IpcWriteOptions wrap_ipc_write_options(CIpcWriteOptions c) + + cdef class IpcReadOptions(_Weakrefable): cdef: CIpcReadOptions c_options +cdef IpcReadOptions wrap_ipc_read_options(CIpcReadOptions c) + + cdef class Message(_Weakrefable): cdef: unique_ptr[CMessage] message From 0cabf8e8220cf8ed8054cb46f6bf7430e1b905c3 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Tue, 16 Sep 2025 01:11:37 +0200 Subject: [PATCH 07/10] Add docstrings for FlightCallOptions and update getters with wrappers --- python/pyarrow/_flight.pyx | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 8e1a65a8ec2c7..a24dbbe5d88e6 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -103,9 +103,6 @@ cdef class FlightCallOptions(_Weakrefable): cdef: CFlightCallOptions options - IpcReadOptions _py_read_options - IpcWriteOptions _py_write_options - def __init__(self, timeout=None, write_options=None, headers=None, IpcReadOptions read_options=None): """Create call options. @@ -125,9 +122,6 @@ cdef class FlightCallOptions(_Weakrefable): """ cdef IpcWriteOptions c_write_options - self._py_write_options = write_options - self._py_read_options = read_options - if timeout is not None: self.options.timeout = CTimeoutDuration(timeout) if write_options is not None: @@ -150,36 +144,35 @@ cdef class FlightCallOptions(_Weakrefable): @property def timeout(self): + """Get timeout for the call (in seconds) + """ return self.options.timeout.count() @property def headers(self): + """Get list of headers (key, value tuples) for client's context + """ return self.options.headers @property def read_options(self): - return self._py_read_options + """Get serialization options for reading IPC format + """ + return wrap_ipc_read_options(self.options.read_options) @property def write_options(self): - return self._py_write_options + """Get IPC write options + """ + return wrap_ipc_write_options(self.options.write_options) def __repr__(self): - base = (f"") + f"headers={self.headers}\n" + f" read_options={self.read_options}\n" + f" write_options={self.write_options}\n>") - return "\n".join(parts) if len(parts) > 2 else "".join(parts) _CertKeyPair = collections.namedtuple('_CertKeyPair', ['cert', 'key']) From d0eaa574070b2f8689c6b189757f11d0f78c9657 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Tue, 16 Sep 2025 01:11:58 +0200 Subject: [PATCH 08/10] Update IPC/Flight Options tests --- python/pyarrow/tests/test_flight.py | 44 ++++++++--------- python/pyarrow/tests/test_ipc.py | 74 +++++++++++++++-------------- 2 files changed, 60 insertions(+), 58 deletions(-) diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index 63bad3d7a8f8a..1c4049fe0f826 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -2542,26 +2542,31 @@ def received_headers(self, headers): @pytest.fixture def call_options_args(request): - match request.param: - case "default": - return { - "timeout": 3, - "headers": None, - "write_options": None, - "read_options": None, - } - case "all": - return { - "timeout": 7, - "headers": [(b"abc", b"def")], - "write_options": pa.ipc.IpcWriteOptions(compression="zstd"), - "read_options": pa.ipc.IpcReadOptions(use_threads=False), - } + if request.param == "default": + return { + "timeout": 3, + "headers": None, + "write_options": None, + "read_options": None, + } + elif request.param == "all": + return { + "timeout": 7, + "headers": [(b"abc", b"def")], + "write_options": pa.ipc.IpcWriteOptions(compression="zstd"), + "read_options": pa.ipc.IpcReadOptions( + use_threads=False, + ensure_alignment=pa.ipc.Alignment.DataTypeSpecific, + ), + } + else: + return {} @pytest.mark.parametrize( "call_options_args", ["default", "all"], indirect=True) def test_call_options_repr(call_options_args): + # https://github.com/apache/arrow/issues/47358 call_options = FlightCallOptions(**call_options_args) repr = call_options.__repr__() @@ -2569,10 +2574,5 @@ def test_call_options_repr(call_options_args): if val is None: continue - if arg not in ("read_options", "write_options"): - assert f"{arg}={val}" in repr - continue - - val_repr = val.__repr__() - assert val_repr in repr - + assert f"{arg}={val}" in repr + continue diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index a5960f67dd386..6850d62c71872 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -1360,27 +1360,28 @@ def check_ipc_options_repr(options_obj, options_args): @pytest.fixture def write_options_args(request): - match request.param: - case "default": - return { - "allow_64bit": False, - "use_legacy_format": False, - "metadata_version": pa.ipc.MetadataVersion.V5, - "compression": None, - "use_threads": True, - "emit_dictionary_deltas": False, - "unify_dictionaries": False, - } - case "all": - return { - "allow_64bit": True, - "use_legacy_format": True, - "metadata_version": pa.ipc.MetadataVersion.V4, - "compression": "zstd", - "use_threads": False, - "emit_dictionary_deltas": True, - "unify_dictionaries": True, - } + if request.param == "default": + return { + "allow_64bit": False, + "use_legacy_format": False, + "metadata_version": pa.ipc.MetadataVersion.V5, + "compression": None, + "use_threads": True, + "emit_dictionary_deltas": False, + "unify_dictionaries": False, + } + elif request.param == "all": + return { + "allow_64bit": True, + "use_legacy_format": True, + "metadata_version": pa.ipc.MetadataVersion.V4, + "compression": "zstd", + "use_threads": False, + "emit_dictionary_deltas": True, + "unify_dictionaries": True, + } + else: + return {} @pytest.mark.parametrize( @@ -1392,21 +1393,22 @@ def test_write_options_repr(write_options_args): @pytest.fixture def read_options_args(request): - match request.param: - case "default": - return { - "ensure_native_endian": True, - "ensure_alignment": pa.ipc.Alignment.Any, - "use_threads": True, - "included_fields": None, - } - case "all": - return { - "ensure_native_endian": False, - "ensure_alignment": pa.ipc.Alignment.DataTypeSpecific, - "use_threads": False, - "included_fields": [1, 2, 3], - } + if request.param == "default": + return { + "ensure_native_endian": True, + "ensure_alignment": pa.ipc.Alignment.Any, + "use_threads": True, + "included_fields": None, + } + elif request.param == "all": + return { + "ensure_native_endian": False, + "ensure_alignment": pa.ipc.Alignment.DataTypeSpecific, + "use_threads": False, + "included_fields": [1, 2, 3], + } + else: + return {} @pytest.mark.parametrize( From de04f9b777fd209c5e1e02ef95dce425ca7880b0 Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Tue, 16 Sep 2025 18:05:06 +0200 Subject: [PATCH 09/10] Account for default IPC Read/Write options in Flight Call test --- python/pyarrow/tests/test_flight.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index 79071f7a4e9f2..e5edc0eaa2c41 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -2654,7 +2654,7 @@ def test_call_options_repr(call_options_args): for arg, val in call_options_args.items(): if val is None: + assert arg in repr continue assert f"{arg}={val}" in repr - continue From 593a0a892ff50be846a61f5aaa8ec6f1c21479fe Mon Sep 17 00:00:00 2001 From: Bogdan Romenskii Date: Wed, 17 Sep 2025 21:35:40 +0200 Subject: [PATCH 10/10] Add ZSTD pytest mark for IPC test --- python/pyarrow/tests/test_ipc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 6850d62c71872..b4db9cd0875d3 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -1384,6 +1384,7 @@ def write_options_args(request): return {} +@pytest.mark.zstd @pytest.mark.parametrize( "write_options_args", ["default", "all"], indirect=True) def test_write_options_repr(write_options_args):