Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial DeltaCAT Native Storage Implementation #485

Merged
merged 8 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ benchmark-aws: install

benchmark: install
pytest -m benchmark deltacat/benchmarking

publish: test test-integration rebuild
twine upload dist/*
2 changes: 1 addition & 1 deletion deltacat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "1.1.27"
__version__ = "2.0"


__all__ = [
Expand Down
9 changes: 8 additions & 1 deletion deltacat/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from deltacat.storage.model.partition import (
Partition,
PartitionLocator,
PartitionLocatorAlias,
PartitionKey,
PartitionScheme,
PartitionSchemeList,
Expand All @@ -43,7 +44,11 @@
Schema,
SchemaList,
)
from deltacat.storage.model.stream import Stream, StreamLocator
from deltacat.storage.model.stream import (
Stream,
StreamLocator,
StreamLocatorAlias,
)
from deltacat.storage.model.table import (
Table,
TableLocator,
Expand Down Expand Up @@ -132,6 +137,7 @@
"Partition",
"PartitionKey",
"PartitionLocator",
"PartitionLocatorAlias",
"PartitionScheme",
"PartitionSchemeList",
"PartitionValues",
Expand All @@ -145,6 +151,7 @@
"Stream",
"StreamFormat",
"StreamLocator",
"StreamLocatorAlias",
"Table",
"TableLocator",
"TableProperties",
Expand Down
86 changes: 68 additions & 18 deletions deltacat/storage/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
Schema,
SortScheme,
Stream,
StreamFormat,
StreamLocator,
Table,
TableProperties,
TableVersion,
TableVersionLocator,
TableVersionProperties,
)
from deltacat.storage.model.manifest import Manifest
Expand Down Expand Up @@ -234,7 +236,7 @@ def get_delta_manifest(
) -> Manifest:
"""
Get the manifest associated with the given delta or delta locator. This
always retrieves the authoritative remote copy of the delta manifest, and
always retrieves the authoritative durable copy of the delta manifest, and
never the local manifest defined for any input delta.
"""
raise NotImplementedError("get_delta_manifest not implemented")
Expand All @@ -255,7 +257,7 @@ def create_namespace(

def update_namespace(
namespace: str,
properties: NamespaceProperties = None,
properties: Optional[NamespaceProperties] = None,
new_namespace: Optional[str] = None,
*args,
**kwargs,
Expand Down Expand Up @@ -320,6 +322,8 @@ def update_table_version(
schema: Optional[Schema] = None,
description: Optional[str] = None,
properties: Optional[TableVersionProperties] = None,
partition_scheme: Optional[PartitionScheme] = None,
sort_keys: Optional[SortScheme] = None,
*args,
**kwargs,
) -> None:
Expand All @@ -339,18 +343,27 @@ def stage_stream(
namespace: str,
table_name: str,
table_version: Optional[str] = None,
stream_format: StreamFormat = StreamFormat.DELTACAT,
*args,
**kwargs,
) -> Stream:
"""
Stages a new delta stream for the given table version. Resolves to the
latest active table version if no table version is given. Returns the
staged stream. Raises an error if the table version does not exist.
latest active table version if no table version is given. Resolves to the
DeltaCAT stream format if no stream format is given. If this stream
will replace another stream with the same format and scheme, then it will
have its previous stream ID set to the ID of the stream being replaced.
Returns the staged stream. Raises an error if the table version does not
exist.
"""
raise NotImplementedError("stage_stream not implemented")


def commit_stream(stream: Stream, *args, **kwargs) -> Stream:
def commit_stream(
stream: Stream,
*args,
**kwargs,
) -> Stream:
"""
Registers a delta stream with a target table version, replacing any
previous stream registered for the same table version. Returns the
Expand All @@ -363,28 +376,46 @@ def delete_stream(
namespace: str,
table_name: str,
table_version: Optional[str] = None,
stream_format: StreamFormat = StreamFormat.DELTACAT,
*args,
**kwargs,
) -> None:
"""
Deletes the delta stream currently registered with the given table version.
Resolves to the latest active table version if no table version is given.
Resolves to the deltacat stream format if no stream format is given.
Raises an error if the table version does not exist.
"""
raise NotImplementedError("delete_stream not implemented")


def get_staged_stream(
table_version_locator: TableVersionLocator,
stream_id: str,
*args,
**kwargs,
) -> Optional[Partition]:
"""
Gets the staged stream for the given table version locator and stream ID.
Returns None if the stream does not exist. Raises an error if the given
table version locator does not exist.
"""
raise NotImplementedError("get_staged_stream not implemented")


def get_stream(
namespace: str,
table_name: str,
table_version: Optional[str] = None,
stream_format: StreamFormat = StreamFormat.DELTACAT,
*args,
**kwargs,
) -> Optional[Stream]:
"""
Gets the most recently committed stream for the given table version and
partition key values. Resolves to the latest active table version if no
table version is given. Returns None if the table version does not exist.
Gets the most recently committed stream for the given table version.
Resolves to the latest active table version if no table version is given.
Resolves to the deltacat stream format if no stream format is given.
Returns None if the table version or stream format does not exist.
"""
raise NotImplementedError("get_stream not implemented")

Expand Down Expand Up @@ -412,10 +443,15 @@ def commit_partition(
**kwargs,
) -> Partition:
"""
Commits the given partition to its associated table version stream,
replacing any previous partition (i.e., "partition being replaced") registered for the same stream and
Commits the staged partition to its associated table version stream,
replacing any previous partition registered for the same stream and
partition values.
If the previous_partition is passed as an argument, the specified previous_partition will be the partition being replaced, otherwise it will be retrieved.

If previous partition is given then it will be replaced with its deltas
prepended to the new partition being committed. Otherwise the latest
committed partition with the same keys and partition scheme ID will be
retrieved.

Returns the registered partition. If the partition's
previous delta stream position is specified, then the commit will
be rejected if it does not match the actual previous stream position of
Expand All @@ -427,22 +463,34 @@ def commit_partition(


def delete_partition(
namespace: str,
table_name: str,
table_version: Optional[str] = None,
stream_locator: StreamLocator,
partition_values: Optional[PartitionValues] = None,
partition_scheme_id: Optional[str] = None,
*args,
**kwargs,
) -> None:
"""
Deletes the given partition from the specified table version. Resolves to
the latest active table version if no table version is given. Partition
Deletes the given partition from the specified stream. Partition
values should not be specified for unpartitioned tables. Raises an error
if the table version or partition does not exist.
if the partition does not exist.
"""
raise NotImplementedError("delete_partition not implemented")


def get_staged_partition(
stream_locator: StreamLocator,
partition_id: str,
*args,
**kwargs,
) -> Optional[Partition]:
"""
Gets the staged partition for the given stream locator and partition ID.
Returns None if the partition does not exist. Raises an error if the
given stream locator does not exist.
"""
raise NotImplementedError("get_staged_partition not implemented")


def get_partition(
stream_locator: StreamLocator,
partition_values: Optional[PartitionValues] = None,
Expand All @@ -453,7 +501,9 @@ def get_partition(
Gets the most recently committed partition for the given stream locator and
partition key values. Returns None if no partition has been committed for
the given table version and/or partition key values. Partition values
should not be specified for unpartitioned tables.
should not be specified for unpartitioned tables. Partition scheme ID
resolves to the table version's current partition scheme by default.
Raises an error if the given stream locator does not exist.
"""
raise NotImplementedError("get_partition not implemented")

Expand Down
Loading