Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): chunk based map serialization in pure python #2037

Merged
merged 44 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
428667d
make basic
pandalee99 Feb 5, 2025
59c84a3
Merge remote-tracking branch 'origin/dev' into py/Implement_chunk
pandalee99 Feb 5, 2025
48b42aa
back
pandalee99 Feb 5, 2025
bb15ca7
Make MapSerializer
pandalee99 Feb 5, 2025
bf6f200
logic fix
pandalee99 Feb 5, 2025
3fb5a67
recover
pandalee99 Feb 5, 2025
0e78968
serialize write backup
pandalee99 Feb 5, 2025
d818c28
serialize write backup
pandalee99 Feb 5, 2025
8a25b4f
pass
pandalee99 Feb 5, 2025
c6520d6
make map_serializer prototype
pandalee99 Feb 5, 2025
94e78c1
update
pandalee99 Feb 6, 2025
d47f8ac
codestyle
pandalee99 Feb 6, 2025
01fcceb
to serializer
pandalee99 Feb 6, 2025
e531ca5
Merge remote-tracking branch 'ant/main' into py/Implement_chunk
chaokunyang Feb 6, 2025
df1599f
removed fastpath
chaokunyang Feb 6, 2025
bcc4549
add pure python test ci
chaokunyang Feb 6, 2025
820e17d
fix old API error
pandalee99 Feb 6, 2025
7573a28
fix circular logic
pandalee99 Feb 6, 2025
a962502
fix chunk logic
pandalee99 Feb 6, 2025
7b8daf7
make Similarity logic
pandalee99 Feb 6, 2025
3645cd0
fix class CollectionSerializer(Serializer)---
pandalee99 Feb 6, 2025
8c88bbf
fix test_serialize and write_int32
pandalee99 Feb 6, 2025
ddd15b2
rollback
pandalee99 Feb 6, 2025
13d0eb0
add api to fix out
pandalee99 Feb 7, 2025
1fdf085
fix ambiguous
pandalee99 Feb 7, 2025
5a2f068
fix
pandalee99 Feb 7, 2025
e95fc21
fix
pandalee99 Feb 7, 2025
2a85d75
fix
pandalee99 Feb 7, 2025
2f623bc
rollback
pandalee99 Feb 7, 2025
062685a
rollback2
pandalee99 Feb 7, 2025
634f023
reconstructed loop structure
pandalee99 Feb 7, 2025
8a17b05
rollback
pandalee99 Feb 7, 2025
cf5c9af
item
pandalee99 Feb 7, 2025
794ea18
item
pandalee99 Feb 7, 2025
9a29c37
update
pandalee99 Feb 7, 2025
5717cd0
Change write computation
pandalee99 Feb 9, 2025
f56285e
fix write computation
pandalee99 Feb 9, 2025
f597436
fix write computation
pandalee99 Feb 9, 2025
86d2f27
rollback
pandalee99 Feb 9, 2025
f79462e
ref_tracking
pandalee99 Feb 9, 2025
1da9f22
recover
pandalee99 Feb 10, 2025
1c4e3e2
fix loop error
pandalee99 Feb 11, 2025
d8e85a4
remove fastpath
pandalee99 Feb 11, 2025
957a3e7
fix
pandalee99 Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ci/run_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ case $1 in
exit $testcode
fi
echo "Executing fury python tests succeeds"
ENABLE_FURY_CYTHON_SERIALIZATION=0 pytest -v -s --durations=60 pyfury/tests
testcode=$?
if [[ $testcode -ne 0 ]]; then
exit $testcode
fi
echo "Executing fury python tests succeeds"
;;
go)
echo "Executing fury go tests for go"
Expand Down
6 changes: 3 additions & 3 deletions python/pyfury/_fury.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def serialize_nonref(self, buffer, obj):
return
else:
classinfo = self.class_resolver.get_classinfo(cls)
self.class_resolver.write_classinfo(buffer, classinfo)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, obj)

def xserialize_ref(self, buffer, obj, serializer=None):
Expand Down Expand Up @@ -428,7 +428,7 @@ def write_buffer_object(self, buffer, buffer_object: BufferObject):
def read_buffer_object(self, buffer) -> Buffer:
in_band = buffer.read_bool()
if in_band:
size = buffer.read_varint32()
size = buffer.read_varuint32()
buf = buffer.slice(buffer.reader_index, size)
buffer.reader_index += size
return buf
Expand Down Expand Up @@ -459,7 +459,7 @@ def write_ref_pyobject(self, buffer, value, classinfo=None):
return
if classinfo is None:
classinfo = self.class_resolver.get_classinfo(type(value))
self.class_resolver.write_classinfo(buffer, classinfo)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, value)

def read_ref_pyobject(self, buffer):
Expand Down
301 changes: 263 additions & 38 deletions python/pyfury/_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from typing import Dict, Iterable, Any

from pyfury._fury import (
NOT_NULL_STRING_FLAG,
NOT_NULL_INT64_FLAG,
NOT_NULL_BOOL_FLAG,
NOT_NULL_STRING_FLAG,
)
from pyfury.resolver import NOT_NULL_VALUE_FLAG, NULL_FLAG
from pyfury.type import is_primitive_type
Expand All @@ -35,6 +35,36 @@

logger = logging.getLogger(__name__)

# Chunk-based map serialization: header flag bits and limits.
#
# Maximum number of entries in one chunk; the chunk size is stored in a
# single unsigned byte.
MAX_CHUNK_SIZE = 255
# Set when reference tracking is enabled for keys in the chunk.
TRACKING_KEY_REF = 0b1
# Set when the key of the entry is null.
KEY_HAS_NULL = 0b10
# Set when the key is serialized with the declared key serializer, so no
# key type info is written for the chunk.
KEY_DECL_TYPE = 0b100
# Set when reference tracking is enabled for values in the chunk.
TRACKING_VALUE_REF = 0b1000
# Set when the value of the entry is null.
VALUE_HAS_NULL = 0b10000
# Set when the value is serialized with the declared value serializer, so no
# value type info is written for the chunk.
VALUE_DECL_TYPE = 0b100000
# An entry whose key or value is null is serialized as its own chunk of
# size 1. For such entries the chunk size byte is not written.
# Both key and value are null.
KV_NULL = KEY_HAS_NULL | VALUE_HAS_NULL
# Key is null, value type is declared type, and ref tracking for value is disabled.
NULL_KEY_VALUE_DECL_TYPE = KEY_HAS_NULL | VALUE_DECL_TYPE
# Key is null, value type is declared type, and ref tracking for value is enabled.
NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF = (
    KEY_HAS_NULL | VALUE_DECL_TYPE | TRACKING_VALUE_REF
)
# Value is null, key type is declared type, and ref tracking for key is disabled.
NULL_VALUE_KEY_DECL_TYPE = VALUE_HAS_NULL | KEY_DECL_TYPE
# Value is null, key type is declared type, and ref tracking for key is enabled.
# This must carry TRACKING_KEY_REF (not TRACKING_VALUE_REF): the reader tests
# the key-tracking bit to decide whether a ref flag precedes the key.
NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF = (
    VALUE_HAS_NULL | KEY_DECL_TYPE | TRACKING_KEY_REF
)


class Serializer(ABC):
__slots__ = "fury", "type_", "need_to_write_ref"
Expand Down Expand Up @@ -193,7 +223,7 @@ def write(self, buffer, value: Iterable[Any]):
for s in value:
cls = type(s)
if cls is str:
buffer.write_int16()
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(s)
elif cls is int:
buffer.write_int16(NOT_NULL_INT64_FLAG)
Expand Down Expand Up @@ -280,53 +310,248 @@ def handle_read_elem(self, elem, set_: set):


class MapSerializer(Serializer):
__slots__ = (
"class_resolver",
"ref_resolver",
"key_serializer",
"value_serializer",
)

def __init__(self, fury, type_, key_serializer=None, value_serializer=None):
super().__init__(fury, type_)
self.class_resolver = fury.class_resolver
self.ref_resolver = fury.ref_resolver
self.key_serializer = key_serializer
self.value_serializer = value_serializer

def write(self, buffer, value: Dict):
buffer.write_varuint32(len(value))
for k, v in value.items():
key_cls = type(k)
if key_cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(k)
def write(self, buffer, o):
obj = o
length = len(obj)
buffer.write_varuint32(length)
if length == 0:
return
fury = self.fury
class_resolver = fury.class_resolver
ref_resolver = fury.ref_resolver
key_serializer = self.key_serializer
value_serializer = self.value_serializer

items_iter = iter(obj.items())
key, value = next(items_iter)
has_next = True
while has_next:

while True:
if key is not None:
if value is not None:
break
if key_serializer is not None:
if key_serializer.need_to_write_ref:
buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF)
if not ref_resolver.write_ref_or_null(buffer, key):
key_serializer.write(buffer, key)
else:
buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE)
key_serializer.write(buffer, key)
else:
buffer.write_int8(VALUE_HAS_NULL | TRACKING_KEY_REF)
fury.serialize_ref(buffer, key)
else:
if value is not None:
if value_serializer is not None:
if value_serializer.need_to_write_ref:
buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF)
if not ref_resolver.write_ref_or_null(buffer, key):
value_serializer.write(buffer, key)
if not ref_resolver.write_ref_or_null(buffer, value):
value_serializer.write(buffer, value)
else:
buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE)
value_serializer.write(buffer, value)
else:
buffer.write_int8(KEY_HAS_NULL | TRACKING_VALUE_REF)
fury.serialize_ref(buffer, value)
else:
buffer.write_int8(KV_NULL)
try:
key, value = next(items_iter)
except StopIteration:
has_next = False
break

if not has_next:
break

key_cls = type(key)
value_cls = type(value)
buffer.write_int16(-1)
chunk_size_offset = buffer.writer_index - 1
chunk_header = 0

if key_serializer is not None:
chunk_header |= KEY_DECL_TYPE
else:
if not self.ref_resolver.write_ref_or_null(buffer, k):
classinfo = self.class_resolver.get_classinfo(key_cls)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, k)
value_cls = type(v)
if value_cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(v)
elif value_cls is int:
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(v)
key_classinfo = self.class_resolver.get_classinfo(key_cls)
class_resolver.write_typeinfo(buffer, key_classinfo)
key_serializer = key_classinfo.serializer

if value_serializer is not None:
chunk_header |= VALUE_DECL_TYPE
else:
if not self.ref_resolver.write_ref_or_null(buffer, v):
classinfo = self.class_resolver.get_classinfo(value_cls)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, v)
value_classinfo = self.class_resolver.get_classinfo(value_cls)
class_resolver.write_typeinfo(buffer, value_classinfo)
value_serializer = value_classinfo.serializer

key_write_ref = (
key_serializer.need_to_write_ref if key_serializer else False
)
value_write_ref = (
value_serializer.need_to_write_ref if value_serializer else False
)
if key_write_ref:
chunk_header |= TRACKING_KEY_REF
if value_write_ref:
chunk_header |= TRACKING_VALUE_REF

buffer.put_uint8(chunk_size_offset - 1, chunk_header)
chunk_size = 0

while chunk_size < MAX_CHUNK_SIZE:

if (
key is None
or value is None
or type(key) is not key_cls
or type(value) is not value_cls
):
break
if not key_write_ref or not ref_resolver.write_ref_or_null(buffer, key):
key_serializer.write(buffer, key)
if not value_write_ref or not ref_resolver.write_ref_or_null(
buffer, value
):
value_serializer.write(buffer, value)

chunk_size += 1

try:
key, value = next(items_iter)
except StopIteration:
has_next = False
break

key_serializer = self.key_serializer
value_serializer = self.value_serializer
buffer.put_uint8(chunk_size_offset, chunk_size)

def read(self, buffer):
len_ = buffer.read_varuint32()
map_ = self.type_()
self.fury.ref_resolver.reference(map_)
for i in range(len_):
k = self.fury.deserialize_ref(buffer)
v = self.fury.deserialize_ref(buffer)
map_[k] = v
fury = self.fury
ref_resolver = self.ref_resolver
class_resolver = self.class_resolver
size = buffer.read_varuint32()
map_ = {}
ref_resolver.reference(map_)
chunk_header = 0
if size != 0:
chunk_header = buffer.read_uint8()
key_serializer, value_serializer = None, None

while size > 0:
while True:
key_has_null = (chunk_header & KEY_HAS_NULL) != 0
value_has_null = (chunk_header & VALUE_HAS_NULL) != 0
if not key_has_null:
if not value_has_null:
break
else:
track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
if (chunk_header & KEY_DECL_TYPE) != 0:
if track_key_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
key = ref_resolver.get_read_object()
else:
key = key_serializer.read(buffer)
ref_resolver.set_read_object(ref_id, key)
else:
key = key_serializer.read(buffer)
else:
key = fury.deserialize_ref(buffer)
map_[key] = None
else:
if not value_has_null:
track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0
if (chunk_header & VALUE_DECL_TYPE) != 0:
if track_value_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
value = ref_resolver.get_read_object()
else:
value = value_serializer.read(buffer)
ref_resolver.set_read_object(ref_id, value)
else:
value = fury.deserialize_ref(buffer)
map_[None] = value
else:
map_[None] = None
size -= 1
if size == 0:
return map_
else:
chunk_header = buffer.read_uint8()

track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0
key_is_declared_type = (chunk_header & KEY_DECL_TYPE) != 0
value_is_declared_type = (chunk_header & VALUE_DECL_TYPE) != 0
chunk_size = buffer.read_uint8()
if not key_is_declared_type:
key_serializer = class_resolver.read_typeinfo(buffer).serializer
if not value_is_declared_type:
value_serializer = class_resolver.read_typeinfo(buffer).serializer
key_serializer_type = type(key_serializer)
value_serializer_type = type(value_serializer)
for i in range(chunk_size):
if track_key_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
key = ref_resolver.get_read_object()
else:
key = key_serializer.read(buffer)
ref_resolver.set_read_object(ref_id, key)
else:
if key_serializer_type is StringSerializer:
key = buffer.read_string()
elif key_serializer_type is Int64Serializer:
key = buffer.read_varint64()
elif key_serializer_type is Float64Serializer:
key = buffer.read_double()
elif key_serializer_type is Int32Serializer:
key = buffer.read_varint32()
elif key_serializer_type is Float32Serializer:
key = buffer.read_float()
else:
key = key_serializer.read(buffer)
if track_value_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
value = ref_resolver.get_read_object()
else:
value = value_serializer.read(buffer)
ref_resolver.set_read_object(ref_id, value)
else:
if value_serializer_type is StringSerializer:
value = buffer.read_string()
elif value_serializer_type is Int64Serializer:
value = buffer.read_varint64()
elif value_serializer_type is Float64Serializer:
value = buffer.read_double()
elif value_serializer_type is Int32Serializer:
value = buffer.read_varint32()
elif value_serializer_type is Float32Serializer:
value = buffer.read_float()
elif value_serializer_type is BooleanSerializer:
value = buffer.read_bool()
else:
value = value_serializer.read(buffer)
map_[key] = value
size -= 1
if size != 0:
chunk_header = buffer.read_uint8()

return map_

def xwrite(self, buffer, value: Dict):
Expand Down
2 changes: 2 additions & 0 deletions python/pyfury/_util.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ cdef class Buffer:

cpdef inline put_bool(self, uint32_t offset, c_bool v)

cpdef inline put_uint8(self, uint32_t offset, uint8_t v)

cpdef inline put_int8(self, uint32_t offset, int8_t v)

cpdef inline put_int16(self, uint32_t offset, int16_t v)
Expand Down
Loading
Loading