From 7b054efb3e82887a469adfcaca28062d16e7a2ba Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Wed, 29 Jan 2025 09:34:24 -0800 Subject: [PATCH 1/8] Update transaction system time provider docs. --- deltacat/storage/model/transaction.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/deltacat/storage/model/transaction.py b/deltacat/storage/model/transaction.py index 83521dd8..f178b6eb 100644 --- a/deltacat/storage/model/transaction.py +++ b/deltacat/storage/model/transaction.py @@ -81,7 +81,9 @@ class TransactionSystemTimeProvider(TransactionTimeProvider): def start_time(self) -> int: """ - Gets the current system time in nanoseconds since the epoch. + Gets the current system time in nanoseconds since the epoch. Ensures + that the start time returned is greater than the last known end time + recorded at the time this method is invoked. :return: Current epoch time in nanoseconds. """ # ensure serial transactions in a single process have start times after @@ -112,7 +114,9 @@ def start_time(self) -> int: def end_time(self) -> int: """ - Gets the current system time in nanoseconds since the epoch. + Gets the current system time in nanoseconds since the epoch. Ensures + that the end time returned is no less than the last known start time + recorded at the time this method is invoked. :return: Current epoch time in nanoseconds. """ # ensure serial transactions in a single process have end times no less From cdf87662367c3e22aa507c3d87eb2d5106fd2d54 Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Wed, 29 Jan 2025 20:26:09 -0800 Subject: [PATCH 2/8] Add pypi publish target in Makefile, update package version number, and build description. --- Makefile | 3 +++ deltacat/__init__.py | 2 +- setup.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 67c33fff..a9cec4cb 100644 --- a/Makefile +++ b/Makefile @@ -64,3 +64,6 @@ benchmark-aws: install benchmark: install pytest -m benchmark deltacat/benchmarking + +publish: test test-integration rebuild + twine upload dist/* diff --git a/deltacat/__init__.py b/deltacat/__init__.py index 2a857116..2d0a687c 100644 --- a/deltacat/__init__.py +++ b/deltacat/__init__.py @@ -53,7 +53,7 @@ deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__)) -__version__ = "1.1.27" +__version__ = "2.0" __all__ = [ diff --git a/setup.py b/setup.py index 226e7853..c0077f81 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def find_version(*paths): name="deltacat", version=find_version("deltacat", "__init__.py"), author="Ray Team", - description="A scalable, fast, ACID-compliant Data Catalog powered by Ray.", + description="A portable, scalable, fast, and Pythonic Data Lakehouse for AI.", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/ray-project/deltacat", From f9158940deb2bf559fd3c1a9b6532f0cbf97f340 Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Thu, 30 Jan 2025 23:37:47 -0800 Subject: [PATCH 3/8] [WIP] DeltaCAT Native Storage Implementation. --- deltacat/storage/__init__.py | 9 +- deltacat/storage/interface.py | 23 +- deltacat/storage/main/impl.py | 958 ++++++++++++++++++++++-- deltacat/storage/model/table.py | 26 + deltacat/storage/model/table_version.py | 10 + 5 files changed, 961 insertions(+), 65 deletions(-) diff --git a/deltacat/storage/__init__.py b/deltacat/storage/__init__.py index 49d3d756..250232d5 100644 --- a/deltacat/storage/__init__.py +++ b/deltacat/storage/__init__.py @@ -29,6 +29,7 @@ from deltacat.storage.model.partition import ( Partition, PartitionLocator, + PartitionLocatorAlias, PartitionKey, PartitionScheme, PartitionSchemeList, @@ -43,7 +44,11 @@ Schema, SchemaList, ) -from deltacat.storage.model.stream import Stream, StreamLocator +from deltacat.storage.model.stream import ( + Stream, + StreamLocator, + StreamLocatorAlias, +) from deltacat.storage.model.table import ( Table, TableLocator, @@ -132,6 +137,7 @@ "Partition", "PartitionKey", "PartitionLocator", + "PartitionLocatorAlias", "PartitionScheme", "PartitionSchemeList", "PartitionValues", @@ -145,6 +151,7 @@ "Stream", "StreamFormat", "StreamLocator", + "StreamLocatorAlias", "Table", "TableLocator", "TableProperties", diff --git a/deltacat/storage/interface.py b/deltacat/storage/interface.py index f0674cfc..f8f6e12e 100644 --- a/deltacat/storage/interface.py +++ b/deltacat/storage/interface.py @@ -21,7 +21,9 @@ Schema, SortScheme, Stream, + StreamFormat, StreamLocator, + StreamLocatorAlias, Table, TableProperties, TableVersion, @@ -234,7 +236,7 @@ def get_delta_manifest( ) -> Manifest: """ Get the manifest associated with the given delta or delta locator. This - always retrieves the authoritative remote copy of the delta manifest, and + always retrieves the authoritative durable copy of the delta manifest, and never the local manifest defined for any input delta. """ raise NotImplementedError("get_delta_manifest not implemented") @@ -255,7 +257,7 @@ def create_namespace( def update_namespace( namespace: str, - properties: NamespaceProperties = None, + properties: Optional[NamespaceProperties] = None, new_namespace: Optional[str] = None, *args, **kwargs, @@ -320,6 +322,8 @@ def update_table_version( schema: Optional[Schema] = None, description: Optional[str] = None, properties: Optional[TableVersionProperties] = None, + partition_scheme: Optional[PartitionScheme] = None, + sort_keys: Optional[SortScheme] = None, *args, **kwargs, ) -> None: @@ -353,8 +357,9 @@ def stage_stream( def commit_stream(stream: Stream, *args, **kwargs) -> Stream: """ Registers a delta stream with a target table version, replacing any - previous stream registered for the same table version. Returns the - committed stream. + previous stream registered for the same table version. If the stream + format is not set prior to commit, then it is defaulted to the DeltaCAT + stream format. Returns the committed stream. """ raise NotImplementedError("commit_stream not implemented") @@ -363,12 +368,14 @@ def delete_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> None: """ Deletes the delta stream currently registered with the given table version. Resolves to the latest active table version if no table version is given. + Resolves to the deltacat stream format if no stream format is given. Raises an error if the table version does not exist. """ raise NotImplementedError("delete_stream not implemented") @@ -378,13 +385,15 @@ def get_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> Optional[Stream]: """ - Gets the most recently committed stream for the given table version and - partition key values. Resolves to the latest active table version if no - table version is given. Returns None if the table version does not exist. + Gets the most recently committed stream for the given table version. + Resolves to the latest active table version if no table version is given. + Resolves to the deltacat stream format if no stream format is given. + Returns None if the table version or stream format does not exist. """ raise NotImplementedError("get_stream not implemented") diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py index f740a604..4bf19bf3 100644 --- a/deltacat/storage/main/impl.py +++ b/deltacat/storage/main/impl.py @@ -1,3 +1,5 @@ +import uuid + from deltacat.catalog.main.impl import PropertyCatalog from typing import Any, Callable, Dict, List, Optional, Union @@ -20,6 +22,8 @@ StreamFormat, TransactionType, TransactionOperationType, + StreamFormat, + CommitState, ) from deltacat.storage.model.list_result import ListResult from deltacat.storage.model.namespace import ( @@ -32,6 +36,8 @@ PartitionLocator, PartitionScheme, PartitionValues, + UNPARTITIONED_SCHEME_ID, + PartitionLocatorAlias, ) from deltacat.storage.model.schema import ( Schema, @@ -46,10 +52,12 @@ from deltacat.storage.model.table import ( Table, TableProperties, + TableLocator, ) from deltacat.storage.model.table_version import ( TableVersion, TableVersionProperties, + TableVersionLocator, ) from deltacat.storage.model.metafile import ( Metafile, @@ -93,7 +101,7 @@ def _get_catalog(**kwargs) -> PropertyCatalog: return catalog -def _list_metafiles( +def _list( metafile: Metafile, txn_op_type: TransactionOperationType, *args, @@ -118,28 +126,126 @@ def _list_metafiles( return list_results_per_op[0] -def _read_latest_metafile( +def _latest( metafile: Metafile, - txn_op_type: TransactionOperationType, *args, **kwargs, ) -> Optional[Metafile]: - list_results = _list_metafiles( + list_results = _list( *args, metafile=metafile, - txn_op_type=txn_op_type, + txn_op_type=TransactionOperationType.READ_LATEST, **kwargs, ) results = list_results.all_items() return results[0] if results else None +def _exists( + metafile: Metafile, + *args, + **kwargs, +) -> Optional[Metafile]: + list_results = _list( + *args, + metafile=metafile, + txn_op_type=TransactionOperationType.READ_EXISTS, + **kwargs, + ) + results = list_results.all_items() + return True if results else False + + +def _resolve_partition_locator_alias( + namespace: str, + table_name: str, + table_version: Optional[str] = None, + partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, + *args, + **kwargs, +) -> PartitionLocatorAlias: + # TODO(pdames): A read shouldn't initiate N transactions that + # read against different catalog snapshots. To resolve this, add + # new "start", "step", and "end" methods to Transaction that + # support starting a txn, defining and executing a txn op, retrieve + # its results, then define and execute the next txn op. When + # stepping through a transaction its txn heartbeat timeout should + # be set manually. + partition_locator = None + if not partition_values: + partition_scheme_id = UNPARTITIONED_SCHEME_ID + elif not partition_scheme_id: + # resolve latest partition scheme from the current + # revision of its `deltacat` stream + stream = get_stream( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + partition_locator = PartitionLocator.of( + stream_locator=stream.locator, + partition_values=partition_values, + partition_id=None, + ) + partition_scheme_id = stream.partition_scheme.id + if not partition_locator: + partition_locator = PartitionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=None, + stream_format=StreamFormat.DELTACAT, + partition_values=partition_values, + partition_id=None, + ) + partition = Partition.of( + locator=partition_locator, + schema=None, + content_types=None, + partition_scheme_id=partition_scheme_id, + ) + return partition.locator_alias + + +def _resolve_latest_active_table_version_id( + namespace: str, + table_name: str, + *args, + **kwargs, +) -> Optional[str]: + table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + return table.latest_active_table_version + + +def _resolve_latest_table_version_id( + namespace: str, + table_name: str, + *args, + **kwargs, +) -> Optional[str]: + table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + return table.latest_table_version + + def list_namespaces(*args, **kwargs) -> ListResult[Namespace]: """ Lists a page of table namespaces. Namespaces are returned as list result items. """ - return _list_metafiles( + return _list( *args, metafile=Namespace.of(NamespaceLocator.of("placeholder")), txn_op_type=TransactionOperationType.READ_SIBLINGS, @@ -152,18 +258,41 @@ def list_tables(namespace: str, *args, **kwargs) -> ListResult[Table]: Lists a page of tables for the given table namespace. Tables are returned as list result items. Raises an error if the given namespace does not exist. """ - raise NotImplementedError("list_tables not implemented") + locator = TableLocator.at(namespace=namespace, table_name="placeholder") + return _list( + *args, + metafile=Table.of(locator=locator), + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_table_versions( - namespace: str, table_name: str, *args, **kwargs + namespace: str, + table_name: str, + *args, + **kwargs, ) -> ListResult[TableVersion]: """ Lists a page of table versions for the given table. Table versions are returned as list result items. Raises an error if the given table does not exist. """ - raise NotImplementedError("list_table_versions not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version="placeholder", + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _list( + *args, + metafile=table_version, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_partitions( @@ -179,14 +308,53 @@ def list_partitions( table version if not specified. Raises an error if the table version does not exist. """ - raise NotImplementedError("list_partitions not implemented") + locator = PartitionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=None, + stream_format=StreamFormat.DELTACAT, + partition_values=["placeholder"], + partition_id="placeholder", + ) + partition = Partition.of( + locator=locator, + schema=None, + content_types=None, + ) + return _list( + *args, + metafile=partition, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_stream_partitions(stream: Stream, *args, **kwargs) -> ListResult[Partition]: """ Lists all partitions committed to the given stream. """ - raise NotImplementedError("list_stream_partitions not implemented") + if stream.stream_format != StreamFormat.DELTACAT: + raise ValueError( + f"Unsupported stream format: {stream.stream_format}" + f"Expected stream format: {StreamFormat.DELTACAT}" + ) + locator = PartitionLocator.of( + stream_locator=stream.locator, + partition_values=["placeholder"], + partition_id="placeholder", + ) + partition = Partition.of( + locator=locator, + schema=None, + content_types=None, + ) + return _list( + *args, + metafile=partition, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_deltas( @@ -216,7 +384,43 @@ def list_deltas( default. The manifests can either be optionally retrieved as part of this call or lazily loaded via subsequent calls to `get_delta_manifest`. """ - raise NotImplementedError("list_deltas not implemented") + # TODO(pdames): Delta listing should ideally either use an efficient + # range-limited dir listing of partition children between start and end + # positions, or should traverse using Partition.stream_position (to + # resolve last stream position) and Delta.previous_stream_position + # (down to first stream position). + partition_locator_alias = _resolve_partition_locator_alias( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + partition_values=partition_values, + partition_scheme_id=partition_scheme_id, + **kwargs, + ) + locator = DeltaLocator.of(locator=partition_locator_alias) + delta = Delta.of( + locator=locator, + delta_type=None, + meta=None, + properties=None, + manifest=None, + ) + all_deltas_list_result: ListResult[Delta] = _list( + *args, + metafile=delta, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) + all_deltas = all_deltas_list_result.all_items() + filtered_deltas = [ + delta + for delta in all_deltas + if first_stream_position <= delta.stream_position <= last_stream_position + ] + if ascending_order: + filtered_deltas.reverse() + return filtered_deltas def list_partition_deltas( @@ -235,7 +439,39 @@ def list_partition_deltas( default. The manifests can either be optionally retrieved as part of this call or lazily loaded via subsequent calls to `get_delta_manifest`. """ - raise NotImplementedError("list_partition_deltas not implemented") + # TODO(pdames): Delta listing should ideally either use an efficient + # range-limited dir listing of partition children between start and end + # positions, or should traverse using Partition.stream_position (to + # resolve last stream position) and Delta.previous_stream_position + # (down to first stream position). + locator = DeltaLocator.of( + partition_locator=partition_like + if isinstance(partition_like, PartitionLocator) + else partition_like.locator, + stream_position=None, + ) + delta = Delta.of( + locator=locator, + delta_type=None, + meta=None, + properties=None, + manifest=None, + ) + all_deltas_list_result: ListResult[Delta] = _list( + *args, + metafile=delta, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) + all_deltas = all_deltas_list_result.all_items() + filtered_deltas = [ + delta + for delta in all_deltas + if first_stream_position <= delta.stream_position <= last_stream_position + ] + if ascending_order: + filtered_deltas.reverse() + return filtered_deltas def get_delta( @@ -261,7 +497,32 @@ def get_delta( default. The manifest can either be optionally retrieved as part of this call or lazily loaded via a subsequent call to `get_delta_manifest`. """ - raise NotImplementedError("get_delta not implemented") + # TODO(pdames): Honor `include_manifest` param. + partition_locator_alias = _resolve_partition_locator_alias( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + partition_values=partition_values, + partition_scheme_id=partition_scheme_id, + **kwargs, + ) + locator = DeltaLocator.of( + locator=partition_locator_alias, + stream_position=stream_position, + ) + delta = Delta.of( + locator=locator, + delta_type=None, + meta=None, + properties=None, + manifest=None, + ) + return _latest( + *args, + metafile=delta, + **kwargs, + ) def get_latest_delta( @@ -286,7 +547,33 @@ def get_latest_delta( default. The manifest can either be optionally retrieved as part of this call or lazily loaded via a subsequent call to `get_delta_manifest`. """ - raise NotImplementedError("get_latest_delta not implemented") + # TODO(pdames): Wrap this method in 1 single txn. + stream = get_stream( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + partition = get_partition( + stream_locator=stream.locator, + partition_values=partition_values, + partition_scheme_id=partition_scheme_id, + ) + locator = DeltaLocator.of( + locator=partition.locator, + stream_position=partition.stream_position, + ) + delta = Delta.of( + locator=locator, + delta_type=None, + meta=None, + properties=None, + manifest=None, + ) + return _latest( + *args, + metafile=delta, + **kwargs, + ) def download_delta( @@ -332,11 +619,13 @@ def download_delta_manifest_entry( def get_delta_manifest( - delta_like: Union[Delta, DeltaLocator], *args, **kwargs + delta_like: Union[Delta, DeltaLocator], + *args, + **kwargs, ) -> Manifest: """ Get the manifest associated with the given delta or delta locator. This - always retrieves the authoritative remote copy of the delta manifest, and + always retrieves the authoritative durable copy of the delta manifest, and never the local manifest defined for any input delta. """ raise NotImplementedError("get_delta_manifest not implemented") @@ -352,12 +641,10 @@ def create_namespace( Creates a table namespace with the given name and properties. Returns the created namespace. """ - catalog = _get_catalog(**kwargs) namespace = Namespace.of( locator=NamespaceLocator.of(namespace=namespace), properties=properties, ) - # given a transaction that creates a single namespace transaction = Transaction.of( txn_type=TransactionType.APPEND, txn_operations=[ @@ -367,6 +654,7 @@ def create_namespace( ) ], ) + catalog = _get_catalog(**kwargs) transaction.commit( catalog_root_dir=catalog.root, filesystem=catalog.filesystem, @@ -376,7 +664,7 @@ def create_namespace( def update_namespace( namespace: str, - properties: NamespaceProperties = None, + properties: Optional[NamespaceProperties] = None, new_namespace: Optional[str] = None, *args, **kwargs, @@ -385,7 +673,31 @@ def update_namespace( Updates a table namespace's name and/or properties. Raises an error if the given namespace does not exist. """ - raise NotImplementedError("update_namespace not implemented") + # TODO(pdames): Wrap get & update within a single txn. + old_namespace = get_namespace( + *args, + namespace=namespace, + **kwargs, + ) + new_namespace: Namespace = Metafile.update_for(old_namespace) + new_namespace.namespace = namespace + new_namespace.properties = properties + transaction = Transaction.of( + txn_type=TransactionType.ALTER, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_namespace, + src_metafile=old_namespace, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return namespace def create_table_version( @@ -412,7 +724,107 @@ def create_table_version( Returns the stream for the created table version. Raises an error if the given namespace does not exist. """ - raise NotImplementedError("create_table_version not implemented") + if not namespace_exists( + *args, + namespace=namespace, + **kwargs, + ): + raise ValueError(f"Namespace {namespace} does not exist") + prev_table_version = None + prev_table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + if not prev_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ): + # create a new table as part of this transaction + txn_type = TransactionType.APPEND + table_txn_op_type = TransactionOperationType.CREATE + prev_table = None + new_table = Table.of( + locator=TableLocator.of(namespace=namespace, table_name=table_name), + ) + table_version = table_version or "1" + else: + # update the existing table as part of this transaction + txn_type = TransactionType.ALTER + table_txn_op_type = TransactionOperationType.UPDATE + new_table: Table = Metafile.update_for(prev_table) + prev_table_version = prev_table.latest_table_version + if not table_version: + try: + # if the last table version was an int then increment it by 1 + table_version = str(int(prev_table_version) + 1) + except ValueError: + # if it isn't, set it to a UUID + table_version = str(uuid.uuid4()) + new_table.description = table_description or table_version_description + new_table.properties = table_properties + new_table.latest_table_version = table_version + catalog = _get_catalog(**kwargs) + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=schema, + partition_scheme=partition_scheme, + description=table_version_description, + properties=table_version_properties, + content_types=supported_content_types, + sort_scheme=sort_keys, + watermark=None, + lifecycle_state=LifecycleState.UNRELEASED, + schemas=[schema] if schema else None, + partition_schemes=[partition_scheme] if partition_scheme else None, + sort_schemes=[sort_keys] if sort_keys else None, + previous_table_version=prev_table_version, + ) + stream_locator = StreamLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=str(uuid.uuid4()), + stream_format=StreamFormat.DELTACAT, + ) + stream = Stream.of( + locator=stream_locator, + partition_scheme=partition_scheme, + state=CommitState.COMMITTED, + previous_stream_id=None, + watermark=None, + ) + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=[ + TransactionOperation.of( + operation_type=table_txn_op_type, + dest_metafile=new_table, + src_metafile=prev_table, + ), + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=table_version, + ), + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=stream, + ), + ], + ) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return stream def update_table( @@ -430,7 +842,89 @@ def update_table( when its first table version was created. Raises an error if the given table does not exist. """ - raise NotImplementedError("update_table not implemented") + old_table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + if not old_table: + raise ValueError(f"Table `{namespace}.{table_name}` does not exist.") + new_table: Table = Metafile.update_for(old_table) + new_table.description = description or old_table.description + new_table.properties = properties or old_table.properties + new_table.table_name = new_table_name or old_table.table_name + transaction = Transaction.of( + txn_type=TransactionType.ALTER, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_table, + src_metafile=old_table, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + + +def stage_stream( + namespace: str, + table_name: str, + table_version: Optional[str] = None, + *args, + **kwargs, +) -> Stream: + """ + Stages a new delta stream for the given table version. Resolves to the + latest active table version if no table version is given. Returns the + staged stream. Raises an error if the table version does not exist. + """ + # TODO(pdames): Support retrieving previously staged streams by ID. + if not table_version: + table_version = _resolve_latest_active_table_version_id( + namespace=namespace, + table_name=table_name, + ) + table_version_meta = get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + locator = StreamLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=str(uuid.uuid4()), + stream_format=None, # stream format isn't set until stream commit + ) + stream = Stream.of( + locator=locator, + partition_scheme=table_version_meta.partition_scheme, + state=CommitState.STAGED, + previous_stream_id=None, + watermark=None, + ) + transaction = Transaction.of( + txn_type=TransactionType.APPEND, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=stream, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return stream def update_table_version( @@ -441,6 +935,8 @@ def update_table_version( schema: Optional[Schema] = None, description: Optional[str] = None, properties: Optional[TableVersionProperties] = None, + partition_scheme: Optional[PartitionScheme] = None, + sort_keys: Optional[SortScheme] = None, *args, **kwargs, ) -> None: @@ -453,7 +949,99 @@ def update_table_version( specify a different table version). Raises an error if the given table version does not exist. """ - raise NotImplementedError("update_table_version not implemented") + # TODO(pdames): Wrap get & update within a single txn. + old_table_version = get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + if not old_table_version: + raise ValueError( + f"Table version `{table_version}` does not exist for " + f"table `{namespace}.{table_name}`." + ) + new_table_version: TableVersion = Metafile.update_for(old_table_version) + new_table_version.state = lifecycle_state or old_table_version.state + # TODO(pdames): Use schema patch to check for backwards incompatible changes. + new_table_version.schema = schema or old_table_version.schema + new_table_version.schemas = ( + old_table_version.schemas + [schema] if schema else old_table_version.schemas + ) + new_table_version.description = description or old_table_version.description + new_table_version.properties = properties or old_table_version.properties + new_table_version.partition_scheme = ( + partition_scheme or old_table_version.partition_scheme + ) + new_table_version.partition_schemes = ( + old_table_version.partition_schemes + [partition_scheme] + if partition_scheme + else old_table_version.partition_schemes + ) + new_table_version.sort_scheme = sort_keys or old_table_version.sort_scheme + new_table_version.sort_schemes = ( + old_table_version.sort_schemes + [sort_keys] + if sort_keys + else old_table_version.sort_schemes + ) + old_table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + txn_operations = [] + if old_table.latest_table_version == new_table_version.table_version: + if ( + old_table_version.state != LifecycleState.ACTIVE + and new_table_version.state == LifecycleState.ACTIVE + ): + # update the table's latest table version + new_table: Table = Metafile.update_for(old_table) + new_table.latest_active_table_version = table_version + txn_operations.append( + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_table, + src_metafile=old_table, + ) + ) + txn_operations.append( + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_table_version, + src_metafile=old_table_version, + ), + ) + # TODO(pdames): Push changes down to non-deltacat streams via sync module. + # Also copy sort scheme changes down to deltacat child stream? + if partition_scheme: + old_stream = get_stream( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + new_stream: Stream = Metafile.update_for(old_stream) + new_stream.partition_scheme = partition_scheme + txn_operations.append( + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_stream, + src_metafile=old_stream, + ) + ) + transaction = Transaction.of( + txn_type=TransactionType.ALTER, + txn_operations=txn_operations, + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) def stage_stream( @@ -468,50 +1056,193 @@ def stage_stream( latest active table version if no table version is given. Returns the staged stream. Raises an error if the table version does not exist. """ - raise NotImplementedError("stage_stream not implemented") + # TODO(pdames): Support retrieving previously staged streams by ID. + if not table_version: + table_version = _resolve_latest_active_table_version_id( + namespace=namespace, + table_name=table_name, + ) + table_version_meta = get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + locator = StreamLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=str(uuid.uuid4()), + stream_format=None, # stream format isn't set until stream commit + ) + stream = Stream.of( + locator=locator, + partition_scheme=table_version_meta.partition_scheme, + state=CommitState.STAGED, + previous_stream_id=None, + watermark=None, + ) + transaction = Transaction.of( + txn_type=TransactionType.APPEND, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=stream, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return stream -def commit_stream(stream: Stream, *args, **kwargs) -> Stream: +def commit_stream( + stream: Stream, + *args, + **kwargs, +) -> Stream: """ Registers a delta stream with a target table version, replacing any - previous stream registered for the same table version. Returns the - committed stream. - """ - raise NotImplementedError("commit_stream not implemented") + previous stream registered for the same table version. If the stream + format is not set prior to commit, then it is defaulted to the DeltaCAT + stream format. Returns the committed stream. + """ + stream: Stream = Metafile.update_for(stream) + if not stream.locator.stream_id: + stream.locator.stream_id = str(uuid.uuid4()) + if not stream.stream_format: + stream.stream_format = StreamFormat.DELTACAT + stream.state = CommitState.COMMITTED + prev_stream = get_stream( + *args, + namespace=stream.namespace, + table_name=stream.table_name, + table_version=stream.table_version, + stream_format=stream.stream_format, + **kwargs, + ) + if prev_stream: + if prev_stream.stream_id == stream.stream_id: + raise ValueError( + f"Cannot replace stream with duplicate ID: {stream.stream_id}." + ) + stream.previous_stream_id = prev_stream.stream_id + txn_type = TransactionType.OVERWRITE + else: + txn_type = TransactionType.APPEND + + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=stream, + src_metafile=prev_stream, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return stream def delete_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> None: """ Deletes the delta stream currently registered with the given table version. Resolves to the latest active table version if no table version is given. + Resolves to the deltacat stream format if no stream format is given. Raises an error if the table version does not exist. """ - raise NotImplementedError("delete_stream not implemented") + if not table_version: + table_version = _resolve_latest_active_table_version_id( + namespace=namespace, + table_name=table_name, + ) + stream_to_delete = get_stream( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_format=stream_format, + **kwargs, + ) + if not stream_to_delete: + raise ValueError( + f"Stream to delete not found: {namespace}.{table_name}" + f".{table_version}.{stream_format}." + ) + else: + stream_to_delete.state = CommitState.DEPRECATED + transaction = Transaction.of( + txn_type=TransactionType.DELETE, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.DELETE, + src_metafile=stream_to_delete, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) def get_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> Optional[Stream]: """ - Gets the most recently committed stream for the given table version and - partition key values. Resolves to the latest active table version if no - table version is given. Returns None if the table version does not exist. + Gets the most recently committed stream for the given table version. + Resolves to the latest active table version if no table version is given. + Resolves to the deltacat stream format if no stream format is given. + Returns None if the table version or stream format does not exist. """ - raise NotImplementedError("get_stream not implemented") + if not table_version: + table_version = _resolve_latest_active_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + locator = StreamLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=None, + stream_format=stream_format, + ) + return _latest( + *args, + metafile=Stream.of(locator=locator, partition_scheme=None), + **kwargs, + ) def stage_partition( - stream: Stream, partition_values: Optional[PartitionValues] = None, *args, **kwargs + stream: Stream, + partition_values: Optional[PartitionValues] = None, + *args, + **kwargs, ) -> Partition: """ Stages a new partition for the given stream and partition values. Returns @@ -567,6 +1298,7 @@ def delete_partition( def get_partition( stream_locator: StreamLocator, partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, *args, **kwargs, ) -> Optional[Partition]: @@ -574,9 +1306,35 @@ def get_partition( Gets the most recently committed partition for the given stream locator and partition key values. Returns None if no partition has been committed for the given table version and/or partition key values. Partition values - should not be specified for unpartitioned tables. + should not be specified for unpartitioned tables. Partition scheme ID + resolves to the table version's current partition scheme by default. """ - raise NotImplementedError("get_partition not implemented") + locator = PartitionLocator.of( + stream_locator=stream_locator, + partition_values=partition_values, + partition_id=None, + ) + if not partition_scheme_id: + # resolve latest partition scheme from the current + # revision of its `deltacat` stream + stream = get_stream( + *args, + namespace=stream_locator.namespace, + table_name=stream_locator.table_name, + table_version=stream_locator.table_version, + **kwargs, + ) + partition_scheme_id = stream.partition_scheme.id + return _latest( + *args, + metafile=Partition.of( + locator=locator, + schema=None, + content_types=None, + partition_scheme_id=partition_scheme_id, + ), + **kwargs, + ) def stage_delta( @@ -624,10 +1382,9 @@ def get_namespace(namespace: str, *args, **kwargs) -> Optional[Namespace]: Gets table namespace metadata for the specified table namespace. Returns None if the given namespace does not exist. """ - return _read_latest_metafile( + return _latest( *args, metafile=Namespace.of(NamespaceLocator.of(namespace)), - txn_op_type=TransactionOperationType.READ_LATEST, **kwargs, ) @@ -636,14 +1393,10 @@ def namespace_exists(namespace: str, *args, **kwargs) -> bool: """ Returns True if the given table namespace exists, False if not. """ - return ( - _read_latest_metafile( - *args, - metafile=Namespace.of(NamespaceLocator.of(namespace)), - txn_op_type=TransactionOperationType.READ_EXISTS, - **kwargs, - ) - is not None + return _exists( + *args, + metafile=Namespace.of(NamespaceLocator.of(namespace)), + **kwargs, ) @@ -652,24 +1405,51 @@ def get_table(namespace: str, table_name: str, *args, **kwargs) -> Optional[Tabl Gets table metadata for the specified table. Returns None if the given table does not exist. """ - raise NotImplementedError("get_table not implemented") + locator = TableLocator.at(namespace=namespace, table_name=table_name) + return _latest( + *args, + metafile=Table.of(locator=locator), + **kwargs, + ) def table_exists(namespace: str, table_name: str, *args, **kwargs) -> bool: """ Returns True if the given table exists, False if not. """ - raise NotImplementedError("table_exists not implemented") + locator = TableLocator.at(namespace=namespace, table_name=table_name) + return _exists( + *args, + metafile=Table.of(locator=locator), + **kwargs, + ) def get_table_version( - namespace: str, table_name: str, table_version: str, *args, **kwargs + namespace: str, + table_name: str, + table_version: str, + *args, + **kwargs, ) -> Optional[TableVersion]: """ Gets table version metadata for the specified table version. Returns None if the given table version does not exist. """ - raise NotImplementedError("get_table_version not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _latest( + *args, + metafile=table_version, + **kwargs, + ) def get_latest_table_version( @@ -679,7 +1459,24 @@ def get_latest_table_version( Gets table version metadata for the latest version of the specified table. Returns None if no table version exists for the given table. """ - raise NotImplementedError("get_latest_table_version not implemented") + table_version_id = _resolve_latest_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + + return ( + get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version_id, + **kwargs, + ) + if table_version_id + else None + ) def get_latest_active_table_version( @@ -689,7 +1486,20 @@ def get_latest_active_table_version( Gets table version metadata for the latest active version of the specified table. Returns None if no active table version exists for the given table. """ - raise NotImplementedError("get_latest_active_table_version not implemented") + table_version_id = _resolve_latest_active_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + if table_version_id: + return get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version_id, + **kwargs, + ) def get_table_version_column_names( @@ -707,7 +1517,12 @@ def get_table_version_column_names( Returns None for schemaless tables. Raises an error if the table version does not exist. """ - raise NotImplementedError("get_table_version_column_names not implemented") + schema = get_table_version_schema( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + return schema.arrow.names if schema else None def get_table_version_schema( @@ -722,7 +1537,23 @@ def get_table_version_schema( table version if none is specified. Returns None if the table version is schemaless. Raises an error if the table version does not exist. """ - raise NotImplementedError("get_table_version_schema not implemented") + table_version = ( + get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + if table_version + else get_latest_active_table_version( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + ) + return table_version.schema def table_version_exists( @@ -731,7 +1562,20 @@ def table_version_exists( """ Returns True if the given table version exists, False if not. """ - raise NotImplementedError("table_version_exists not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _exists( + *args, + metafile=table_version, + **kwargs, + ) def can_categorize(e: BaseException, *args, **kwargs) -> bool: diff --git a/deltacat/storage/model/table.py b/deltacat/storage/model/table.py index 37e0b225..27168aa9 100644 --- a/deltacat/storage/model/table.py +++ b/deltacat/storage/model/table.py @@ -29,12 +29,16 @@ def of( locator: Optional[TableLocator], description: Optional[str] = None, properties: Optional[TableProperties] = None, + latest_active_table_version: Optional[str] = None, + latest_table_version: Optional[str] = None, native_object: Optional[Any] = None, ) -> Table: table = Table() table.locator = locator table.description = description table.properties = properties + table.latest_active_table_version = latest_active_table_version + table.latest_table_version = latest_table_version table.native_object = native_object return table @@ -65,6 +69,28 @@ def properties(self) -> Optional[TableProperties]: def properties(self, properties: Optional[TableProperties]) -> None: self["properties"] = properties + @property + def latest_active_table_version(self) -> Optional[str]: + return self.get("latest_active_table_version") + + @latest_active_table_version.setter + def latest_active_table_version( + self, + latest_active_table_version: Optional[str], + ) -> None: + self["latest_active_table_version"] = latest_active_table_version + + @property + def latest_table_version(self) -> Optional[str]: + return self.get("latest_table_version") + + @latest_table_version.setter + def latest_table_version( + self, + latest_table_version: Optional[str], + ) -> None: + self["latest_table_version"] = latest_table_version + @property def native_object(self) -> Optional[Any]: return self.get("nativeObject") diff --git a/deltacat/storage/model/table_version.py b/deltacat/storage/model/table_version.py index 1f8901a3..b3dae804 100644 --- a/deltacat/storage/model/table_version.py +++ b/deltacat/storage/model/table_version.py @@ -43,6 +43,7 @@ def of( schemas: Optional[SchemaList] = None, partition_schemes: Optional[partition.PartitionSchemeList] = None, sort_schemes: Optional[SortSchemeList] = None, + previous_table_version: Optional[str] = None, native_object: Optional[Any] = None, ) -> TableVersion: table_version = TableVersion() @@ -58,6 +59,7 @@ def of( table_version.schemas = schemas table_version.partition_schemes = partition_schemes table_version.sort_schemes = sort_schemes + table_version.previous_table_version = previous_table_version table_version.native_object = native_object return table_version @@ -167,6 +169,14 @@ def description(self) -> Optional[str]: def description(self, description: Optional[str]) -> None: self["description"] = description + @property + def previous_table_version(self) -> Optional[str]: + return self.get("previous_table_version") + + @previous_table_version.setter + def previous_table_version(self, previous_table_version: Optional[str]) -> None: + self["previous_table_version"] = previous_table_version + @property def properties(self) -> Optional[TableVersionProperties]: return self.get("properties") From 716a7be13dc4873f57642121ed01697b0864071d Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Fri, 7 Feb 2025 08:38:24 -0800 Subject: [PATCH 4/8] [WIP] DeltaCAT Native Storage Implementation --- deltacat/storage/interface.py | 48 ++-- deltacat/storage/main/impl.py | 349 +++++++++++++++++++++------- deltacat/storage/model/partition.py | 24 +- deltacat/storage/model/stream.py | 18 +- 4 files changed, 325 insertions(+), 114 deletions(-) diff --git a/deltacat/storage/interface.py b/deltacat/storage/interface.py index f8f6e12e..4c5674c7 100644 --- a/deltacat/storage/interface.py +++ b/deltacat/storage/interface.py @@ -343,23 +343,31 @@ def stage_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> Stream: """ Stages a new delta stream for the given table version. Resolves to the - latest active table version if no table version is given. Returns the - staged stream. Raises an error if the table version does not exist. + latest active table version if no table version is given. Resolves to the + DeltaCAT stream format if no stream format is given. If this stream + will replace another stream with the same format and scheme, then it will + have its previous stream ID set to the ID of the stream being replaced. + Returns the staged stream. Raises an error if the table version does not + exist. """ raise NotImplementedError("stage_stream not implemented") -def commit_stream(stream: Stream, *args, **kwargs) -> Stream: +def commit_stream( + stream: Stream, + *args, + **kwargs, +) -> Stream: """ Registers a delta stream with a target table version, replacing any - previous stream registered for the same table version. If the stream - format is not set prior to commit, then it is defaulted to the DeltaCAT - stream format. Returns the committed stream. + previous stream registered for the same table version. Returns the + committed stream. """ raise NotImplementedError("commit_stream not implemented") @@ -436,22 +444,34 @@ def commit_partition( def delete_partition( - namespace: str, - table_name: str, - table_version: Optional[str] = None, + stream_locator: StreamLocator, partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, *args, **kwargs, ) -> None: """ - Deletes the given partition from the specified table version. Resolves to - the latest active table version if no table version is given. Partition + Deletes the given partition from the specified stream. Partition values should not be specified for unpartitioned tables. Raises an error - if the table version or partition does not exist. + if the partition does not exist. """ raise NotImplementedError("delete_partition not implemented") +def get_staged_partition( + stream_locator: StreamLocator, + partition_id: str, + *args, + **kwargs, +) -> Optional[Partition]: + """ + Gets the staged partition for the given stream locator and partition ID. + Returns None if the partition does not exist. Raises an error if the + given stream locator does not exist. + """ + raise NotImplementedError("get_staged_partition not implemented") + + def get_partition( stream_locator: StreamLocator, partition_values: Optional[PartitionValues] = None, @@ -462,7 +482,9 @@ def get_partition( Gets the most recently committed partition for the given stream locator and partition key values. Returns None if no partition has been committed for the given table version and/or partition key values. Partition values - should not be specified for unpartitioned tables. + should not be specified for unpartitioned tables. Partition scheme ID + resolves to the table version's current partition scheme by default. + Raises an error if the given stream locator does not exist. """ raise NotImplementedError("get_partition not implemented") diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py index 4bf19bf3..1ce21498 100644 --- a/deltacat/storage/main/impl.py +++ b/deltacat/storage/main/impl.py @@ -185,6 +185,12 @@ def _resolve_partition_locator_alias( table_version=table_version, **kwargs, ) + if not stream: + raise ValueError( + f"Failed to resolve latest partition scheme for " + f"`{namespace}.{table_name}` at table version " + f"`{table_version or 'latest'}` (no stream found)." + ) partition_locator = PartitionLocator.of( stream_locator=stream.locator, partition_values=partition_values, @@ -213,6 +219,7 @@ def _resolve_partition_locator_alias( def _resolve_latest_active_table_version_id( namespace: str, table_name: str, + fail_if_no_active_table_version: True, *args, **kwargs, ) -> Optional[str]: @@ -222,12 +229,17 @@ def _resolve_latest_active_table_version_id( table_name=table_name, **kwargs, ) + if not table: + raise ValueError(f"Table does not exist: {namespace}.{table_name}") + if fail_if_no_active_table_version and not table.latest_active_table_version: + raise ValueError(f"Table has no active table version: {namespace}.{table_name}") return table.latest_active_table_version def _resolve_latest_table_version_id( namespace: str, table_name: str, + fail_if_no_active_table_version: True, *args, **kwargs, ) -> Optional[str]: @@ -237,6 +249,10 @@ def _resolve_latest_table_version_id( table_name=table_name, **kwargs, ) + if not table: + raise ValueError(f"Table does not exist: {namespace}.{table_name}") + if fail_if_no_active_table_version and not table.latest_table_version: + raise ValueError(f"Table has no table version: {namespace}.{table_name}") return table.latest_table_version @@ -730,6 +746,7 @@ def create_table_version( **kwargs, ): raise ValueError(f"Namespace {namespace} does not exist") + # check if a parent table and/or previous table version already exist prev_table_version = None prev_table = get_table( *args, @@ -743,7 +760,7 @@ def create_table_version( table_name=table_name, **kwargs, ): - # create a new table as part of this transaction + # no parent table exists, so we'll create it in this transaction txn_type = TransactionType.APPEND table_txn_op_type = TransactionOperationType.CREATE prev_table = None @@ -752,7 +769,7 @@ def create_table_version( ) table_version = table_version or "1" else: - # update the existing table as part of this transaction + # the parent table exists, so we'll update it in this transaction txn_type = TransactionType.ALTER table_txn_op_type = TransactionOperationType.UPDATE new_table: Table = Metafile.update_for(prev_table) @@ -788,10 +805,9 @@ def create_table_version( sort_schemes=[sort_keys] if sort_keys else None, previous_table_version=prev_table_version, ) - stream_locator = StreamLocator.at( - namespace=namespace, - table_name=table_name, - table_version=table_version, + # create the table version's default deltacat stream in this transaction + stream_locator = StreamLocator.of( + table_version_locator=locator, stream_id=str(uuid.uuid4()), stream_format=StreamFormat.DELTACAT, ) @@ -871,62 +887,6 @@ def update_table( ) -def stage_stream( - namespace: str, - table_name: str, - table_version: Optional[str] = None, - *args, - **kwargs, -) -> Stream: - """ - Stages a new delta stream for the given table version. Resolves to the - latest active table version if no table version is given. Returns the - staged stream. Raises an error if the table version does not exist. - """ - # TODO(pdames): Support retrieving previously staged streams by ID. - if not table_version: - table_version = _resolve_latest_active_table_version_id( - namespace=namespace, - table_name=table_name, - ) - table_version_meta = get_table_version( - *args, - namespace=namespace, - table_name=table_name, - table_version=table_version, - **kwargs, - ) - locator = StreamLocator.at( - namespace=namespace, - table_name=table_name, - table_version=table_version, - stream_id=str(uuid.uuid4()), - stream_format=None, # stream format isn't set until stream commit - ) - stream = Stream.of( - locator=locator, - partition_scheme=table_version_meta.partition_scheme, - state=CommitState.STAGED, - previous_stream_id=None, - watermark=None, - ) - transaction = Transaction.of( - txn_type=TransactionType.APPEND, - txn_operations=[ - TransactionOperation.of( - operation_type=TransactionOperationType.CREATE, - dest_metafile=stream, - ) - ], - ) - catalog = _get_catalog(**kwargs) - transaction.commit( - catalog_root_dir=catalog.root, - filesystem=catalog.filesystem, - ) - return stream - - def update_table_version( namespace: str, table_name: str, @@ -974,11 +934,23 @@ def update_table_version( new_table_version.partition_scheme = ( partition_scheme or old_table_version.partition_scheme ) + if partition_scheme and partition_scheme.id in [ + ps.id for ps in old_table_version.partition_schemes + ]: + raise ValueError( + f"Partition scheme ID `{partition_scheme.id}` already exists in " + f"table version `{table_version}`." + ) new_table_version.partition_schemes = ( old_table_version.partition_schemes + [partition_scheme] if partition_scheme else old_table_version.partition_schemes ) + if sort_keys and sort_keys.id in [sk.id for sk in old_table_version.sort_schemes]: + raise ValueError( + f"Sort scheme ID `{sort_keys.id}` already exists in " + f"table version `{table_version}`." + ) new_table_version.sort_scheme = sort_keys or old_table_version.sort_scheme new_table_version.sort_schemes = ( old_table_version.sort_schemes + [sort_keys] @@ -1048,13 +1020,18 @@ def stage_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> Stream: """ Stages a new delta stream for the given table version. Resolves to the - latest active table version if no table version is given. Returns the - staged stream. Raises an error if the table version does not exist. + latest active table version if no table version is given. Resolves to the + DeltaCAT stream format if no stream format is given. If this stream + will replace another stream with the same format and scheme, then it will + have its previous stream ID set to the ID of the stream being replaced. + Returns the staged stream. Raises an error if the table version does not + exist. """ # TODO(pdames): Support retrieving previously staged streams by ID. if not table_version: @@ -1074,7 +1051,7 @@ def stage_stream( table_name=table_name, table_version=table_version, stream_id=str(uuid.uuid4()), - stream_format=None, # stream format isn't set until stream commit + stream_format=stream_format, ) stream = Stream.of( locator=locator, @@ -1083,6 +1060,20 @@ def stage_stream( previous_stream_id=None, watermark=None, ) + prev_stream = get_stream( + *args, + namespace=stream.namespace, + table_name=stream.table_name, + table_version=stream.table_version, + stream_format=stream.stream_format, + **kwargs, + ) + if prev_stream: + if prev_stream.stream_id == stream.stream_id: + raise ValueError( + f"Stream to stage has the same ID as existing stream: {prev_stream}." + ) + stream.previous_stream_id = prev_stream.stream_id transaction = Transaction.of( txn_type=TransactionType.APPEND, txn_operations=[ @@ -1107,9 +1098,8 @@ def commit_stream( ) -> Stream: """ Registers a delta stream with a target table version, replacing any - previous stream registered for the same table version. If the stream - format is not set prior to commit, then it is defaulted to the DeltaCAT - stream format. Returns the committed stream. + previous stream registered for the same table version. Returns the + committed stream. """ stream: Stream = Metafile.update_for(stream) if not stream.locator.stream_id: @@ -1126,11 +1116,16 @@ def commit_stream( **kwargs, ) if prev_stream: + if prev_stream.stream_id != stream.previous_stream_id: + raise ValueError( + f"Previous stream ID mismatch Expected " + f"{stream.previous_stream_id} but found " + f"{prev_stream.stream_id}." + ) if prev_stream.stream_id == stream.stream_id: raise ValueError( - f"Cannot replace stream with duplicate ID: {stream.stream_id}." + f"Stream to commit has the same ID as existing stream: {prev_stream}." ) - stream.previous_stream_id = prev_stream.stream_id txn_type = TransactionType.OVERWRITE else: txn_type = TransactionType.APPEND @@ -1214,7 +1209,7 @@ def get_stream( """ Gets the most recently committed stream for the given table version. Resolves to the latest active table version if no table version is given. - Resolves to the deltacat stream format if no stream format is given. + Resolves to the DeltaCAT stream format if no stream format is given. Returns None if the table version or stream format does not exist. """ if not table_version: @@ -1222,6 +1217,7 @@ def get_stream( *args, namespace=namespace, table_name=table_name, + fail_if_no_active_table_version=False, **kwargs, ) locator = StreamLocator.at( @@ -1241,6 +1237,7 @@ def get_stream( def stage_partition( stream: Stream, partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, *args, **kwargs, ) -> Partition: @@ -1254,7 +1251,79 @@ def stage_partition( The partition_values must represent the results of transforms in a partition spec specified in the stream. """ - raise NotImplementedError("stage_partition not implemented") + # TODO(pdames): Cache last retrieved metafile revisions in memory to resolve + # potentially high cost of staging many partitions. + table_version = get_table_version( + *args, + namespace=stream.namespace, + table_name=stream.table_name, + table_version=stream.table_version, + **kwargs, + ) + if not table_version: + raise ValueError( + f"Table version not found: {stream.namespace}.{stream.table_name}." + f"{stream.table_version}." + ) + if not table_version.partition_schemes or partition_scheme_id not in [ + ps.id for ps in table_version.partition_schemes + ]: + raise ValueError( + f"Invalid partition scheme ID `{partition_scheme_id}` (not found " + f"in parent table version `{stream.namespace}.{stream.table_name}" + f".{table_version.table_version}` partition scheme IDs)." + ) + if stream.partition_scheme.id not in table_version.partition_schemes: + # this should never happen, but just in case + raise ValueError( + f"Invalid stream partition scheme ID `{stream.partition_scheme.id}`" + f"in parent table version `{stream.namespace}.{stream.table_name}" + f".{table_version.table_version}` partition scheme IDs)." + ) + locator = PartitionLocator.of( + stream_locator=stream.locator, + partition_values=partition_values, + partition_id=str(uuid.uuid4()), + ) + partition = Partition.of( + locator=locator, + schema=table_version.schema, + content_types=table_version.content_types, + state=CommitState.STAGED, + previous_stream_position=None, + partition_values=partition_values, + previous_partition_id=None, + stream_position=None, + partition_scheme_id=partition_scheme_id, + ) + prev_partition = get_partition( + *args, + stream_locator=stream.locator, + partition_values=partition_values, + partition_scheme_id=partition_scheme_id, + **kwargs, + ) + if prev_partition: + if prev_partition.partition_id == partition.partition_id: + raise ValueError( + f"Partition to stage has the same ID as existing partition: {prev_partition}." + ) + partition.previous_partition_id = prev_partition.partition_id + transaction = Transaction.of( + txn_type=TransactionType.APPEND, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=partition, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return partition def commit_partition( @@ -1265,9 +1334,14 @@ def commit_partition( ) -> Partition: """ Commits the given partition to its associated table version stream, - replacing any previous partition (i.e., "partition being replaced") registered for the same stream and + replacing any previous partition registered for the same stream and partition values. - If the previous_partition is passed as an argument, the specified previous_partition will be the partition being replaced, otherwise it will be retrieved. + + If previous partition is given then it will be replaced with its deltas + prepended to the new partition being committed. Otherwise the latest + committed partition with the same keys and partition scheme ID will be + retrieved. + Returns the registered partition. If the partition's previous delta stream position is specified, then the commit will be rejected if it does not match the actual previous stream position of @@ -1275,24 +1349,123 @@ def commit_partition( specified, then the commit will be rejected if it does not match the actual ID of the partition being replaced. """ - raise NotImplementedError("commit_partition not implemented") + if previous_partition: + raise NotImplementedError( + f"delta prepending from previous partition {previous_partition} " + f"is not yet implemented" + ) + partition: Partition = Metafile.update_for(partition) + if not partition.locator.partition_id: + partition.locator.partition_id = str(uuid.uuid4()) + partition.state = CommitState.COMMITTED + prev_partition = get_partition( + *args, + stream_locator=partition.stream_locator, + partition_value=partition.partition_values, + partition_scheme_id=partition.partition_scheme_id, + **kwargs, + ) + if prev_partition: + if prev_partition.partition_id != partition.previous_partition_id: + raise ValueError( + f"Previous partition ID mismatch Expected " + f"{partition.previous_partition_id} but found " + f"{prev_partition.partition_id}." + ) + # TODO(pdames): Add previous partition stream position validation. + if prev_partition.partition_id == partition.partition_id: + raise ValueError( + f"Partition to commit has the same ID as existing partition: {prev_partition}." + ) + txn_type = TransactionType.OVERWRITE + else: + txn_type = TransactionType.APPEND + + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=partition, + src_metafile=prev_partition, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return partition def delete_partition( - namespace: str, - table_name: str, - table_version: Optional[str] = None, + stream_locator: StreamLocator, partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, *args, **kwargs, ) -> None: """ - Deletes the given partition from the specified table version. Resolves to - the latest active table version if no table version is given. Partition + Deletes the given partition from the specified stream. Partition values should not be specified for unpartitioned tables. Raises an error - if the table version or partition does not exist. + if the partition does not exist. """ - raise NotImplementedError("delete_partition not implemented") + partition_to_delete = get_partition( + *args, + stream_locator=stream_locator, + partition_values=partition_values, + partition_scheme_id=partition_scheme_id, + **kwargs, + ) + if not partition_to_delete: + raise ValueError( + f"Partition with values {partition_values} and scheme " + f"{partition_scheme_id} not found in stream: {stream_locator}" + ) + else: + partition_to_delete.state = CommitState.DEPRECATED + transaction = Transaction.of( + txn_type=TransactionType.DELETE, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.DELETE, + src_metafile=partition_to_delete, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + + +def get_staged_partition( + stream_locator: StreamLocator, + partition_id: str, + *args, + **kwargs, +) -> Optional[Partition]: + """ + Gets the staged partition for the given stream locator and partition ID. + Returns None if the partition does not exist. Raises an error if the + given stream locator does not exist. + """ + locator = PartitionLocator.of( + stream_locator=stream_locator, + partition_values=None, + partition_id=partition_id, + ) + return _latest( + *args, + metafile=Partition.of( + locator=locator, + schema=None, + content_types=None, + ), + **kwargs, + ) def get_partition( @@ -1308,6 +1481,7 @@ def get_partition( the given table version and/or partition key values. Partition values should not be specified for unpartitioned tables. Partition scheme ID resolves to the table version's current partition scheme by default. + Raises an error if the given stream locator does not exist. """ locator = PartitionLocator.of( stream_locator=stream_locator, @@ -1324,6 +1498,8 @@ def get_partition( table_version=stream_locator.table_version, **kwargs, ) + if not stream: + raise ValueError(f"Stream {stream_locator} not found.") partition_scheme_id = stream.partition_scheme.id return _latest( *args, @@ -1463,6 +1639,7 @@ def get_latest_table_version( *args, namespace=namespace, table_name=table_name, + fail_if_no_active_table_version=False, **kwargs, ) @@ -1490,16 +1667,20 @@ def get_latest_active_table_version( *args, namespace=namespace, table_name=table_name, + fail_if_no_active_table_version=False, **kwargs, ) - if table_version_id: - return get_table_version( + return ( + get_table_version( *args, namespace=namespace, table_name=table_name, table_version=table_version_id, **kwargs, ) + if table_version_id + else None + ) def get_table_version_column_names( diff --git a/deltacat/storage/model/partition.py b/deltacat/storage/model/partition.py index 29627c24..edcd26ef 100644 --- a/deltacat/storage/model/partition.py +++ b/deltacat/storage/model/partition.py @@ -579,16 +579,20 @@ def parts(self) -> List[str]: class PartitionLocatorAlias(Locator, dict): @staticmethod def of(parent_partition: Partition): - return PartitionLocatorAlias( - { - "partition_values": parent_partition.partition_values, - "partition_scheme_id": parent_partition.partition_scheme_id, - "parent": ( - parent_partition.locator.parent - if parent_partition.locator - else None - ), - } + return ( + PartitionLocatorAlias( + { + "partition_values": parent_partition.partition_values, + "partition_scheme_id": parent_partition.partition_scheme_id, + "parent": ( + parent_partition.locator.parent + if parent_partition.locator + else None + ), + } + ) + if parent_partition.state == CommitState.COMMITTED + else None # only committed partitions can be resolved by alias ) @property diff --git a/deltacat/storage/model/stream.py b/deltacat/storage/model/stream.py index 2208a191..77ce7ec9 100644 --- a/deltacat/storage/model/stream.py +++ b/deltacat/storage/model/stream.py @@ -373,13 +373,17 @@ class StreamLocatorAlias(Locator, dict): def of( parent_stream: Stream, ) -> StreamLocatorAlias: - return StreamLocatorAlias( - { - "format": parent_stream.stream_format, - "parent": ( - parent_stream.locator.parent if parent_stream.locator else None - ), - } + return ( + StreamLocatorAlias( + { + "format": parent_stream.stream_format, + "parent": ( + parent_stream.locator.parent if parent_stream.locator else None + ), + } + ) + if parent_stream.state == CommitState.COMMITTED + else None # only committed streams can be resolved by alias ) @property From 60f26cadf4dc5ec7c46ce1c4faed85248e321fac Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Mon, 10 Feb 2025 22:19:30 -0800 Subject: [PATCH 5/8] Integrate passing test suite from #484 --- deltacat/storage/interface.py | 27 +- deltacat/storage/main/impl.py | 237 ++++++++++--- deltacat/storage/model/list_result.py | 8 + deltacat/storage/model/locator.py | 6 + deltacat/storage/model/metafile.py | 97 ++++-- deltacat/storage/model/partition.py | 4 +- deltacat/storage/model/table_version.py | 13 +- .../tests/storage/main/test_main_storage.py | 313 ++++++++++++++++-- 8 files changed, 596 insertions(+), 109 deletions(-) diff --git a/deltacat/storage/interface.py b/deltacat/storage/interface.py index 4c5674c7..8add5afe 100644 --- a/deltacat/storage/interface.py +++ b/deltacat/storage/interface.py @@ -23,10 +23,10 @@ Stream, StreamFormat, StreamLocator, - StreamLocatorAlias, Table, TableProperties, TableVersion, + TableVersionLocator, TableVersionProperties, ) from deltacat.storage.model.manifest import Manifest @@ -389,6 +389,20 @@ def delete_stream( raise NotImplementedError("delete_stream not implemented") +def get_staged_stream( + table_version_locator: TableVersionLocator, + stream_id: str, + *args, + **kwargs, +) -> Optional[Partition]: + """ + Gets the staged stream for the given table version locator and stream ID. + Returns None if the stream does not exist. Raises an error if the given + table version locator does not exist. + """ + raise NotImplementedError("get_staged_stream not implemented") + + def get_stream( namespace: str, table_name: str, @@ -429,10 +443,15 @@ def commit_partition( **kwargs, ) -> Partition: """ - Commits the given partition to its associated table version stream, - replacing any previous partition (i.e., "partition being replaced") registered for the same stream and + Commits the staged partition to its associated table version stream, + replacing any previous partition registered for the same stream and partition values. - If the previous_partition is passed as an argument, the specified previous_partition will be the partition being replaced, otherwise it will be retrieved. + + If previous partition is given then it will be replaced with its deltas + prepended to the new partition being committed. Otherwise the latest + committed partition with the same keys and partition scheme ID will be + retrieved. + Returns the registered partition. If the partition's previous delta stream position is specified, then the commit will be rejected if it does not match the actual previous stream position of diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py index 1ce21498..59b90708 100644 --- a/deltacat/storage/main/impl.py +++ b/deltacat/storage/main/impl.py @@ -311,6 +311,36 @@ def list_table_versions( ) +def list_streams( + namespace: str, + table_name: str, + table_version: str, + *args, + **kwargs, +) -> ListResult[TableVersion]: + """ + Lists a page of table versions for the given table. + Raises an error if the table does not exist. + """ + tv = get_table_version( + namespace, + table_name, + table_version, + *args, + **kwargs, + ) + if not tv: + raise ValueError( + f"Table Version `{namespace}.{table_name}.{table_version}` not found." + ) + return _list( + tv, + TransactionOperationType.READ_CHILDREN, + *args, + **kwargs, + ) + + def list_partitions( namespace: str, table_name: str, @@ -754,18 +784,13 @@ def create_table_version( table_name=table_name, **kwargs, ) - if not prev_table( - *args, - namespace=namespace, - table_name=table_name, - **kwargs, - ): + if not prev_table: # no parent table exists, so we'll create it in this transaction txn_type = TransactionType.APPEND table_txn_op_type = TransactionOperationType.CREATE prev_table = None new_table = Table.of( - locator=TableLocator.of(namespace=namespace, table_name=table_name), + locator=TableLocator.at(namespace=namespace, table_name=table_name), ) table_version = table_version or "1" else: @@ -1039,19 +1064,19 @@ def stage_stream( namespace=namespace, table_name=table_name, ) - table_version_meta = get_table_version( - *args, - namespace=namespace, - table_name=table_name, - table_version=table_version, - **kwargs, - ) + table_version_meta = get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) locator = StreamLocator.at( namespace=namespace, table_name=table_name, table_version=table_version, stream_id=str(uuid.uuid4()), - stream_format=stream_format, + stream_format=stream_format or StreamFormat.DELTACAT, ) stream = Stream.of( locator=locator, @@ -1097,17 +1122,48 @@ def commit_stream( **kwargs, ) -> Stream: """ - Registers a delta stream with a target table version, replacing any + Registers a staged delta stream with a target table version, replacing any previous stream registered for the same table version. Returns the committed stream. """ - stream: Stream = Metafile.update_for(stream) - if not stream.locator.stream_id: - stream.locator.stream_id = str(uuid.uuid4()) - if not stream.stream_format: - stream.stream_format = StreamFormat.DELTACAT + if not stream.stream_id: + raise ValueError("Stream ID to commit must be set to a staged stream ID.") + if not stream.table_version_locator: + raise ValueError( + "Stream to commit must have its table version locator " + "set to the parent of its staged stream ID." + ) + prev_staged_stream = get_staged_stream( + *args, + table_version_locator=stream.table_version_locator, + stream_id=stream.stream_id, + **kwargs, + ) + if not prev_staged_stream: + raise ValueError( + f"Stream at table version {stream.table_version_locator} with ID " + f"{stream.stream_id} not found." + ) + if prev_staged_stream.state != CommitState.STAGED: + raise ValueError( + f"Expected to find a `{CommitState.STAGED}` stream at table version " + f"{stream.table_version_locator} with ID {stream.stream_id}," + f"but found a `{prev_staged_stream.state}` partition." + ) + if not prev_staged_stream: + raise ValueError( + f"Stream at table_version {stream.table_version_locator} with ID " + f"{stream.stream_id} not found." + ) + if prev_staged_stream.state != CommitState.STAGED: + raise ValueError( + f"Expected to find a `{CommitState.STAGED}` stream at table version " + f"{stream.table_version_locator} with ID {stream.stream_id}," + f"but found a `{prev_staged_stream.state}` stream." + ) + stream: Stream = Metafile.update_for(prev_staged_stream) stream.state = CommitState.COMMITTED - prev_stream = get_stream( + prev_committed_stream = get_stream( *args, namespace=stream.namespace, table_name=stream.table_name, @@ -1115,30 +1171,40 @@ def commit_stream( stream_format=stream.stream_format, **kwargs, ) - if prev_stream: - if prev_stream.stream_id != stream.previous_stream_id: + # the first transaction operation updates the staged stream commit state + txn_type = TransactionType.ALTER + txn_ops = [ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=stream, + src_metafile=prev_staged_stream, + ) + ] + if prev_committed_stream: + if prev_committed_stream.stream_id != stream.previous_stream_id: raise ValueError( f"Previous stream ID mismatch Expected " f"{stream.previous_stream_id} but found " - f"{prev_stream.stream_id}." + f"{prev_committed_stream.stream_id}." ) - if prev_stream.stream_id == stream.stream_id: + if prev_committed_stream.stream_id == stream.stream_id: raise ValueError( - f"Stream to commit has the same ID as existing stream: {prev_stream}." + f"Stream to commit has the same ID as existing stream: {prev_committed_stream}." ) + # there's a previously committed stream, so update the transaction + # type to overwrite the previously committed stream, and add another + # transaction operation to replace it with the staged stream txn_type = TransactionType.OVERWRITE - else: - txn_type = TransactionType.APPEND - - transaction = Transaction.of( - txn_type=txn_type, - txn_operations=[ + txn_ops.append( TransactionOperation.of( operation_type=TransactionOperationType.UPDATE, dest_metafile=stream, - src_metafile=prev_stream, + src_metafile=prev_committed_stream, ) - ], + ) + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=txn_ops, ) catalog = _get_catalog(**kwargs) transaction.commit( @@ -1187,7 +1253,7 @@ def delete_stream( txn_operations=[ TransactionOperation.of( operation_type=TransactionOperationType.DELETE, - src_metafile=stream_to_delete, + dest_metafile=stream_to_delete, ) ], ) @@ -1198,6 +1264,29 @@ def delete_stream( ) +def get_staged_stream( + table_version_locator: TableVersionLocator, + stream_id: str, + *args, + **kwargs, +) -> Optional[Partition]: + """ + Gets the staged stream for the given table version locator and stream ID. + Returns None if the stream does not exist. Raises an error if the given + table version locator does not exist. + """ + locator = StreamLocator.of( + table_version_locator=table_version_locator, + stream_id=stream_id, + stream_format=None, + ) + return _latest( + *args, + metafile=Stream.of(locator=locator, partition_scheme=None), + **kwargs, + ) + + def get_stream( namespace: str, table_name: str, @@ -1229,7 +1318,11 @@ def get_stream( ) return _latest( *args, - metafile=Stream.of(locator=locator, partition_scheme=None), + metafile=Stream.of( + locator=locator, + partition_scheme=None, + state=CommitState.COMMITTED, + ), **kwargs, ) @@ -1333,7 +1426,7 @@ def commit_partition( **kwargs, ) -> Partition: """ - Commits the given partition to its associated table version stream, + Commits the staged partition to its associated table version stream, replacing any previous partition registered for the same stream and partition values. @@ -1354,42 +1447,75 @@ def commit_partition( f"delta prepending from previous partition {previous_partition} " f"is not yet implemented" ) - partition: Partition = Metafile.update_for(partition) - if not partition.locator.partition_id: - partition.locator.partition_id = str(uuid.uuid4()) + if not partition.partition_id: + raise ValueError("Partition ID to commit must be set to a staged partition ID.") + if not partition.stream_locator: + raise ValueError( + "Partition to commit must have its stream locator " + "set to the parent of its staged partition ID." + ) + prev_staged_partition = get_staged_partition( + *args, + stream_locator=partition.stream_locator, + partition_id=partition.partition_id, + **kwargs, + ) + if not prev_staged_partition: + raise ValueError( + f"Partition at stream {partition.stream_locator} with ID " + f"{partition.partition_id} not found." + ) + if prev_staged_partition.state != CommitState.STAGED: + raise ValueError( + f"Expected to find a `{CommitState.STAGED}` partition at stream " + f"{partition.stream_locator} with ID {partition.partition_id}," + f"but found a `{prev_staged_partition.state}` partition." + ) + partition: Partition = Metafile.update_for(prev_staged_partition) partition.state = CommitState.COMMITTED - prev_partition = get_partition( + prev_committed_partition = get_partition( *args, stream_locator=partition.stream_locator, partition_value=partition.partition_values, partition_scheme_id=partition.partition_scheme_id, **kwargs, ) - if prev_partition: - if prev_partition.partition_id != partition.previous_partition_id: + # the first transaction operation updates the staged partition commit state + txn_type = TransactionType.ALTER + txn_ops = [ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=partition, + src_metafile=prev_staged_partition, + ) + ] + if prev_committed_partition: + if prev_committed_partition.partition_id != partition.previous_partition_id: raise ValueError( f"Previous partition ID mismatch Expected " f"{partition.previous_partition_id} but found " - f"{prev_partition.partition_id}." + f"{prev_committed_partition.partition_id}." ) # TODO(pdames): Add previous partition stream position validation. - if prev_partition.partition_id == partition.partition_id: + if prev_committed_partition.partition_id == partition.partition_id: raise ValueError( - f"Partition to commit has the same ID as existing partition: {prev_partition}." + f"Partition to commit has the same ID as existing partition: " + f"{prev_committed_partition}." ) + # there's a previously committed partition, so update the transaction + # type to overwrite the previously committed partition, and add another + # transaction operation to replace it with the staged partition txn_type = TransactionType.OVERWRITE - else: - txn_type = TransactionType.APPEND - - transaction = Transaction.of( - txn_type=txn_type, - txn_operations=[ + txn_ops.append( TransactionOperation.of( operation_type=TransactionOperationType.UPDATE, dest_metafile=partition, - src_metafile=prev_partition, + src_metafile=prev_committed_partition, ) - ], + ) + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=txn_ops, ) catalog = _get_catalog(**kwargs) transaction.commit( @@ -1507,6 +1633,7 @@ def get_partition( locator=locator, schema=None, content_types=None, + state=CommitState.COMMITTED, partition_scheme_id=partition_scheme_id, ), **kwargs, diff --git a/deltacat/storage/model/list_result.py b/deltacat/storage/model/list_result.py index 35f53dd7..b735feaa 100644 --- a/deltacat/storage/model/list_result.py +++ b/deltacat/storage/model/list_result.py @@ -21,6 +21,14 @@ def of( list_result["nextPageProvider"] = next_page_provider return list_result + @staticmethod + def empty() -> ListResult: + list_result = ListResult() + list_result["items"] = [] + list_result["paginationKey"] = None + list_result["nextPageProvider"] = None + return list_result + def read_page(self) -> Optional[List[T]]: return self.get("items") diff --git a/deltacat/storage/model/locator.py b/deltacat/storage/model/locator.py index 43a8fcc4..62aced4a 100644 --- a/deltacat/storage/model/locator.py +++ b/deltacat/storage/model/locator.py @@ -55,6 +55,12 @@ def join(self, separator: str = DEFAULT_NAME_SEPARATOR) -> str: """ return separator.join(self.parts()) + def exists(self) -> bool: + """ + Returns True if this locator name is defined, False otherwise. + """ + return self.immutable_id or all(self.parts()) + class Locator: """ diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py index 7625ebc7..6d0f5b36 100644 --- a/deltacat/storage/model/metafile.py +++ b/deltacat/storage/model/metafile.py @@ -180,7 +180,7 @@ def new_revision( ignore_missing_revision=is_create_txn, ) # validate the transaction operation type - if mri.revision: + if mri.exists(): # update/delete fails if the last metafile was deleted if mri.txn_op_type == TransactionOperationType.DELETE: if current_txn_op_type != TransactionOperationType.CREATE: @@ -371,6 +371,9 @@ def path(self) -> Optional[str]: else None ) + def exists(self) -> bool: + return bool(self.revision) + class Metafile(dict): """ @@ -490,6 +493,9 @@ def read_txn( pagination_key=None, next_page_provider=None, ) + else: + # Could not find any revisions in list operations - return no results + return ListResult.empty() @classmethod def read( @@ -675,6 +681,7 @@ def children( parent_obj_path, self.id, ) + # List metafiles with respect to this metafile's URI as root return self._list_metafiles( success_txn_log_dir=success_txn_log_dir, metafile_root_dir_path=metafile_root_dir_path, @@ -742,15 +749,34 @@ def revisions( filesystem=filesystem, ) metafile_root = posixpath.join(*[catalog_root] + ancestor_ids) - # TODO(pdames): Refactor id lazy assignment into explicit getter/setter - immutable_id = self.get("id") or Metafile._locator_to_id( - locator=self.locator, - catalog_root=catalog_root, - metafile_root=metafile_root, - filesystem=filesystem, - txn_start_time=current_txn_start_time, - txn_id=current_txn_id, - ) + try: + locator = ( + self.locator + if self.locator.name.exists() + else self.locator_alias + if self.locator_alias and self.locator_alias.name.exists() + else None + ) + immutable_id = ( + # TODO(pdames): Refactor id lazy assignment into explicit getter/setter + self.get("id") + or Metafile._locator_to_id( + locator=locator, + catalog_root=catalog_root, + metafile_root=metafile_root, + filesystem=filesystem, + txn_start_time=current_txn_start_time, + txn_id=current_txn_id, + ) + if locator + else None + ) + except ValueError: + # the metafile has been deleted - return an empty list result + return ListResult.empty() + if not immutable_id: + # the metafile does not exist - return an empty list result + return ListResult.empty() revision_dir_path = posixpath.join( metafile_root, immutable_id, @@ -766,7 +792,7 @@ def revisions( ) items = [] for mri in revisions: - if mri.revision: + if mri.exists(): metafile = ( {} if not materialize_revisions @@ -841,6 +867,9 @@ def ancestor_ids( txn_start_time=current_txn_start_time, txn_id=current_txn_id, ) + if not ancestor_id: + err_msg = f"Ancestor does not exist: {parent_locator}." + raise ValueError(err_msg) metafile_root = posixpath.join( metafile_root, ancestor_id, @@ -886,9 +915,12 @@ def _locator_to_id( filesystem: pyarrow.fs.FileSystem, txn_start_time: Optional[int] = None, txn_id: Optional[str] = None, - ) -> str: + ) -> Optional[str]: """ - Resolves the metafile ID for the given locator. + Resolves the immutable metafile ID for the given locator. + + :return: Immutable ID read from mapping file. None if no mapping exists. + :raises: ValueError if the id is found but has been deleted """ metafile_id = locator.name.immutable_id if not metafile_id: @@ -906,7 +938,10 @@ def _locator_to_id( success_txn_log_dir=success_txn_log_dir, current_txn_start_time=txn_start_time, current_txn_id=txn_id, + ignore_missing_revision=True, ) + if not mri.exists(): + return None if mri.txn_op_type == TransactionOperationType.DELETE: err_msg = ( f"Locator {locator} to metafile ID resolution failed " @@ -929,6 +964,8 @@ def _write_locator_to_id_map_file( filesystem: pyarrow.fs.FileSystem, ) -> None: name_resolution_dir_path = locator.path(parent_obj_path) + # TODO(pdames): Don't write updated revisions with the same mapping as + # the latest revision. mri = MetafileRevisionInfo.new_revision( revision_dir_path=name_resolution_dir_path, current_txn_op_type=current_txn_op_type, @@ -991,6 +1028,7 @@ def _write_metafile_revisions( parent_obj_path = posixpath.join(*[catalog_root] + ancestor_path_elements) mutable_src_locator = None mutable_dest_locator = None + # metafiles without named immutable IDs have mutable name mappings if not self.named_immutable_id: mutable_src_locator = ( current_txn_op.src_metafile.locator @@ -998,6 +1036,7 @@ def _write_metafile_revisions( else None ) mutable_dest_locator = current_txn_op.dest_metafile.locator + # metafiles with named immutable IDs may have aliases elif self.locator_alias: mutable_src_locator = ( current_txn_op.src_metafile.locator_alias @@ -1010,6 +1049,7 @@ def _write_metafile_revisions( # from the locator back to its immutable metafile ID if ( current_txn_op.type == TransactionOperationType.UPDATE + and mutable_src_locator is not None and mutable_src_locator != mutable_dest_locator ): # this update includes a rename @@ -1072,16 +1112,23 @@ def _write_metafile_revisions( current_txn_id=current_txn_id, filesystem=filesystem, ) - # mark the dest metafile as created - self._write_metafile_revision( - success_txn_log_dir=success_txn_log_dir, - revision_dir_path=metafile_revision_dir_path, - current_txn_op=current_txn_op, - current_txn_op_type=TransactionOperationType.CREATE, - current_txn_start_time=current_txn_start_time, - current_txn_id=current_txn_id, - filesystem=filesystem, - ) + try: + # mark the dest metafile as created + self._write_metafile_revision( + success_txn_log_dir=success_txn_log_dir, + revision_dir_path=metafile_revision_dir_path, + current_txn_op=current_txn_op, + current_txn_op_type=TransactionOperationType.CREATE, + current_txn_start_time=current_txn_start_time, + current_txn_id=current_txn_id, + filesystem=filesystem, + ) + except ValueError as e: + # TODO(pdames): raise/catch a DuplicateMetafileCreate exception. + if "already exists" not in str(e): + raise e + # src metafile is being replaced by an existing dest metafile + else: self._write_metafile_revision( success_txn_log_dir=success_txn_log_dir, @@ -1123,7 +1170,7 @@ def _list_metafiles( current_txn_id=current_txn_id, ignore_missing_revision=True, ) - if mri.revision: + if mri.exists(): item = self.read( path=mri.path, filesystem=filesystem, @@ -1162,7 +1209,7 @@ def _ancestor_ids( current_txn_id=current_txn_id, ignore_missing_revision=True, ) - if mri.revision: + if mri.exists(): return mri.revision.ancestor_ids else: raise ValueError(f"Metafile {self.id} does not exist.") diff --git a/deltacat/storage/model/partition.py b/deltacat/storage/model/partition.py index edcd26ef..afdcecef 100644 --- a/deltacat/storage/model/partition.py +++ b/deltacat/storage/model/partition.py @@ -258,7 +258,9 @@ def from_serializable( filesystem: Optional[pyarrow.fs.FileSystem] = None, ) -> Partition: self["schema"] = ( - Schema.deserialize(pa.py_buffer(self["schema"])) if self["schema"] else None + Schema.deserialize(pa.py_buffer(self["schema"])) + if self.get("schema") + else None ) # restore the table locator from its mapped immutable metafile ID if self.table_locator and self.table_locator.table_name == self.id: diff --git a/deltacat/storage/model/table_version.py b/deltacat/storage/model/table_version.py index b3dae804..78bc0f29 100644 --- a/deltacat/storage/model/table_version.py +++ b/deltacat/storage/model/table_version.py @@ -273,16 +273,19 @@ def from_serializable( filesystem: Optional[pyarrow.fs.FileSystem] = None, ) -> TableVersion: self["schema"] = ( - Schema.deserialize(pa.py_buffer(self["schema"])) if self["schema"] else None + Schema.deserialize(pa.py_buffer(self["schema"])) + if self.get("schema") + else None ) self.schemas = ( [Schema.deserialize(pa.py_buffer(_)) for _ in self["schemas"]] - if self["schemas"] + if self.get("schemas") else None ) - # force list-to-tuple conversion of sort keys via property invocation - self.sort_scheme.keys - [sort_scheme.keys for sort_scheme in self.sort_schemes] + if self.sort_scheme: + # force list-to-tuple conversion of sort keys via property invocation + self.sort_scheme.keys + [sort_scheme.keys for sort_scheme in self.sort_schemes] # restore the table locator from its mapped immutable metafile ID if self.table_locator and self.table_locator.table_name == self.id: parent_rev_dir_path = Metafile._parent_metafile_rev_dir_path( diff --git a/deltacat/tests/storage/main/test_main_storage.py b/deltacat/tests/storage/main/test_main_storage.py index ee472185..3da39ca2 100644 --- a/deltacat/tests/storage/main/test_main_storage.py +++ b/deltacat/tests/storage/main/test_main_storage.py @@ -1,41 +1,316 @@ +import shutil +import tempfile + +import pytest + +from deltacat import Schema, Field from deltacat.storage import ( metastore, Namespace, NamespaceLocator, + Table, + TableVersion, ) +from deltacat.catalog.main.impl import PropertyCatalog +import pyarrow as pa -class TestMainStorage: - def test_list_namespaces(self, temp_catalog): - # given a namespace to create - namespace_name = "test_namespace" +@pytest.fixture +def schema(): + return Schema.of( + [ + Field.of( + field=pa.field("some_string", pa.string(), nullable=False), + field_id=1, + is_merge_key=True, + ), + Field.of( + field=pa.field("some_int32", pa.int32(), nullable=False), + field_id=2, + is_merge_key=True, + ), + Field.of( + field=pa.field("some_float64", pa.float64()), + field_id=3, + is_merge_key=False, + ), + ] + ) - # when the namespace is created - created_namespace = metastore.create_namespace( - namespace=namespace_name, - catalog=temp_catalog, + +class TestNamespace: + @classmethod + def setup_class(cls): + cls.tmpdir = tempfile.mkdtemp() + cls.catalog = PropertyCatalog(cls.tmpdir) + cls.namespace1 = metastore.create_namespace( + namespace="namespace1", + catalog=cls.catalog, + ) + cls.namespace2 = metastore.create_namespace( + namespace="namespace2", + catalog=cls.catalog, ) + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.tmpdir) + + def test_list_namespaces(self): # expect the namespace returned to match the input namespace to create - namespace_locator = NamespaceLocator.of(namespace=namespace_name) + namespace_locator = NamespaceLocator.of(namespace="namespace1") expected_namespace = Namespace.of(locator=namespace_locator) - assert expected_namespace.equivalent_to(created_namespace) + assert expected_namespace.equivalent_to(self.namespace1) # expect the namespace to exist assert metastore.namespace_exists( - namespace=namespace_name, - catalog=temp_catalog, + namespace="namespace1", + catalog=self.catalog, ) # expect the namespace to also be returned when listing namespaces - list_result = metastore.list_namespaces(catalog=temp_catalog) - namespaces = list_result.all_items() - assert len(namespaces) == 1 - assert namespaces[0].equivalent_to(expected_namespace) + list_result = metastore.list_namespaces(catalog=self.catalog) + namespaces_by_name = {n.locator.namespace: n for n in list_result.all_items()} + assert len(namespaces_by_name.items()) == 2 + assert namespaces_by_name["namespace1"].equivalent_to(self.namespace1) + assert namespaces_by_name["namespace2"].equivalent_to(self.namespace2) + def test_get_namespace(self): # expect the namespace to also be returned when explicitly retrieved read_namespace = metastore.get_namespace( - namespace=namespace_name, - catalog=temp_catalog, + namespace="namespace1", + catalog=self.catalog, + ) + assert read_namespace and read_namespace.equivalent_to(self.namespace1) + + def test_namespace_exists_existing(self): + assert metastore.namespace_exists( + "namespace1", + catalog=self.catalog, + ) + + def test_namespace_exists_nonexisting(self): + assert not metastore.namespace_exists( + "foobar", + catalog=self.catalog, + ) + + +class TestTable: + @classmethod + def setup_class(cls): + cls.tmpdir = tempfile.mkdtemp() + cls.catalog = PropertyCatalog(cls.tmpdir) + # Create a namespace to hold our tables + cls.namespace_obj = metastore.create_namespace( + namespace="test_table_ns", + catalog=cls.catalog, + ) + # Create two tables + cls.table1 = metastore.create_table_version( + namespace="test_table_ns", + table_name="table1", + table_version="v1", + catalog=cls.catalog, + ) + cls.table2 = metastore.create_table_version( + namespace="test_table_ns", + table_name="table2", + table_version="v1", + catalog=cls.catalog, + ) + + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.tmpdir) + + def test_list_tables(self): + # list the tables under our namespace + list_result = metastore.list_tables( + "test_table_ns", + catalog=self.catalog, + ) + all_tables = list_result.all_items() + + # we expect 2 distinct tables + table_names = {t.table_name for t in all_tables if isinstance(t, Table)} + assert "table1" in table_names + assert "table2" in table_names + + def test_get_table(self): + # test we can retrieve table1 by name + tbl = metastore.get_table( + "test_table_ns", + "table1", + catalog=self.catalog, + ) + assert tbl is not None + # In your architecture, you might check equivalence: + # e.g. tbl.equivalent_to(...) if you have that method: + assert tbl.table_name == "table1" + + def test_table_exists_existing(self): + # table1 should exist + assert metastore.table_exists( + "test_table_ns", + "table1", + catalog=self.catalog, + ) + + def test_table_exists_nonexisting(self): + assert not metastore.table_exists( + "test_table_ns", + "no_such_table", + catalog=self.catalog, + ) + + +class TestTableVersion: + @classmethod + def setup_class(cls): + cls.tmpdir = tempfile.mkdtemp() + cls.catalog = PropertyCatalog(cls.tmpdir) + # Create a namespace and single table + cls.namespace_obj = metastore.create_namespace( + namespace="test_tv_ns", + catalog=cls.catalog, + ) + # Create a "base" table to attach versions to + metastore.create_table_version( + namespace="test_tv_ns", + table_name="mytable", + table_version="v1", + catalog=cls.catalog, + ) + # Now create an additional version + cls.stream2 = metastore.create_table_version( + namespace="test_tv_ns", + table_name="mytable", + table_version="v2", + catalog=cls.catalog, + ) + + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.tmpdir) + + def test_list_table_versions(self): + list_result = metastore.list_table_versions( + "test_tv_ns", + "mytable", + catalog=self.catalog, + ) + tvs = list_result.all_items() + # we expect v1 and v2 + version_ids = [] + for tv in tvs: + if isinstance(tv, TableVersion): + version_ids.append(tv.table_version) + assert set(version_ids) == {"v1", "v2"} + + def test_get_table_version(self): + tv = metastore.get_table_version( + "test_tv_ns", + "mytable", + "v1", + catalog=self.catalog, + ) + assert tv is not None + assert tv.table_version == "v1" + + def test_table_version_exists(self): + # v2 should exist + assert metastore.table_version_exists( + "test_tv_ns", + "mytable", + "v2", + catalog=self.catalog, + ) + + def test_table_version_exists_nonexisting(self): + # "v999" should not exist + assert not metastore.table_version_exists( + "test_tv_ns", + "mytable", + "v999", + catalog=self.catalog, + ) + + +class TestStream: + @classmethod + def setup_class(cls): + cls.tmpdir = tempfile.mkdtemp() + cls.catalog = PropertyCatalog(cls.tmpdir) + # Create a table version + metastore.create_namespace("test_stream_ns", catalog=cls.catalog) + metastore.create_table_version( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + catalog=cls.catalog, + ) + # Stage & commit a stream + cls.stream = metastore.stage_stream( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + catalog=cls.catalog, + ) + cls.committed_stream = metastore.commit_stream( + cls.stream, + catalog=cls.catalog, + ) + + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.tmpdir) + + def test_list_streams(self): + list_result = metastore.list_streams( + "test_stream_ns", + "mystreamtable", + "v1", + catalog=self.catalog, + ) + streams = list_result.all_items() + # This will list the staged stream and the committed stream + assert len(streams) == 2 + # TODO - add more assertions + + def test_get_stream(self): + # The stream is created and committed in setup + stream = metastore.get_stream( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + catalog=self.catalog, + ) + # TODO this is broken, stream is table version + assert stream is not None + + def test_list_stream_partitions_empty(self): + # no partitions yet + list_result = metastore.list_stream_partitions( + self.committed_stream, + catalog=self.catalog, + ) + partitions = list_result.all_items() + assert len(partitions) == 0 + + def test_delete_stream(self): + # We can delete the stream + metastore.delete_stream( + "test_stream_ns", + "mystreamtable", + "v1", + catalog=self.catalog, + ) + # Now get_stream should return None + stream = metastore.get_stream( + "test_stream_ns", + "mystreamtable", + "v1", + catalog=self.catalog, ) - assert read_namespace and read_namespace.equivalent_to(expected_namespace) + assert stream is None From 1210ba39112a55ce3c6604b080c7340a7e097957 Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Mon, 10 Feb 2025 22:21:30 -0800 Subject: [PATCH 6/8] Remove completed TODO. --- deltacat/tests/storage/main/test_main_storage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/deltacat/tests/storage/main/test_main_storage.py b/deltacat/tests/storage/main/test_main_storage.py index 3da39ca2..2ee65abe 100644 --- a/deltacat/tests/storage/main/test_main_storage.py +++ b/deltacat/tests/storage/main/test_main_storage.py @@ -286,7 +286,6 @@ def test_get_stream(self): table_version="v1", catalog=self.catalog, ) - # TODO this is broken, stream is table version assert stream is not None def test_list_stream_partitions_empty(self): From 651fdf74cd8f6bc8902ec0f2101153177aed61c6 Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Mon, 10 Feb 2025 22:40:15 -0800 Subject: [PATCH 7/8] Linter updates. --- deltacat/storage/main/impl.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py index 59b90708..605939d3 100644 --- a/deltacat/storage/main/impl.py +++ b/deltacat/storage/main/impl.py @@ -15,15 +15,14 @@ DeltaType, ) from deltacat.storage.model.types import ( + CommitState, DistributedDataset, LifecycleState, LocalDataset, LocalTable, - StreamFormat, TransactionType, TransactionOperationType, StreamFormat, - CommitState, ) from deltacat.storage.model.list_result import ListResult from deltacat.storage.model.namespace import ( From d56a8c2bad66486b5549da8384fed29c1283be62 Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Tue, 11 Feb 2025 22:56:17 -0800 Subject: [PATCH 8/8] Merge common changes from #484 --- deltacat/storage/main/impl.py | 7 +- deltacat/storage/model/metafile.py | 47 +++++++++- deltacat/storage/model/table_version.py | 19 ++++ deltacat/storage/model/transaction.py | 11 +++ .../tests/storage/main/test_main_storage.py | 91 ++++++++++++++----- .../tests/storage/model/test_metafile_io.py | 4 + .../tests/storage/model/test_table_version.py | 26 ++++++ 7 files changed, 175 insertions(+), 30 deletions(-) create mode 100644 deltacat/tests/storage/model/test_table_version.py diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py index 605939d3..e6002315 100644 --- a/deltacat/storage/main/impl.py +++ b/deltacat/storage/main/impl.py @@ -799,12 +799,7 @@ def create_table_version( new_table: Table = Metafile.update_for(prev_table) prev_table_version = prev_table.latest_table_version if not table_version: - try: - # if the last table version was an int then increment it by 1 - table_version = str(int(prev_table_version) + 1) - except ValueError: - # if it isn't, set it to a UUID - table_version = str(uuid.uuid4()) + table_version = TableVersion.next_version(prev_table_version) new_table.description = table_description or table_version_description new_table.properties = table_properties new_table.latest_table_version = table_version diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py index 6d0f5b36..f15a14a4 100644 --- a/deltacat/storage/model/metafile.py +++ b/deltacat/storage/model/metafile.py @@ -113,6 +113,15 @@ def latest_revision( current_txn_id: Optional[str] = None, ignore_missing_revision: bool = False, ) -> MetafileRevisionInfo: + """ + Fetch latest revision of a metafile, or return None if no + revisions exist. + :param revision_dir_path: root path of directory for metafile + :param ignore_missing_revision: if True, will return + MetafileRevisionInfo.undefined() on no revisions + :raises ValueError if no revisions are found AND + ignore_missing_revision=False + """ revisions = MetafileRevisionInfo.list_revisions( revision_dir_path=revision_dir_path, filesystem=filesystem, @@ -497,6 +506,35 @@ def read_txn( # Could not find any revisions in list operations - return no results return ListResult.empty() + @staticmethod + def get_class(serialized_dict: dict): + """ + Given a serialized dictionary of Metafile data, gets the metafile child + class type to instantiate. + """ + # TODO: more robust implementation. Right now this relies on the + # assumption that XLocator key will only be present in class X, and + # is brittle to renames. On the other hand, this implementation does + # not require any marker fields to be persisted, and a regression + # will be quickly detected by test_metafile.io or other unit tests + if serialized_dict.__contains__("tableLocator"): + return deltacat.storage.model.table.Table + elif serialized_dict.__contains__("namespaceLocator"): + return deltacat.storage.model.namespace.Namespace + elif serialized_dict.__contains__("tableVersionLocator"): + return deltacat.storage.model.table_version.TableVersion + elif serialized_dict.__contains__("partitionLocator"): + return deltacat.storage.model.partition.Partition + elif serialized_dict.__contains__("streamLocator"): + return deltacat.storage.model.stream.Stream + elif serialized_dict.__contains__("deltaLocator"): + return deltacat.storage.model.delta.Delta + else: + raise ValueError( + f"Could not find metafile class from serialized form: " + f"${serialized_dict}" + ) + @classmethod def read( cls, @@ -513,7 +551,10 @@ def read( path, filesystem = resolve_path_and_filesystem(path, filesystem) with filesystem.open_input_stream(path) as file: binary = file.readall() - obj = cls(**msgpack.loads(binary)).from_serializable(path, filesystem) + data = msgpack.loads(binary) + # Cast this Metafile into the appropriate child class type + clazz = Metafile.get_class(data) + obj = clazz(**data).from_serializable(path, filesystem) return obj def write_txn( @@ -772,10 +813,10 @@ def revisions( else None ) except ValueError: - # the metafile has been deleted - return an empty list result + # the metafile has been deleted return ListResult.empty() if not immutable_id: - # the metafile does not exist - return an empty list result + # the metafile does not exist return ListResult.empty() revision_dir_path = posixpath.join( metafile_root, diff --git a/deltacat/storage/model/table_version.py b/deltacat/storage/model/table_version.py index 78bc0f29..5b1d02af 100644 --- a/deltacat/storage/model/table_version.py +++ b/deltacat/storage/model/table_version.py @@ -1,7 +1,9 @@ # Allow classes to use self-referencing Type hints in Python 3.7. from __future__ import annotations +import re import posixpath +import uuid from typing import Any, Dict, List, Optional import pyarrow @@ -311,6 +313,23 @@ def from_serializable( self.locator.table_locator = table.locator return self + @staticmethod + def next_version(previous_version: Optional[str] = None) -> str: + """ + Assigns the next table version string given the previous table version. + Attempts to use the convention of 1-based incrementing integers of + the form "v1", "v2", etc. or of the form "1", "2", etc. + If the previous table version is not of this form, then assigns a UUID + to the next table version. + """ + if previous_version is not None: + version_match = re.match(r"^(v?)(\d+)$", previous_version) + if version_match: + prefix, version_number = version_match.groups() + new_version_number = int(version_number) + 1 + return f"{prefix}{new_version_number}" + return str(uuid.uuid4()) + class TableVersionLocatorName(LocatorName): def __init__(self, locator: TableVersionLocator): diff --git a/deltacat/storage/model/transaction.py b/deltacat/storage/model/transaction.py index f178b6eb..4a03b0b9 100644 --- a/deltacat/storage/model/transaction.py +++ b/deltacat/storage/model/transaction.py @@ -448,6 +448,17 @@ def to_serializable(self) -> Transaction: # reduce file size (they can be reconstructed from their corresponding # files as required). for operation in serializable.operations: + # Sanity check that IDs exist on source and dest metafiles + if operation.dest_metafile and operation.dest_metafile.id is None: + raise ValueError( + f"Transaction operation ${operation} dest metafile does " + f"not have ID: ${operation.dest_metafile}" + ) + if operation.src_metafile and operation.src_metafile.id is None: + raise ValueError( + f"Transaction operation ${operation} src metafile does " + f"not have ID: ${operation.src_metafile}" + ) operation.dest_metafile = { "id": operation.dest_metafile.id, "locator": operation.dest_metafile.locator, diff --git a/deltacat/tests/storage/main/test_main_storage.py b/deltacat/tests/storage/main/test_main_storage.py index 2ee65abe..9d2ba04f 100644 --- a/deltacat/tests/storage/main/test_main_storage.py +++ b/deltacat/tests/storage/main/test_main_storage.py @@ -10,6 +10,8 @@ NamespaceLocator, Table, TableVersion, + TableVersionLocator, + StreamFormat, ) from deltacat.catalog.main.impl import PropertyCatalog import pyarrow as pa @@ -107,13 +109,13 @@ def setup_class(cls): catalog=cls.catalog, ) # Create two tables - cls.table1 = metastore.create_table_version( + cls.stream1 = metastore.create_table_version( namespace="test_table_ns", table_name="table1", table_version="v1", catalog=cls.catalog, ) - cls.table2 = metastore.create_table_version( + cls.stream2 = metastore.create_table_version( namespace="test_table_ns", table_name="table2", table_version="v1", @@ -145,8 +147,7 @@ def test_get_table(self): catalog=self.catalog, ) assert tbl is not None - # In your architecture, you might check equivalence: - # e.g. tbl.equivalent_to(...) if you have that method: + # TODO(pdames): replace with tbl.equivalent_to(expected) assert tbl.table_name == "table1" def test_table_exists_existing(self): @@ -236,31 +237,43 @@ def test_table_version_exists_nonexisting(self): catalog=self.catalog, ) + def test_creation_fails_if_already_exists(self): + # Assert that creating the same table version again raises a ValueError + with pytest.raises(ValueError): + metastore.create_table_version( + namespace="test_tv_ns", + table_name="mytable", + table_version="v1", + catalog=self.catalog, + ) + class TestStream: @classmethod def setup_class(cls): cls.tmpdir = tempfile.mkdtemp() cls.catalog = PropertyCatalog(cls.tmpdir) - # Create a table version - metastore.create_namespace("test_stream_ns", catalog=cls.catalog) + metastore.create_namespace( + "test_stream_ns", + catalog=cls.catalog, + ) + # Create a table version. + # This call should automatically create a default DeltaCAT stream. metastore.create_table_version( namespace="test_stream_ns", table_name="mystreamtable", table_version="v1", catalog=cls.catalog, ) - # Stage & commit a stream - cls.stream = metastore.stage_stream( + # Retrieve the auto-created default stream. + cls.default_stream = metastore.get_stream( namespace="test_stream_ns", table_name="mystreamtable", table_version="v1", catalog=cls.catalog, ) - cls.committed_stream = metastore.commit_stream( - cls.stream, - catalog=cls.catalog, - ) + # Ensure that the default stream was auto-created. + assert cls.default_stream is not None, "Default stream not found." @classmethod def teardown_class(cls): @@ -268,18 +281,13 @@ def teardown_class(cls): def test_list_streams(self): list_result = metastore.list_streams( - "test_stream_ns", - "mystreamtable", - "v1", - catalog=self.catalog, + "test_stream_ns", "mystreamtable", "v1", catalog=self.catalog ) streams = list_result.all_items() - # This will list the staged stream and the committed stream - assert len(streams) == 2 - # TODO - add more assertions + # We expect exactly one stream (the default "deltacat" stream). + assert len(streams) == 1 def test_get_stream(self): - # The stream is created and committed in setup stream = metastore.get_stream( namespace="test_stream_ns", table_name="mystreamtable", @@ -287,11 +295,13 @@ def test_get_stream(self): catalog=self.catalog, ) assert stream is not None + # The stream's format should be the default "deltacat" + assert stream.stream_format.lower() == StreamFormat.DELTACAT.value.lower() def test_list_stream_partitions_empty(self): # no partitions yet list_result = metastore.list_stream_partitions( - self.committed_stream, + self.default_stream, catalog=self.catalog, ) partitions = list_result.all_items() @@ -313,3 +323,42 @@ def test_delete_stream(self): catalog=self.catalog, ) assert stream is None + + def test_stage_and_commit_stream_replacement(self): + # Stage & commit a stream + stream = metastore.stage_stream( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + catalog=self.catalog, + ) + fetched_stream = metastore.get_staged_stream( + table_version_locator=TableVersionLocator.at( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + ), + stream_id=stream.stream_id, + catalog=self.catalog, + ) + assert fetched_stream.equivalent_to(stream) + committed_stream = metastore.commit_stream( + stream=stream, + catalog=self.catalog, + ) + fetched_stream = metastore.get_stream( + namespace="test_stream_ns", + table_name="mystreamtable", + table_version="v1", + catalog=self.catalog, + ) + assert fetched_stream.equivalent_to(committed_stream) + list_result = metastore.list_streams( + "test_stream_ns", + "mystreamtable", + "v1", + catalog=self.catalog, + ) + streams = list_result.all_items() + # This will list the staged stream and the committed stream + assert len(streams) == 2 diff --git a/deltacat/tests/storage/model/test_metafile_io.py b/deltacat/tests/storage/model/test_metafile_io.py index 751a9cdc..451ecb41 100644 --- a/deltacat/tests/storage/model/test_metafile_io.py +++ b/deltacat/tests/storage/model/test_metafile_io.py @@ -2723,3 +2723,7 @@ def test_python_type_serde(self, temp_dir): # expect the table created to otherwise match the table given table.properties = expected_properties assert table.equivalent_to(deserialized_table) + + def test_metafile_read_bad_path(self, temp_dir): + with pytest.raises(FileNotFoundError): + Delta.read("foobar") diff --git a/deltacat/tests/storage/model/test_table_version.py b/deltacat/tests/storage/model/test_table_version.py new file mode 100644 index 00000000..cdec505a --- /dev/null +++ b/deltacat/tests/storage/model/test_table_version.py @@ -0,0 +1,26 @@ +import pytest +import uuid +from deltacat.storage.model.table_version import TableVersion + + +@pytest.mark.parametrize( + "previous_version, expected_new_version", + [ + (None, None), + ("v1", "v2"), + ("1", "2"), + ( + "version1", + None, + ), + ("v999", "v1000"), + ], +) +def test_new_version(previous_version, expected_new_version): + new_version = TableVersion.next_version(previous_version) + assert isinstance(new_version, str) + if expected_new_version: + assert new_version == expected_new_version + else: + # ensure that the new version is a valid UUID + uuid.UUID(new_version)