diff --git a/Makefile b/Makefile
index 67c33fff..a9cec4cb 100644
--- a/Makefile
+++ b/Makefile
@@ -64,3 +64,6 @@ benchmark-aws: install
 
 benchmark: install
 	pytest -m benchmark deltacat/benchmarking
+
+publish: test test-integration rebuild
+	twine upload dist/*
diff --git a/deltacat/__init__.py b/deltacat/__init__.py
index 2a857116..2d0a687c 100644
--- a/deltacat/__init__.py
+++ b/deltacat/__init__.py
@@ -53,7 +53,7 @@
 
 deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))
 
-__version__ = "1.1.27"
+__version__ = "2.0"
 
 __all__ = [
diff --git a/deltacat/storage/__init__.py b/deltacat/storage/__init__.py
index 49d3d756..250232d5 100644
--- a/deltacat/storage/__init__.py
+++ b/deltacat/storage/__init__.py
@@ -29,6 +29,7 @@ from deltacat.storage.model.partition import (
     Partition,
     PartitionLocator,
+    PartitionLocatorAlias,
     PartitionKey,
     PartitionScheme,
     PartitionSchemeList,
@@ -43,7 +44,11 @@
     Schema,
     SchemaList,
 )
-from deltacat.storage.model.stream import Stream, StreamLocator
+from deltacat.storage.model.stream import (
+    Stream,
+    StreamLocator,
+    StreamLocatorAlias,
+)
 from deltacat.storage.model.table import (
     Table,
     TableLocator,
@@ -132,6 +137,7 @@
     "Partition",
     "PartitionKey",
     "PartitionLocator",
+    "PartitionLocatorAlias",
     "PartitionScheme",
     "PartitionSchemeList",
     "PartitionValues",
@@ -145,6 +151,7 @@
     "Stream",
     "StreamFormat",
     "StreamLocator",
+    "StreamLocatorAlias",
     "Table",
     "TableLocator",
     "TableProperties",
diff --git a/deltacat/storage/interface.py b/deltacat/storage/interface.py
index f0674cfc..8add5afe 100644
--- a/deltacat/storage/interface.py
+++ b/deltacat/storage/interface.py
@@ -21,10 +21,12 @@
     Schema,
     SortScheme,
     Stream,
+    StreamFormat,
     StreamLocator,
     Table,
     TableProperties,
     TableVersion,
+    TableVersionLocator,
     TableVersionProperties,
 )
 from deltacat.storage.model.manifest import Manifest
@@ -234,7 +236,7 @@ def get_delta_manifest(
 ) -> Manifest:
     """
     Get the manifest associated with the given delta or delta locator. This
-    always retrieves the authoritative remote copy of the delta manifest, and
+    always retrieves the authoritative durable copy of the delta manifest, and
     never the local manifest defined for any input delta.
     """
     raise NotImplementedError("get_delta_manifest not implemented")
@@ -255,7 +257,7 @@ def create_namespace(
 
 def update_namespace(
     namespace: str,
-    properties: NamespaceProperties = None,
+    properties: Optional[NamespaceProperties] = None,
     new_namespace: Optional[str] = None,
     *args,
     **kwargs,
@@ -320,6 +322,8 @@ def update_table_version(
     schema: Optional[Schema] = None,
     description: Optional[str] = None,
     properties: Optional[TableVersionProperties] = None,
+    partition_scheme: Optional[PartitionScheme] = None,
+    sort_keys: Optional[SortScheme] = None,
     *args,
     **kwargs,
 ) -> None:
@@ -339,18 +343,27 @@ def stage_stream(
     namespace: str,
     table_name: str,
     table_version: Optional[str] = None,
+    stream_format: StreamFormat = StreamFormat.DELTACAT,
     *args,
     **kwargs,
 ) -> Stream:
     """
     Stages a new delta stream for the given table version. Resolves to the
-    latest active table version if no table version is given. Returns the
-    staged stream. Raises an error if the table version does not exist.
+    latest active table version if no table version is given. Resolves to the
+    DeltaCAT stream format if no stream format is given. If this stream
+    will replace another stream with the same format and scheme, then it will
+    have its previous stream ID set to the ID of the stream being replaced.
+    Returns the staged stream. Raises an error if the table version does not
+    exist.
     """
     raise NotImplementedError("stage_stream not implemented")
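The interface hunks above add stream-format awareness to the stream lifecycle. A minimal usage sketch of the stage-then-commit flow follows, assuming the main storage implementation (deltacat/storage/main/impl.py below) resolves its catalog from a `catalog` kwarg via `_get_catalog(**kwargs)`; the `PropertyCatalog` constructor arguments, namespace, and table names are hypothetical:

    # Hedged sketch: stage a new DeltaCAT-format stream, then commit it.
    from deltacat.catalog.main.impl import PropertyCatalog
    from deltacat.storage.main import impl as storage

    catalog = PropertyCatalog("/tmp/deltacat")  # hypothetical ctor args
    staged = storage.stage_stream(
        namespace="my_ns",
        table_name="events",
        catalog=catalog,
    )
    committed = storage.commit_stream(stream=staged, catalog=catalog)

If a stream of the same format is already committed, the staged stream records it as previous_stream_id and the commit runs as an OVERWRITE transaction (see commit_stream in impl.py below).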
 
 
-def commit_stream(stream: Stream, *args, **kwargs) -> Stream:
+def commit_stream(
+    stream: Stream,
+    *args,
+    **kwargs,
+) -> Stream:
     """
     Registers a delta stream with a target table version, replacing any
     previous stream registered for the same table version. Returns the
@@ -363,28 +376,46 @@ def delete_stream(
     namespace: str,
     table_name: str,
     table_version: Optional[str] = None,
+    stream_format: StreamFormat = StreamFormat.DELTACAT,
     *args,
     **kwargs,
 ) -> None:
     """
     Deletes the delta stream currently registered with the given table version.
     Resolves to the latest active table version if no table version is given.
+    Resolves to the DeltaCAT stream format if no stream format is given.
     Raises an error if the table version does not exist.
     """
     raise NotImplementedError("delete_stream not implemented")
 
 
+def get_staged_stream(
+    table_version_locator: TableVersionLocator,
+    stream_id: str,
+    *args,
+    **kwargs,
+) -> Optional[Stream]:
+    """
+    Gets the staged stream for the given table version locator and stream ID.
+    Returns None if the stream does not exist. Raises an error if the given
+    table version locator does not exist.
+    """
+    raise NotImplementedError("get_staged_stream not implemented")
+
+
 def get_stream(
     namespace: str,
     table_name: str,
     table_version: Optional[str] = None,
+    stream_format: StreamFormat = StreamFormat.DELTACAT,
     *args,
     **kwargs,
 ) -> Optional[Stream]:
     """
-    Gets the most recently committed stream for the given table version and
-    partition key values. Resolves to the latest active table version if no
-    table version is given. Returns None if the table version does not exist.
+    Gets the most recently committed stream for the given table version.
+    Resolves to the latest active table version if no table version is given.
+    Resolves to the DeltaCAT stream format if no stream format is given.
+    Returns None if the table version or stream format does not exist.
     """
     raise NotImplementedError("get_stream not implemented")
@@ -412,10 +443,15 @@ def commit_partition(
     **kwargs,
 ) -> Partition:
     """
-    Commits the given partition to its associated table version stream,
-    replacing any previous partition (i.e., "partition being replaced") registered for the same stream and
+    Commits the staged partition to its associated table version stream,
+    replacing any previous partition registered for the same stream and
     partition values.
-    If the previous_partition is passed as an argument, the specified previous_partition will be the partition being replaced, otherwise it will be retrieved.
+
+    If a previous partition is given, then it will be replaced with its deltas
+    prepended to the new partition being committed. Otherwise, the latest
+    committed partition with the same keys and partition scheme ID will be
+    retrieved.
+
     Returns the registered partition. If the
     partition's previous delta stream position is specified, then the commit
     will be rejected if it does not match the actual previous stream position of
@@ -427,22 +463,34 @@ def delete_partition(
-    namespace: str,
-    table_name: str,
-    table_version: Optional[str] = None,
+    stream_locator: StreamLocator,
     partition_values: Optional[PartitionValues] = None,
+    partition_scheme_id: Optional[str] = None,
     *args,
     **kwargs,
 ) -> None:
     """
-    Deletes the given partition from the specified table version. Resolves to
-    the latest active table version if no table version is given. Partition
+    Deletes the given partition from the specified stream. Partition
     values should not be specified for unpartitioned tables. Raises an error
-    if the table version or partition does not exist.
+    if the partition does not exist.
     """
     raise NotImplementedError("delete_partition not implemented")
 
 
+def get_staged_partition(
+    stream_locator: StreamLocator,
+    partition_id: str,
+    *args,
+    **kwargs,
+) -> Optional[Partition]:
+    """
+    Gets the staged partition for the given stream locator and partition ID.
+    Returns None if the partition does not exist. Raises an error if the
+    given stream locator does not exist.
+    """
+    raise NotImplementedError("get_staged_partition not implemented")
+
+
 def get_partition(
     stream_locator: StreamLocator,
     partition_values: Optional[PartitionValues] = None,
@@ -453,7 +501,9 @@
     Gets the most recently committed partition for the given stream locator
     and partition key values. Returns None if no partition has been committed
     for the given table version and/or partition key values. Partition values
-    should not be specified for unpartitioned tables.
+    should not be specified for unpartitioned tables. Partition scheme ID
+    resolves to the table version's current partition scheme by default.
+    Raises an error if the given stream locator does not exist.
     """
     raise NotImplementedError("get_partition not implemented")
diff --git a/deltacat/storage/main/impl.py b/deltacat/storage/main/impl.py
index f740a604..e6002315 100644
--- a/deltacat/storage/main/impl.py
+++ b/deltacat/storage/main/impl.py
@@ -1,3 +1,5 @@
+import uuid
+
 from deltacat.catalog.main.impl import PropertyCatalog
 from typing import Any, Callable, Dict, List, Optional, Union
@@ -13,13 +15,14 @@
     DeltaType,
 )
 from deltacat.storage.model.types import (
+    CommitState,
     DistributedDataset,
     LifecycleState,
     LocalDataset,
     LocalTable,
-    StreamFormat,
     TransactionType,
     TransactionOperationType,
+    StreamFormat,
 )
 from deltacat.storage.model.list_result import ListResult
 from deltacat.storage.model.namespace import (
@@ -32,6 +35,8 @@
     PartitionLocator,
     PartitionScheme,
     PartitionValues,
+    UNPARTITIONED_SCHEME_ID,
+    PartitionLocatorAlias,
 )
 from deltacat.storage.model.schema import (
     Schema,
@@ -46,10 +51,12 @@
 from deltacat.storage.model.table import (
     Table,
     TableProperties,
+    TableLocator,
 )
 from deltacat.storage.model.table_version import (
     TableVersion,
     TableVersionProperties,
+    TableVersionLocator,
 )
 from deltacat.storage.model.metafile import (
     Metafile,
@@ -93,7 +100,7 @@ def _get_catalog(**kwargs) -> PropertyCatalog:
     return catalog
 
 
-def _list_metafiles(
+def _list(
     metafile: Metafile,
     txn_op_type: TransactionOperationType,
     *args,
@@ -118,28 +125,142 @@
     return list_results_per_op[0]
 
 
-def _read_latest_metafile(
+def _latest(
     metafile: Metafile,
-    txn_op_type: TransactionOperationType,
     *args,
     **kwargs,
 ) -> Optional[Metafile]:
-    list_results = _list_metafiles(
+    list_results = _list(
         *args,
         metafile=metafile,
-        txn_op_type=txn_op_type,
+        txn_op_type=TransactionOperationType.READ_LATEST,
         **kwargs,
     )
     results = list_results.all_items()
     return results[0] if results else None
 
 
+def _exists(
+    metafile: Metafile,
+    *args,
+    **kwargs,
+) -> bool:
+    list_results = _list(
+        *args,
+        metafile=metafile,
+        txn_op_type=TransactionOperationType.READ_EXISTS,
+        **kwargs,
+    )
+    results = list_results.all_items()
+    return bool(results)
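`_latest` and `_exists` above are thin wrappers that pin the transaction operation type of `_list` to READ_LATEST and READ_EXISTS, respectively. A hedged sketch of how a point read is phrased against them (these are module-private helpers; the `catalog` kwarg is assumed, as elsewhere):

    # Build a locator-only metafile as the read probe.
    from deltacat.storage import Namespace, NamespaceLocator

    probe = Namespace.of(NamespaceLocator.of("my_ns"))  # hypothetical name
    # _latest(metafile=probe, catalog=catalog) -> newest Namespace revision, or None
    # _exists(metafile=probe, catalog=catalog) -> True/False via READ_EXISTS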
 
 
+def _resolve_partition_locator_alias(
+    namespace: str,
+    table_name: str,
+    table_version: Optional[str] = None,
+    partition_values: Optional[PartitionValues] = None,
+    partition_scheme_id: Optional[str] = None,
+    *args,
+    **kwargs,
+) -> PartitionLocatorAlias:
+    # TODO(pdames): A read shouldn't initiate N transactions that
+    #  read against different catalog snapshots. To resolve this, add
+    #  new "start", "step", and "end" methods to Transaction that
+    #  support starting a txn, defining and executing a txn op, retrieving
+    #  its results, then defining and executing the next txn op. When
+    #  stepping through a transaction, its txn heartbeat timeout should
+    #  be set manually.
+    partition_locator = None
+    if not partition_values:
+        partition_scheme_id = UNPARTITIONED_SCHEME_ID
+    elif not partition_scheme_id:
+        # resolve latest partition scheme from the current
+        # revision of its `deltacat` stream
+        stream = get_stream(
+            *args,
+            namespace=namespace,
+            table_name=table_name,
+            table_version=table_version,
+            **kwargs,
+        )
+        if not stream:
+            raise ValueError(
+                f"Failed to resolve latest partition scheme for "
+                f"`{namespace}.{table_name}` at table version "
+                f"`{table_version or 'latest'}` (no stream found)."
+            )
+        partition_locator = PartitionLocator.of(
+            stream_locator=stream.locator,
+            partition_values=partition_values,
+            partition_id=None,
+        )
+        partition_scheme_id = stream.partition_scheme.id
+    if not partition_locator:
+        partition_locator = PartitionLocator.at(
+            namespace=namespace,
+            table_name=table_name,
+            table_version=table_version,
+            stream_id=None,
+            stream_format=StreamFormat.DELTACAT,
+            partition_values=partition_values,
+            partition_id=None,
+        )
+    partition = Partition.of(
+        locator=partition_locator,
+        schema=None,
+        content_types=None,
+        partition_scheme_id=partition_scheme_id,
+    )
+    return partition.locator_alias
+
+
+def _resolve_latest_active_table_version_id(
+    namespace: str,
+    table_name: str,
+    fail_if_no_active_table_version: bool = True,
+    *args,
+    **kwargs,
+) -> Optional[str]:
+    table = get_table(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        **kwargs,
+    )
+    if not table:
+        raise ValueError(f"Table does not exist: {namespace}.{table_name}")
+    if fail_if_no_active_table_version and not table.latest_active_table_version:
+        raise ValueError(f"Table has no active table version: {namespace}.{table_name}")
+    return table.latest_active_table_version
+
+
+def _resolve_latest_table_version_id(
+    namespace: str,
+    table_name: str,
+    fail_if_no_table_version: bool = True,
+    *args,
+    **kwargs,
+) -> Optional[str]:
+    table = get_table(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        **kwargs,
+    )
+    if not table:
+        raise ValueError(f"Table does not exist: {namespace}.{table_name}")
+    if fail_if_no_table_version and not table.latest_table_version:
+        raise ValueError(f"Table has no table version: {namespace}.{table_name}")
+    return table.latest_table_version
+
+
 def list_namespaces(*args, **kwargs) -> ListResult[Namespace]:
     """
     Lists a page of table namespaces. Namespaces are returned as list result
     items.
     """
-    return _list_metafiles(
+    return _list(
         *args,
         metafile=Namespace.of(NamespaceLocator.of("placeholder")),
         txn_op_type=TransactionOperationType.READ_SIBLINGS,
@@ -152,18 +273,71 @@ def list_tables(namespace: str, *args, **kwargs) -> ListResult[Table]:
     Lists a page of tables for the given table namespace. Tables are returned
     as list result items. Raises an error if the given namespace does not
     exist.
""" - raise NotImplementedError("list_tables not implemented") + locator = TableLocator.at(namespace=namespace, table_name="placeholder") + return _list( + *args, + metafile=Table.of(locator=locator), + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_table_versions( - namespace: str, table_name: str, *args, **kwargs + namespace: str, + table_name: str, + *args, + **kwargs, ) -> ListResult[TableVersion]: """ Lists a page of table versions for the given table. Table versions are returned as list result items. Raises an error if the given table does not exist. """ - raise NotImplementedError("list_table_versions not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version="placeholder", + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _list( + *args, + metafile=table_version, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) + + +def list_streams( + namespace: str, + table_name: str, + table_version: str, + *args, + **kwargs, +) -> ListResult[TableVersion]: + """ + Lists a page of table versions for the given table. + Raises an error if the table does not exist. + """ + tv = get_table_version( + namespace, + table_name, + table_version, + *args, + **kwargs, + ) + if not tv: + raise ValueError( + f"Table Version `{namespace}.{table_name}.{table_version}` not found." + ) + return _list( + tv, + TransactionOperationType.READ_CHILDREN, + *args, + **kwargs, + ) def list_partitions( @@ -179,14 +353,53 @@ def list_partitions( table version if not specified. Raises an error if the table version does not exist. """ - raise NotImplementedError("list_partitions not implemented") + locator = PartitionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=None, + stream_format=StreamFormat.DELTACAT, + partition_values=["placeholder"], + partition_id="placeholder", + ) + partition = Partition.of( + locator=locator, + schema=None, + content_types=None, + ) + return _list( + *args, + metafile=partition, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_stream_partitions(stream: Stream, *args, **kwargs) -> ListResult[Partition]: """ Lists all partitions committed to the given stream. """ - raise NotImplementedError("list_stream_partitions not implemented") + if stream.stream_format != StreamFormat.DELTACAT: + raise ValueError( + f"Unsupported stream format: {stream.stream_format}" + f"Expected stream format: {StreamFormat.DELTACAT}" + ) + locator = PartitionLocator.of( + stream_locator=stream.locator, + partition_values=["placeholder"], + partition_id="placeholder", + ) + partition = Partition.of( + locator=locator, + schema=None, + content_types=None, + ) + return _list( + *args, + metafile=partition, + txn_op_type=TransactionOperationType.READ_SIBLINGS, + **kwargs, + ) def list_deltas( @@ -216,7 +429,43 @@ def list_deltas( default. The manifests can either be optionally retrieved as part of this call or lazily loaded via subsequent calls to `get_delta_manifest`. """ - raise NotImplementedError("list_deltas not implemented") + # TODO(pdames): Delta listing should ideally either use an efficient + # range-limited dir listing of partition children between start and end + # positions, or should traverse using Partition.stream_position (to + # resolve last stream position) and Delta.previous_stream_position + # (down to first stream position). 
 
 
 def list_deltas(
@@ -216,7 +429,43 @@
     default. The manifests can either be optionally retrieved as part of this
     call or lazily loaded via subsequent calls to `get_delta_manifest`.
     """
-    raise NotImplementedError("list_deltas not implemented")
+    # TODO(pdames): Delta listing should ideally either use an efficient
+    #  range-limited dir listing of partition children between start and end
+    #  positions, or should traverse using Partition.stream_position (to
+    #  resolve last stream position) and Delta.previous_stream_position
+    #  (down to first stream position).
+    partition_locator_alias = _resolve_partition_locator_alias(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        partition_values=partition_values,
+        partition_scheme_id=partition_scheme_id,
+        **kwargs,
+    )
+    locator = DeltaLocator.of(locator=partition_locator_alias)
+    delta = Delta.of(
+        locator=locator,
+        delta_type=None,
+        meta=None,
+        properties=None,
+        manifest=None,
+    )
+    all_deltas_list_result: ListResult[Delta] = _list(
+        *args,
+        metafile=delta,
+        txn_op_type=TransactionOperationType.READ_SIBLINGS,
+        **kwargs,
+    )
+    all_deltas = all_deltas_list_result.all_items()
+    filtered_deltas = [
+        delta
+        for delta in all_deltas
+        if first_stream_position <= delta.stream_position <= last_stream_position
+    ]
+    if ascending_order:
+        filtered_deltas.reverse()
+    return filtered_deltas
 
 
 def list_partition_deltas(
@@ -235,7 +484,39 @@
     default. The manifests can either be optionally retrieved as part of this
     call or lazily loaded via subsequent calls to `get_delta_manifest`.
     """
-    raise NotImplementedError("list_partition_deltas not implemented")
+    # TODO(pdames): Delta listing should ideally either use an efficient
+    #  range-limited dir listing of partition children between start and end
+    #  positions, or should traverse using Partition.stream_position (to
+    #  resolve last stream position) and Delta.previous_stream_position
+    #  (down to first stream position).
+    locator = DeltaLocator.of(
+        partition_locator=partition_like
+        if isinstance(partition_like, PartitionLocator)
+        else partition_like.locator,
+        stream_position=None,
+    )
+    delta = Delta.of(
+        locator=locator,
+        delta_type=None,
+        meta=None,
+        properties=None,
+        manifest=None,
+    )
+    all_deltas_list_result: ListResult[Delta] = _list(
+        *args,
+        metafile=delta,
+        txn_op_type=TransactionOperationType.READ_SIBLINGS,
+        **kwargs,
+    )
+    all_deltas = all_deltas_list_result.all_items()
+    filtered_deltas = [
+        delta
+        for delta in all_deltas
+        if first_stream_position <= delta.stream_position <= last_stream_position
+    ]
+    if ascending_order:
+        filtered_deltas.reverse()
+    return filtered_deltas
 
 
 def get_delta(
@@ -261,7 +542,32 @@
     default. The manifest can either be optionally retrieved as part of this
     call or lazily loaded via a subsequent call to `get_delta_manifest`.
     """
-    raise NotImplementedError("get_delta not implemented")
+    # TODO(pdames): Honor the `include_manifest` param.
+    partition_locator_alias = _resolve_partition_locator_alias(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        partition_values=partition_values,
+        partition_scheme_id=partition_scheme_id,
+        **kwargs,
+    )
+    locator = DeltaLocator.of(
+        locator=partition_locator_alias,
+        stream_position=stream_position,
+    )
+    delta = Delta.of(
+        locator=locator,
+        delta_type=None,
+        meta=None,
+        properties=None,
+        manifest=None,
+    )
+    return _latest(
+        *args,
+        metafile=delta,
+        **kwargs,
+    )
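Both delta listings above materialize all sibling deltas and then filter in memory by stream position. The equivalent filter, shown standalone (this assumes both bounds are non-None, as the implementations above do):

    # Hedged sketch of the in-memory range filter used by list_deltas and
    # list_partition_deltas; the reversal assumes the underlying listing
    # returns results in descending order.
    def filter_by_stream_position(deltas, first, last, ascending):
        kept = [d for d in deltas if first <= d.stream_position <= last]
        if ascending:
            kept.reverse()
        return kept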
 
 
 def get_latest_delta(
@@ -286,7 +592,33 @@
     default. The manifest can either be optionally retrieved as part of this
     call or lazily loaded via a subsequent call to `get_delta_manifest`.
     """
-    raise NotImplementedError("get_latest_delta not implemented")
+    # TODO(pdames): Wrap this method in a single txn.
+    stream = get_stream(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        **kwargs,
+    )
+    partition = get_partition(
+        *args,
+        stream_locator=stream.locator,
+        partition_values=partition_values,
+        partition_scheme_id=partition_scheme_id,
+        **kwargs,
+    )
+    locator = DeltaLocator.of(
+        locator=partition.locator,
+        stream_position=partition.stream_position,
+    )
+    delta = Delta.of(
+        locator=locator,
+        delta_type=None,
+        meta=None,
+        properties=None,
+        manifest=None,
+    )
+    return _latest(
+        *args,
+        metafile=delta,
+        **kwargs,
+    )
 
 
 def download_delta(
@@ -332,11 +664,13 @@ def download_delta_manifest_entry(
 
 
 def get_delta_manifest(
-    delta_like: Union[Delta, DeltaLocator], *args, **kwargs
+    delta_like: Union[Delta, DeltaLocator],
+    *args,
+    **kwargs,
 ) -> Manifest:
     """
     Get the manifest associated with the given delta or delta locator. This
-    always retrieves the authoritative remote copy of the delta manifest, and
+    always retrieves the authoritative durable copy of the delta manifest, and
     never the local manifest defined for any input delta.
     """
     raise NotImplementedError("get_delta_manifest not implemented")
@@ -352,12 +686,10 @@
     Creates a table namespace with the given name and properties. Returns
     the created namespace.
     """
-    catalog = _get_catalog(**kwargs)
     namespace = Namespace.of(
         locator=NamespaceLocator.of(namespace=namespace),
         properties=properties,
     )
-    # given a transaction that creates a single namespace
     transaction = Transaction.of(
         txn_type=TransactionType.APPEND,
         txn_operations=[
@@ -367,6 +699,7 @@
             )
         ],
     )
+    catalog = _get_catalog(**kwargs)
     transaction.commit(
         catalog_root_dir=catalog.root,
         filesystem=catalog.filesystem,
@@ -376,7 +709,7 @@
 
 def update_namespace(
     namespace: str,
-    properties: NamespaceProperties = None,
+    properties: Optional[NamespaceProperties] = None,
     new_namespace: Optional[str] = None,
     *args,
     **kwargs,
@@ -385,7 +718,31 @@
     Updates a table namespace's name and/or properties. Raises an error
     if the given namespace does not exist.
     """
-    raise NotImplementedError("update_namespace not implemented")
+    # TODO(pdames): Wrap the get & update within a single txn.
+    old_namespace = get_namespace(
+        *args,
+        namespace=namespace,
+        **kwargs,
+    )
+    updated_namespace: Namespace = Metafile.update_for(old_namespace)
+    updated_namespace.namespace = new_namespace or namespace
+    updated_namespace.properties = properties or old_namespace.properties
+    transaction = Transaction.of(
+        txn_type=TransactionType.ALTER,
+        txn_operations=[
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.UPDATE,
+                dest_metafile=updated_namespace,
+                src_metafile=old_namespace,
+            )
+        ],
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
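A hedged usage sketch for update_namespace as implemented above, renaming a namespace while replacing its properties (names and catalog wiring are hypothetical):

    from deltacat.storage.main import impl as storage

    storage.update_namespace(
        namespace="my_ns",
        new_namespace="my_renamed_ns",
        properties={"owner": "data-eng"},
        catalog=catalog,  # assumed PropertyCatalog kwarg
    )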
""" - raise NotImplementedError("create_table_version not implemented") + if not namespace_exists( + *args, + namespace=namespace, + **kwargs, + ): + raise ValueError(f"Namespace {namespace} does not exist") + # check if a parent table and/or previous table version already exist + prev_table_version = None + prev_table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + if not prev_table: + # no parent table exists, so we'll create it in this transaction + txn_type = TransactionType.APPEND + table_txn_op_type = TransactionOperationType.CREATE + prev_table = None + new_table = Table.of( + locator=TableLocator.at(namespace=namespace, table_name=table_name), + ) + table_version = table_version or "1" + else: + # the parent table exists, so we'll update it in this transaction + txn_type = TransactionType.ALTER + table_txn_op_type = TransactionOperationType.UPDATE + new_table: Table = Metafile.update_for(prev_table) + prev_table_version = prev_table.latest_table_version + if not table_version: + table_version = TableVersion.next_version(prev_table_version) + new_table.description = table_description or table_version_description + new_table.properties = table_properties + new_table.latest_table_version = table_version + catalog = _get_catalog(**kwargs) + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=schema, + partition_scheme=partition_scheme, + description=table_version_description, + properties=table_version_properties, + content_types=supported_content_types, + sort_scheme=sort_keys, + watermark=None, + lifecycle_state=LifecycleState.UNRELEASED, + schemas=[schema] if schema else None, + partition_schemes=[partition_scheme] if partition_scheme else None, + sort_schemes=[sort_keys] if sort_keys else None, + previous_table_version=prev_table_version, + ) + # create the table version's default deltacat stream in this transaction + stream_locator = StreamLocator.of( + table_version_locator=locator, + stream_id=str(uuid.uuid4()), + stream_format=StreamFormat.DELTACAT, + ) + stream = Stream.of( + locator=stream_locator, + partition_scheme=partition_scheme, + state=CommitState.COMMITTED, + previous_stream_id=None, + watermark=None, + ) + transaction = Transaction.of( + txn_type=txn_type, + txn_operations=[ + TransactionOperation.of( + operation_type=table_txn_op_type, + dest_metafile=new_table, + src_metafile=prev_table, + ), + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=table_version, + ), + TransactionOperation.of( + operation_type=TransactionOperationType.CREATE, + dest_metafile=stream, + ), + ], + ) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) + return stream def update_table( @@ -430,7 +877,33 @@ def update_table( when its first table version was created. Raises an error if the given table does not exist. 
""" - raise NotImplementedError("update_table not implemented") + old_table = get_table( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + if not old_table: + raise ValueError(f"Table `{namespace}.{table_name}` does not exist.") + new_table: Table = Metafile.update_for(old_table) + new_table.description = description or old_table.description + new_table.properties = properties or old_table.properties + new_table.table_name = new_table_name or old_table.table_name + transaction = Transaction.of( + txn_type=TransactionType.ALTER, + txn_operations=[ + TransactionOperation.of( + operation_type=TransactionOperationType.UPDATE, + dest_metafile=new_table, + src_metafile=old_table, + ) + ], + ) + catalog = _get_catalog(**kwargs) + transaction.commit( + catalog_root_dir=catalog.root, + filesystem=catalog.filesystem, + ) def update_table_version( @@ -441,6 +914,8 @@ def update_table_version( schema: Optional[Schema] = None, description: Optional[str] = None, properties: Optional[TableVersionProperties] = None, + partition_scheme: Optional[PartitionScheme] = None, + sort_keys: Optional[SortScheme] = None, *args, **kwargs, ) -> None: @@ -453,65 +928,405 @@ def update_table_version( specify a different table version). Raises an error if the given table version does not exist. """ - raise NotImplementedError("update_table_version not implemented") + # TODO(pdames): Wrap get & update within a single txn. + old_table_version = get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + if not old_table_version: + raise ValueError( + f"Table version `{table_version}` does not exist for " + f"table `{namespace}.{table_name}`." + ) + new_table_version: TableVersion = Metafile.update_for(old_table_version) + new_table_version.state = lifecycle_state or old_table_version.state + # TODO(pdames): Use schema patch to check for backwards incompatible changes. + new_table_version.schema = schema or old_table_version.schema + new_table_version.schemas = ( + old_table_version.schemas + [schema] if schema else old_table_version.schemas + ) + new_table_version.description = description or old_table_version.description + new_table_version.properties = properties or old_table_version.properties + new_table_version.partition_scheme = ( + partition_scheme or old_table_version.partition_scheme + ) + if partition_scheme and partition_scheme.id in [ + ps.id for ps in old_table_version.partition_schemes + ]: + raise ValueError( + f"Partition scheme ID `{partition_scheme.id}` already exists in " + f"table version `{table_version}`." + ) + new_table_version.partition_schemes = ( + old_table_version.partition_schemes + [partition_scheme] + if partition_scheme + else old_table_version.partition_schemes + ) + if sort_keys and sort_keys.id in [sk.id for sk in old_table_version.sort_schemes]: + raise ValueError( + f"Sort scheme ID `{sort_keys.id}` already exists in " + f"table version `{table_version}`." 
+    if sort_keys and sort_keys.id in [sk.id for sk in old_table_version.sort_schemes]:
+        raise ValueError(
+            f"Sort scheme ID `{sort_keys.id}` already exists in "
+            f"table version `{table_version}`."
+        )
+    new_table_version.sort_scheme = sort_keys or old_table_version.sort_scheme
+    new_table_version.sort_schemes = (
+        old_table_version.sort_schemes + [sort_keys]
+        if sort_keys
+        else old_table_version.sort_schemes
+    )
+    old_table = get_table(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        **kwargs,
+    )
+    txn_operations = []
+    if old_table.latest_table_version == new_table_version.table_version:
+        if (
+            old_table_version.state != LifecycleState.ACTIVE
+            and new_table_version.state == LifecycleState.ACTIVE
+        ):
+            # update the table's latest active table version
+            new_table: Table = Metafile.update_for(old_table)
+            new_table.latest_active_table_version = table_version
+            txn_operations.append(
+                TransactionOperation.of(
+                    operation_type=TransactionOperationType.UPDATE,
+                    dest_metafile=new_table,
+                    src_metafile=old_table,
+                )
+            )
+    txn_operations.append(
+        TransactionOperation.of(
+            operation_type=TransactionOperationType.UPDATE,
+            dest_metafile=new_table_version,
+            src_metafile=old_table_version,
+        ),
+    )
+    # TODO(pdames): Push changes down to non-deltacat streams via sync module.
+    #  Also copy sort scheme changes down to deltacat child stream?
+    if partition_scheme:
+        old_stream = get_stream(
+            *args,
+            namespace=namespace,
+            table_name=table_name,
+            table_version=table_version,
+            **kwargs,
+        )
+        new_stream: Stream = Metafile.update_for(old_stream)
+        new_stream.partition_scheme = partition_scheme
+        txn_operations.append(
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.UPDATE,
+                dest_metafile=new_stream,
+                src_metafile=old_stream,
+            )
+        )
+    transaction = Transaction.of(
+        txn_type=TransactionType.ALTER,
+        txn_operations=txn_operations,
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
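A hedged sketch of partition scheme evolution via update_table_version above: the new scheme must carry an ID not already present in partition_schemes, and the same transaction also pushes the scheme down to the current deltacat stream:

    from deltacat.storage.main import impl as storage

    storage.update_table_version(
        namespace="my_ns",
        table_name="events",
        table_version="2",            # hypothetical version ID
        partition_scheme=new_scheme,  # assumed PartitionScheme with a fresh ID
        catalog=catalog,              # assumed PropertyCatalog kwarg
    )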
 
 
 def stage_stream(
     namespace: str,
     table_name: str,
     table_version: Optional[str] = None,
+    stream_format: StreamFormat = StreamFormat.DELTACAT,
     *args,
     **kwargs,
 ) -> Stream:
     """
     Stages a new delta stream for the given table version. Resolves to the
-    latest active table version if no table version is given. Returns the
-    staged stream. Raises an error if the table version does not exist.
+    latest active table version if no table version is given. Resolves to the
+    DeltaCAT stream format if no stream format is given. If this stream
+    will replace another stream with the same format and scheme, then it will
+    have its previous stream ID set to the ID of the stream being replaced.
+    Returns the staged stream. Raises an error if the table version does not
+    exist.
     """
-    raise NotImplementedError("stage_stream not implemented")
+    # TODO(pdames): Support retrieving previously staged streams by ID.
+    if not table_version:
+        table_version = _resolve_latest_active_table_version_id(
+            *args,
+            namespace=namespace,
+            table_name=table_name,
+            **kwargs,
+        )
+    table_version_meta = get_table_version(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        **kwargs,
+    )
+    if not table_version_meta:
+        raise ValueError(
+            f"Table version not found: {namespace}.{table_name}.{table_version}."
+        )
+    locator = StreamLocator.at(
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        stream_id=str(uuid.uuid4()),
+        stream_format=stream_format or StreamFormat.DELTACAT,
+    )
+    stream = Stream.of(
+        locator=locator,
+        partition_scheme=table_version_meta.partition_scheme,
+        state=CommitState.STAGED,
+        previous_stream_id=None,
+        watermark=None,
+    )
+    prev_stream = get_stream(
+        *args,
+        namespace=stream.namespace,
+        table_name=stream.table_name,
+        table_version=stream.table_version,
+        stream_format=stream.stream_format,
+        **kwargs,
+    )
+    if prev_stream:
+        if prev_stream.stream_id == stream.stream_id:
+            raise ValueError(
+                f"Stream to stage has the same ID as existing stream: {prev_stream}."
+            )
+        stream.previous_stream_id = prev_stream.stream_id
+    transaction = Transaction.of(
+        txn_type=TransactionType.APPEND,
+        txn_operations=[
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.CREATE,
+                dest_metafile=stream,
+            )
+        ],
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
+    return stream
 
 
-def commit_stream(stream: Stream, *args, **kwargs) -> Stream:
+def commit_stream(
+    stream: Stream,
+    *args,
+    **kwargs,
+) -> Stream:
     """
-    Registers a delta stream with a target table version, replacing any
+    Registers a staged delta stream with a target table version, replacing any
     previous stream registered for the same table version. Returns the
     committed stream.
     """
-    raise NotImplementedError("commit_stream not implemented")
+    if not stream.stream_id:
+        raise ValueError("Stream ID to commit must be set to a staged stream ID.")
+    if not stream.table_version_locator:
+        raise ValueError(
+            "Stream to commit must have its table version locator "
+            "set to the parent of its staged stream ID."
+        )
+    prev_staged_stream = get_staged_stream(
+        *args,
+        table_version_locator=stream.table_version_locator,
+        stream_id=stream.stream_id,
+        **kwargs,
+    )
+    if not prev_staged_stream:
+        raise ValueError(
+            f"Stream at table version {stream.table_version_locator} with ID "
+            f"{stream.stream_id} not found."
+        )
+    if prev_staged_stream.state != CommitState.STAGED:
+        raise ValueError(
+            f"Expected to find a `{CommitState.STAGED}` stream at table version "
+            f"{stream.table_version_locator} with ID {stream.stream_id}, "
+            f"but found a `{prev_staged_stream.state}` stream."
+        )
+    stream: Stream = Metafile.update_for(prev_staged_stream)
+    stream.state = CommitState.COMMITTED
+    prev_committed_stream = get_stream(
+        *args,
+        namespace=stream.namespace,
+        table_name=stream.table_name,
+        table_version=stream.table_version,
+        stream_format=stream.stream_format,
+        **kwargs,
+    )
+    # the first transaction operation updates the staged stream commit state
+    txn_type = TransactionType.ALTER
+    txn_ops = [
+        TransactionOperation.of(
+            operation_type=TransactionOperationType.UPDATE,
+            dest_metafile=stream,
+            src_metafile=prev_staged_stream,
+        )
+    ]
+    if prev_committed_stream:
+        if prev_committed_stream.stream_id != stream.previous_stream_id:
+            raise ValueError(
+                f"Previous stream ID mismatch. Expected "
+                f"{stream.previous_stream_id} but found "
+                f"{prev_committed_stream.stream_id}."
+            )
+        if prev_committed_stream.stream_id == stream.stream_id:
+            raise ValueError(
+                f"Stream to commit has the same ID as existing stream: {prev_committed_stream}."
+            )
+        # there's a previously committed stream, so update the transaction
+        # type to overwrite the previously committed stream, and add another
+        # transaction operation to replace it with the staged stream
+        txn_type = TransactionType.OVERWRITE
+        txn_ops.append(
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.UPDATE,
+                dest_metafile=stream,
+                src_metafile=prev_committed_stream,
+            )
+        )
+    transaction = Transaction.of(
+        txn_type=txn_type,
+        txn_operations=txn_ops,
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
+    return stream
 
 
 def delete_stream(
     namespace: str,
     table_name: str,
     table_version: Optional[str] = None,
+    stream_format: StreamFormat = StreamFormat.DELTACAT,
     *args,
     **kwargs,
 ) -> None:
     """
     Deletes the delta stream currently registered with the given table version.
     Resolves to the latest active table version if no table version is given.
+    Resolves to the DeltaCAT stream format if no stream format is given.
     Raises an error if the table version does not exist.
     """
-    raise NotImplementedError("delete_stream not implemented")
+    if not table_version:
+        table_version = _resolve_latest_active_table_version_id(
+            *args,
+            namespace=namespace,
+            table_name=table_name,
+            **kwargs,
+        )
+    stream_to_delete = get_stream(
+        *args,
+        namespace=namespace,
+        table_name=table_name,
+        table_version=table_version,
+        stream_format=stream_format,
+        **kwargs,
+    )
+    if not stream_to_delete:
+        raise ValueError(
+            f"Stream to delete not found: {namespace}.{table_name}"
+            f".{table_version}.{stream_format}."
+        )
+    stream_to_delete.state = CommitState.DEPRECATED
+    transaction = Transaction.of(
+        txn_type=TransactionType.DELETE,
+        txn_operations=[
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.DELETE,
+                dest_metafile=stream_to_delete,
+            )
+        ],
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
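delete_stream above is a soft delete: the committed stream is flagged DEPRECATED and a DELETE transaction is committed for it. A hedged usage sketch:

    from deltacat.storage.main import impl as storage

    storage.delete_stream(
        namespace="my_ns",
        table_name="events",  # hypothetical names
        catalog=catalog,      # assumed PropertyCatalog kwarg
    )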
+ """ + locator = StreamLocator.of( + table_version_locator=table_version_locator, + stream_id=stream_id, + stream_format=None, + ) + return _latest( + *args, + metafile=Stream.of(locator=locator, partition_scheme=None), + **kwargs, + ) def get_stream( namespace: str, table_name: str, table_version: Optional[str] = None, + stream_format: StreamFormat = StreamFormat.DELTACAT, *args, **kwargs, ) -> Optional[Stream]: """ - Gets the most recently committed stream for the given table version and - partition key values. Resolves to the latest active table version if no - table version is given. Returns None if the table version does not exist. + Gets the most recently committed stream for the given table version. + Resolves to the latest active table version if no table version is given. + Resolves to the DeltaCAT stream format if no stream format is given. + Returns None if the table version or stream format does not exist. """ - raise NotImplementedError("get_stream not implemented") + if not table_version: + table_version = _resolve_latest_active_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + fail_if_no_active_table_version=False, + **kwargs, + ) + locator = StreamLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + stream_id=None, + stream_format=stream_format, + ) + return _latest( + *args, + metafile=Stream.of( + locator=locator, + partition_scheme=None, + state=CommitState.COMMITTED, + ), + **kwargs, + ) def stage_partition( - stream: Stream, partition_values: Optional[PartitionValues] = None, *args, **kwargs + stream: Stream, + partition_values: Optional[PartitionValues] = None, + partition_scheme_id: Optional[str] = None, + *args, + **kwargs, ) -> Partition: """ Stages a new partition for the given stream and partition values. Returns @@ -523,7 +1338,79 @@ def stage_partition( The partition_values must represent the results of transforms in a partition spec specified in the stream. """ - raise NotImplementedError("stage_partition not implemented") + # TODO(pdames): Cache last retrieved metafile revisions in memory to resolve + # potentially high cost of staging many partitions. + table_version = get_table_version( + *args, + namespace=stream.namespace, + table_name=stream.table_name, + table_version=stream.table_version, + **kwargs, + ) + if not table_version: + raise ValueError( + f"Table version not found: {stream.namespace}.{stream.table_name}." + f"{stream.table_version}." + ) + if not table_version.partition_schemes or partition_scheme_id not in [ + ps.id for ps in table_version.partition_schemes + ]: + raise ValueError( + f"Invalid partition scheme ID `{partition_scheme_id}` (not found " + f"in parent table version `{stream.namespace}.{stream.table_name}" + f".{table_version.table_version}` partition scheme IDs)." + ) + if stream.partition_scheme.id not in table_version.partition_schemes: + # this should never happen, but just in case + raise ValueError( + f"Invalid stream partition scheme ID `{stream.partition_scheme.id}`" + f"in parent table version `{stream.namespace}.{stream.table_name}" + f".{table_version.table_version}` partition scheme IDs)." 
+    locator = PartitionLocator.of(
+        stream_locator=stream.locator,
+        partition_values=partition_values,
+        partition_id=str(uuid.uuid4()),
+    )
+    partition = Partition.of(
+        locator=locator,
+        schema=table_version.schema,
+        content_types=table_version.content_types,
+        state=CommitState.STAGED,
+        previous_stream_position=None,
+        partition_values=partition_values,
+        previous_partition_id=None,
+        stream_position=None,
+        partition_scheme_id=partition_scheme_id,
+    )
+    prev_partition = get_partition(
+        *args,
+        stream_locator=stream.locator,
+        partition_values=partition_values,
+        partition_scheme_id=partition_scheme_id,
+        **kwargs,
+    )
+    if prev_partition:
+        if prev_partition.partition_id == partition.partition_id:
+            raise ValueError(
+                f"Partition to stage has the same ID as existing partition: {prev_partition}."
+            )
+        partition.previous_partition_id = prev_partition.partition_id
+    transaction = Transaction.of(
+        txn_type=TransactionType.APPEND,
+        txn_operations=[
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.CREATE,
+                dest_metafile=partition,
+            )
+        ],
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
+    return partition
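Partitions follow the same stage-then-commit protocol as streams. A hedged sketch against the implementation above (`stream` is a committed Stream; the partition values are hypothetical):

    from deltacat.storage.main import impl as storage

    staged = storage.stage_partition(
        stream=stream,
        partition_values=["2024-01-01"],
        partition_scheme_id=stream.partition_scheme.id,
        catalog=catalog,  # assumed PropertyCatalog kwarg
    )
    committed = storage.commit_partition(partition=staged, catalog=catalog)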
 
 
 def commit_partition(
@@ -533,10 +1420,15 @@
     **kwargs,
 ) -> Partition:
     """
-    Commits the given partition to its associated table version stream,
-    replacing any previous partition (i.e., "partition being replaced") registered for the same stream and
+    Commits the staged partition to its associated table version stream,
+    replacing any previous partition registered for the same stream and
     partition values.
-    If the previous_partition is passed as an argument, the specified previous_partition will be the partition being replaced, otherwise it will be retrieved.
+
+    If a previous partition is given, then it will be replaced with its deltas
+    prepended to the new partition being committed. Otherwise, the latest
+    committed partition with the same keys and partition scheme ID will be
+    retrieved.
+
     Returns the registered partition. If the
     partition's previous delta stream position is specified, then the commit
     will be rejected if it does not match the actual previous stream position of
@@ -544,29 +1436,162 @@
     specified, then the commit will be rejected if it does not match the actual
     ID of the partition being replaced.
     """
-    raise NotImplementedError("commit_partition not implemented")
+    if previous_partition:
+        raise NotImplementedError(
+            f"delta prepending from previous partition {previous_partition} "
+            f"is not yet implemented"
+        )
+    if not partition.partition_id:
+        raise ValueError("Partition ID to commit must be set to a staged partition ID.")
+    if not partition.stream_locator:
+        raise ValueError(
+            "Partition to commit must have its stream locator "
+            "set to the parent of its staged partition ID."
+        )
+    prev_staged_partition = get_staged_partition(
+        *args,
+        stream_locator=partition.stream_locator,
+        partition_id=partition.partition_id,
+        **kwargs,
+    )
+    if not prev_staged_partition:
+        raise ValueError(
+            f"Partition at stream {partition.stream_locator} with ID "
+            f"{partition.partition_id} not found."
+        )
+    if prev_staged_partition.state != CommitState.STAGED:
+        raise ValueError(
+            f"Expected to find a `{CommitState.STAGED}` partition at stream "
+            f"{partition.stream_locator} with ID {partition.partition_id}, "
+            f"but found a `{prev_staged_partition.state}` partition."
+        )
+    partition: Partition = Metafile.update_for(prev_staged_partition)
+    partition.state = CommitState.COMMITTED
+    prev_committed_partition = get_partition(
+        *args,
+        stream_locator=partition.stream_locator,
+        partition_values=partition.partition_values,
+        partition_scheme_id=partition.partition_scheme_id,
+        **kwargs,
+    )
+    # the first transaction operation updates the staged partition commit state
+    txn_type = TransactionType.ALTER
+    txn_ops = [
+        TransactionOperation.of(
+            operation_type=TransactionOperationType.UPDATE,
+            dest_metafile=partition,
+            src_metafile=prev_staged_partition,
+        )
+    ]
+    if prev_committed_partition:
+        if prev_committed_partition.partition_id != partition.previous_partition_id:
+            raise ValueError(
+                f"Previous partition ID mismatch. Expected "
+                f"{partition.previous_partition_id} but found "
+                f"{prev_committed_partition.partition_id}."
+            )
+        # TODO(pdames): Add previous partition stream position validation.
+        if prev_committed_partition.partition_id == partition.partition_id:
+            raise ValueError(
+                f"Partition to commit has the same ID as existing partition: "
+                f"{prev_committed_partition}."
+            )
+        # there's a previously committed partition, so update the transaction
+        # type to overwrite the previously committed partition, and add another
+        # transaction operation to replace it with the staged partition
+        txn_type = TransactionType.OVERWRITE
+        txn_ops.append(
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.UPDATE,
+                dest_metafile=partition,
+                src_metafile=prev_committed_partition,
+            )
+        )
+    transaction = Transaction.of(
+        txn_type=txn_type,
+        txn_operations=txn_ops,
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
+    return partition
 
 
 def delete_partition(
-    namespace: str,
-    table_name: str,
-    table_version: Optional[str] = None,
+    stream_locator: StreamLocator,
     partition_values: Optional[PartitionValues] = None,
+    partition_scheme_id: Optional[str] = None,
     *args,
     **kwargs,
 ) -> None:
     """
-    Deletes the given partition from the specified table version. Resolves to
-    the latest active table version if no table version is given. Partition
+    Deletes the given partition from the specified stream. Partition
     values should not be specified for unpartitioned tables. Raises an error
-    if the table version or partition does not exist.
+    if the partition does not exist.
+    """
+    partition_to_delete = get_partition(
+        *args,
+        stream_locator=stream_locator,
+        partition_values=partition_values,
+        partition_scheme_id=partition_scheme_id,
+        **kwargs,
+    )
+    if not partition_to_delete:
+        raise ValueError(
+            f"Partition with values {partition_values} and scheme "
+            f"{partition_scheme_id} not found in stream: {stream_locator}"
+        )
+    partition_to_delete.state = CommitState.DEPRECATED
+    transaction = Transaction.of(
+        txn_type=TransactionType.DELETE,
+        txn_operations=[
+            TransactionOperation.of(
+                operation_type=TransactionOperationType.DELETE,
+                dest_metafile=partition_to_delete,
+            )
+        ],
+    )
+    catalog = _get_catalog(**kwargs)
+    transaction.commit(
+        catalog_root_dir=catalog.root,
+        filesystem=catalog.filesystem,
+    )
 
 
+def get_staged_partition(
+    stream_locator: StreamLocator,
+    partition_id: str,
+    *args,
+    **kwargs,
+) -> Optional[Partition]:
     """
-    raise NotImplementedError("delete_partition not implemented")
+    Gets the staged partition for the given stream locator and partition ID.
+    Returns None if the partition does not exist. Raises an error if the
+    given stream locator does not exist.
+    """
+    locator = PartitionLocator.of(
+        stream_locator=stream_locator,
+        partition_values=None,
+        partition_id=partition_id,
+    )
+    return _latest(
+        *args,
+        metafile=Partition.of(
+            locator=locator,
+            schema=None,
+            content_types=None,
+        ),
+        **kwargs,
+    )
 
 
 def get_partition(
     stream_locator: StreamLocator,
     partition_values: Optional[PartitionValues] = None,
+    partition_scheme_id: Optional[str] = None,
     *args,
     **kwargs,
 ) -> Optional[Partition]:
@@ -574,9 +1599,39 @@
     Gets the most recently committed partition for the given stream locator
     and partition key values. Returns None if no partition has been committed
     for the given table version and/or partition key values. Partition values
-    should not be specified for unpartitioned tables.
-    """
-    raise NotImplementedError("get_partition not implemented")
+    should not be specified for unpartitioned tables. Partition scheme ID
+    resolves to the table version's current partition scheme by default.
+    Raises an error if the given stream locator does not exist.
+    """
+    locator = PartitionLocator.of(
+        stream_locator=stream_locator,
+        partition_values=partition_values,
+        partition_id=None,
+    )
+    if not partition_scheme_id:
+        # resolve latest partition scheme from the current
+        # revision of its `deltacat` stream
+        stream = get_stream(
+            *args,
+            namespace=stream_locator.namespace,
+            table_name=stream_locator.table_name,
+            table_version=stream_locator.table_version,
+            **kwargs,
+        )
+        if not stream:
+            raise ValueError(f"Stream {stream_locator} not found.")
+        partition_scheme_id = stream.partition_scheme.id
+    return _latest(
+        *args,
+        metafile=Partition.of(
+            locator=locator,
+            schema=None,
+            content_types=None,
+            state=CommitState.COMMITTED,
+            partition_scheme_id=partition_scheme_id,
+        ),
+        **kwargs,
+    )
 
 
 def stage_delta(
@@ -624,10 +1679,9 @@ def get_namespace(namespace: str, *args, **kwargs) -> Optional[Namespace]:
     Gets table namespace metadata for the specified table namespace. Returns
     None if the given namespace does not exist.
     """
-    return _read_latest_metafile(
+    return _latest(
         *args,
         metafile=Namespace.of(NamespaceLocator.of(namespace)),
-        txn_op_type=TransactionOperationType.READ_LATEST,
         **kwargs,
     )
 
@@ -636,14 +1690,10 @@ def namespace_exists(namespace: str, *args, **kwargs) -> bool:
     """
     Returns True if the given table namespace exists, False if not.
     """
-    return (
-        _read_latest_metafile(
-            *args,
-            metafile=Namespace.of(NamespaceLocator.of(namespace)),
-            txn_op_type=TransactionOperationType.READ_EXISTS,
-            **kwargs,
-        )
-        is not None
+    return _exists(
+        *args,
+        metafile=Namespace.of(NamespaceLocator.of(namespace)),
+        **kwargs,
     )
 
 
@@ -652,24 +1702,51 @@ def get_table(namespace: str, table_name: str, *args, **kwargs) -> Optional[Table]:
     Gets table metadata for the specified table. Returns None if the given
     table does not exist.
     """
-    raise NotImplementedError("get_table not implemented")
+    locator = TableLocator.at(namespace=namespace, table_name=table_name)
+    return _latest(
+        *args,
+        metafile=Table.of(locator=locator),
+        **kwargs,
+    )
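A hedged sketch of the point lookups and existence probes implemented above (names are hypothetical; `catalog` is the assumed PropertyCatalog kwarg):

    from deltacat.storage.main import impl as storage

    if not storage.namespace_exists(namespace="my_ns", catalog=catalog):
        storage.create_namespace(namespace="my_ns", catalog=catalog)
    table = storage.get_table(namespace="my_ns", table_name="events", catalog=catalog)
    if table is None:
        print("table not created yet")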
""" - raise NotImplementedError("table_exists not implemented") + locator = TableLocator.at(namespace=namespace, table_name=table_name) + return _exists( + *args, + metafile=Table.of(locator=locator), + **kwargs, + ) def get_table_version( - namespace: str, table_name: str, table_version: str, *args, **kwargs + namespace: str, + table_name: str, + table_version: str, + *args, + **kwargs, ) -> Optional[TableVersion]: """ Gets table version metadata for the specified table version. Returns None if the given table version does not exist. """ - raise NotImplementedError("get_table_version not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _latest( + *args, + metafile=table_version, + **kwargs, + ) def get_latest_table_version( @@ -679,7 +1756,25 @@ def get_latest_table_version( Gets table version metadata for the latest version of the specified table. Returns None if no table version exists for the given table. """ - raise NotImplementedError("get_latest_table_version not implemented") + table_version_id = _resolve_latest_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + fail_if_no_active_table_version=False, + **kwargs, + ) + + return ( + get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version_id, + **kwargs, + ) + if table_version_id + else None + ) def get_latest_active_table_version( @@ -689,7 +1784,24 @@ def get_latest_active_table_version( Gets table version metadata for the latest active version of the specified table. Returns None if no active table version exists for the given table. """ - raise NotImplementedError("get_latest_active_table_version not implemented") + table_version_id = _resolve_latest_active_table_version_id( + *args, + namespace=namespace, + table_name=table_name, + fail_if_no_active_table_version=False, + **kwargs, + ) + return ( + get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version_id, + **kwargs, + ) + if table_version_id + else None + ) def get_table_version_column_names( @@ -707,7 +1819,12 @@ def get_table_version_column_names( Returns None for schemaless tables. Raises an error if the table version does not exist. """ - raise NotImplementedError("get_table_version_column_names not implemented") + schema = get_table_version_schema( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + return schema.arrow.names if schema else None def get_table_version_schema( @@ -722,7 +1839,23 @@ def get_table_version_schema( table version if none is specified. Returns None if the table version is schemaless. Raises an error if the table version does not exist. """ - raise NotImplementedError("get_table_version_schema not implemented") + table_version = ( + get_table_version( + *args, + namespace=namespace, + table_name=table_name, + table_version=table_version, + **kwargs, + ) + if table_version + else get_latest_active_table_version( + *args, + namespace=namespace, + table_name=table_name, + **kwargs, + ) + ) + return table_version.schema def table_version_exists( @@ -731,7 +1864,20 @@ def table_version_exists( """ Returns True if the given table version exists, False if not. 
""" - raise NotImplementedError("table_version_exists not implemented") + locator = TableVersionLocator.at( + namespace=namespace, + table_name=table_name, + table_version=table_version, + ) + table_version = TableVersion.of( + locator=locator, + schema=None, + ) + return _exists( + *args, + metafile=table_version, + **kwargs, + ) def can_categorize(e: BaseException, *args, **kwargs) -> bool: diff --git a/deltacat/storage/model/list_result.py b/deltacat/storage/model/list_result.py index 35f53dd7..b735feaa 100644 --- a/deltacat/storage/model/list_result.py +++ b/deltacat/storage/model/list_result.py @@ -21,6 +21,14 @@ def of( list_result["nextPageProvider"] = next_page_provider return list_result + @staticmethod + def empty() -> ListResult: + list_result = ListResult() + list_result["items"] = [] + list_result["paginationKey"] = None + list_result["nextPageProvider"] = None + return list_result + def read_page(self) -> Optional[List[T]]: return self.get("items") diff --git a/deltacat/storage/model/locator.py b/deltacat/storage/model/locator.py index 43a8fcc4..62aced4a 100644 --- a/deltacat/storage/model/locator.py +++ b/deltacat/storage/model/locator.py @@ -55,6 +55,12 @@ def join(self, separator: str = DEFAULT_NAME_SEPARATOR) -> str: """ return separator.join(self.parts()) + def exists(self) -> bool: + """ + Returns True if this locator name is defined, False otherwise. + """ + return self.immutable_id or all(self.parts()) + class Locator: """ diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py index 7625ebc7..f15a14a4 100644 --- a/deltacat/storage/model/metafile.py +++ b/deltacat/storage/model/metafile.py @@ -113,6 +113,15 @@ def latest_revision( current_txn_id: Optional[str] = None, ignore_missing_revision: bool = False, ) -> MetafileRevisionInfo: + """ + Fetch latest revision of a metafile, or return None if no + revisions exist. + :param revision_dir_path: root path of directory for metafile + :param ignore_missing_revision: if True, will return + MetafileRevisionInfo.undefined() on no revisions + :raises ValueError if no revisions are found AND + ignore_missing_revision=False + """ revisions = MetafileRevisionInfo.list_revisions( revision_dir_path=revision_dir_path, filesystem=filesystem, @@ -180,7 +189,7 @@ def new_revision( ignore_missing_revision=is_create_txn, ) # validate the transaction operation type - if mri.revision: + if mri.exists(): # update/delete fails if the last metafile was deleted if mri.txn_op_type == TransactionOperationType.DELETE: if current_txn_op_type != TransactionOperationType.CREATE: @@ -371,6 +380,9 @@ def path(self) -> Optional[str]: else None ) + def exists(self) -> bool: + return bool(self.revision) + class Metafile(dict): """ @@ -490,6 +502,38 @@ def read_txn( pagination_key=None, next_page_provider=None, ) + else: + # Could not find any revisions in list operations - return no results + return ListResult.empty() + + @staticmethod + def get_class(serialized_dict: dict): + """ + Given a serialized dictionary of Metafile data, gets the metafile child + class type to instantiate. + """ + # TODO: more robust implementation. Right now this relies on the + # assumption that XLocator key will only be present in class X, and + # is brittle to renames. 
diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py
index 7625ebc7..f15a14a4 100644
--- a/deltacat/storage/model/metafile.py
+++ b/deltacat/storage/model/metafile.py
@@ -113,6 +113,15 @@ def latest_revision(
         current_txn_id: Optional[str] = None,
         ignore_missing_revision: bool = False,
     ) -> MetafileRevisionInfo:
+        """
+        Fetch the latest revision of a metafile, or return None if no
+        revisions exist.
+        :param revision_dir_path: root path of the metafile revision directory
+        :param ignore_missing_revision: if True, returns
+            MetafileRevisionInfo.undefined() when no revisions exist
+        :raises ValueError: if no revisions are found and
+            ignore_missing_revision is False
+        """
         revisions = MetafileRevisionInfo.list_revisions(
             revision_dir_path=revision_dir_path,
             filesystem=filesystem,
@@ -180,7 +189,7 @@ def new_revision(
             ignore_missing_revision=is_create_txn,
         )
         # validate the transaction operation type
-        if mri.revision:
+        if mri.exists():
             # update/delete fails if the last metafile was deleted
             if mri.txn_op_type == TransactionOperationType.DELETE:
                 if current_txn_op_type != TransactionOperationType.CREATE:
@@ -371,6 +380,9 @@ def path(self) -> Optional[str]:
             else None
         )
 
+    def exists(self) -> bool:
+        return bool(self.revision)
+
 
 class Metafile(dict):
     """
@@ -490,6 +502,38 @@ def read_txn(
                 pagination_key=None,
                 next_page_provider=None,
             )
+        else:
+            # Could not find any revisions in list operations - return no results
+            return ListResult.empty()
+
+    @staticmethod
+    def get_class(serialized_dict: dict):
+        """
+        Given a serialized dictionary of Metafile data, gets the metafile child
+        class type to instantiate.
+        """
+        # TODO: more robust implementation. Right now this relies on the
+        # assumption that the XLocator key will only be present in class X, and
+        # is brittle to renames. On the other hand, this implementation does
+        # not require any marker fields to be persisted, and a regression
+        # will be quickly detected by test_metafile_io or other unit tests.
+        if "tableLocator" in serialized_dict:
+            return deltacat.storage.model.table.Table
+        elif "namespaceLocator" in serialized_dict:
+            return deltacat.storage.model.namespace.Namespace
+        elif "tableVersionLocator" in serialized_dict:
+            return deltacat.storage.model.table_version.TableVersion
+        elif "partitionLocator" in serialized_dict:
+            return deltacat.storage.model.partition.Partition
+        elif "streamLocator" in serialized_dict:
+            return deltacat.storage.model.stream.Stream
+        elif "deltaLocator" in serialized_dict:
+            return deltacat.storage.model.delta.Delta
+        else:
+            raise ValueError(
+                f"Could not find metafile class from serialized form: "
+                f"{serialized_dict}"
+            )
 
     @classmethod
     def read(
@@ -507,7 +551,10 @@ def read(
         path, filesystem = resolve_path_and_filesystem(path, filesystem)
         with filesystem.open_input_stream(path) as file:
             binary = file.readall()
-        obj = cls(**msgpack.loads(binary)).from_serializable(path, filesystem)
+        data = msgpack.loads(binary)
+        # cast this Metafile into the appropriate child class type
+        clazz = Metafile.get_class(data)
+        obj = clazz(**data).from_serializable(path, filesystem)
         return obj
 
     def write_txn(
@@ -675,6 +722,7 @@ def children(
             parent_obj_path,
             self.id,
         )
+        # list metafiles with respect to this metafile's URI as root
         return self._list_metafiles(
             success_txn_log_dir=success_txn_log_dir,
             metafile_root_dir_path=metafile_root_dir_path,
@@ -742,15 +790,34 @@ def revisions(
             filesystem=filesystem,
         )
         metafile_root = posixpath.join(*[catalog_root] + ancestor_ids)
-        # TODO(pdames): Refactor id lazy assignment into explicit getter/setter
-        immutable_id = self.get("id") or Metafile._locator_to_id(
-            locator=self.locator,
-            catalog_root=catalog_root,
-            metafile_root=metafile_root,
-            filesystem=filesystem,
-            txn_start_time=current_txn_start_time,
-            txn_id=current_txn_id,
-        )
+        try:
+            locator = (
+                self.locator
+                if self.locator.name.exists()
+                else self.locator_alias
+                if self.locator_alias and self.locator_alias.name.exists()
+                else None
+            )
+            immutable_id = (
+                # TODO(pdames): Refactor id lazy assignment into explicit getter/setter
+                self.get("id")
+                or Metafile._locator_to_id(
+                    locator=locator,
+                    catalog_root=catalog_root,
+                    metafile_root=metafile_root,
+                    filesystem=filesystem,
+                    txn_start_time=current_txn_start_time,
+                    txn_id=current_txn_id,
+                )
+                if locator
+                else None
+            )
+        except ValueError:
+            # the metafile has been deleted
+            return ListResult.empty()
+        if not immutable_id:
+            # the metafile does not exist
+            return ListResult.empty()
         revision_dir_path = posixpath.join(
             metafile_root,
             immutable_id,
@@ -766,7 +833,7 @@
         )
         items = []
         for mri in revisions:
-            if mri.revision:
+            if mri.exists():
                 metafile = (
                     {}
                     if not materialize_revisions
@@ -841,6 +908,9 @@
                 txn_start_time=current_txn_start_time,
                 txn_id=current_txn_id,
             )
+            if not ancestor_id:
+                err_msg = f"Ancestor does not exist: {parent_locator}."
+                raise ValueError(err_msg)
             metafile_root = posixpath.join(
                 metafile_root,
                 ancestor_id,
@@ -886,9 +956,12 @@
         filesystem: pyarrow.fs.FileSystem,
         txn_start_time: Optional[int] = None,
         txn_id: Optional[str] = None,
-    ) -> str:
+    ) -> Optional[str]:
         """
-        Resolves the metafile ID for the given locator.
+        Resolves the immutable metafile ID for the given locator.
+
+        :return: immutable ID read from the mapping file; None if no mapping exists
+        :raises ValueError: if the ID is found but has been deleted
         """
         metafile_id = locator.name.immutable_id
         if not metafile_id:
@@ -906,7 +979,10 @@
                 success_txn_log_dir=success_txn_log_dir,
                 current_txn_start_time=txn_start_time,
                 current_txn_id=txn_id,
+                ignore_missing_revision=True,
             )
+            if not mri.exists():
+                return None
             if mri.txn_op_type == TransactionOperationType.DELETE:
                 err_msg = (
                     f"Locator {locator} to metafile ID resolution failed "
@@ -929,6 +1005,8 @@ def _write_locator_to_id_map_file(
         filesystem: pyarrow.fs.FileSystem,
     ) -> None:
         name_resolution_dir_path = locator.path(parent_obj_path)
+        # TODO(pdames): Don't write updated revisions with the same mapping as
+        # the latest revision.
         mri = MetafileRevisionInfo.new_revision(
             revision_dir_path=name_resolution_dir_path,
             current_txn_op_type=current_txn_op_type,
@@ -991,6 +1069,7 @@
         parent_obj_path = posixpath.join(*[catalog_root] + ancestor_path_elements)
         mutable_src_locator = None
         mutable_dest_locator = None
+        # metafiles without named immutable IDs have mutable name mappings
         if not self.named_immutable_id:
             mutable_src_locator = (
                 current_txn_op.src_metafile.locator
@@ -998,6 +1077,7 @@
                 else None
             )
             mutable_dest_locator = current_txn_op.dest_metafile.locator
+        # metafiles with named immutable IDs may have aliases
         elif self.locator_alias:
             mutable_src_locator = (
                 current_txn_op.src_metafile.locator_alias
@@ -1010,6 +1090,7 @@
         # from the locator back to its immutable metafile ID
         if (
             current_txn_op.type == TransactionOperationType.UPDATE
+            and mutable_src_locator is not None
             and mutable_src_locator != mutable_dest_locator
         ):
             # this update includes a rename
@@ -1072,16 +1153,23 @@
             current_txn_id=current_txn_id,
             filesystem=filesystem,
         )
-        # mark the dest metafile as created
-        self._write_metafile_revision(
-            success_txn_log_dir=success_txn_log_dir,
-            revision_dir_path=metafile_revision_dir_path,
-            current_txn_op=current_txn_op,
-            current_txn_op_type=TransactionOperationType.CREATE,
-            current_txn_start_time=current_txn_start_time,
-            current_txn_id=current_txn_id,
-            filesystem=filesystem,
-        )
+        try:
+            # mark the dest metafile as created
+            self._write_metafile_revision(
+                success_txn_log_dir=success_txn_log_dir,
+                revision_dir_path=metafile_revision_dir_path,
+                current_txn_op=current_txn_op,
+                current_txn_op_type=TransactionOperationType.CREATE,
+                current_txn_start_time=current_txn_start_time,
+                current_txn_id=current_txn_id,
+                filesystem=filesystem,
+            )
+        except ValueError as e:
+            # TODO(pdames): raise/catch a DuplicateMetafileCreate exception.
+ if "already exists" not in str(e): + raise e + # src metafile is being replaced by an existing dest metafile + else: self._write_metafile_revision( success_txn_log_dir=success_txn_log_dir, @@ -1123,7 +1211,7 @@ def _list_metafiles( current_txn_id=current_txn_id, ignore_missing_revision=True, ) - if mri.revision: + if mri.exists(): item = self.read( path=mri.path, filesystem=filesystem, @@ -1162,7 +1250,7 @@ def _ancestor_ids( current_txn_id=current_txn_id, ignore_missing_revision=True, ) - if mri.revision: + if mri.exists(): return mri.revision.ancestor_ids else: raise ValueError(f"Metafile {self.id} does not exist.") diff --git a/deltacat/storage/model/partition.py b/deltacat/storage/model/partition.py index 29627c24..afdcecef 100644 --- a/deltacat/storage/model/partition.py +++ b/deltacat/storage/model/partition.py @@ -258,7 +258,9 @@ def from_serializable( filesystem: Optional[pyarrow.fs.FileSystem] = None, ) -> Partition: self["schema"] = ( - Schema.deserialize(pa.py_buffer(self["schema"])) if self["schema"] else None + Schema.deserialize(pa.py_buffer(self["schema"])) + if self.get("schema") + else None ) # restore the table locator from its mapped immutable metafile ID if self.table_locator and self.table_locator.table_name == self.id: @@ -579,16 +581,20 @@ def parts(self) -> List[str]: class PartitionLocatorAlias(Locator, dict): @staticmethod def of(parent_partition: Partition): - return PartitionLocatorAlias( - { - "partition_values": parent_partition.partition_values, - "partition_scheme_id": parent_partition.partition_scheme_id, - "parent": ( - parent_partition.locator.parent - if parent_partition.locator - else None - ), - } + return ( + PartitionLocatorAlias( + { + "partition_values": parent_partition.partition_values, + "partition_scheme_id": parent_partition.partition_scheme_id, + "parent": ( + parent_partition.locator.parent + if parent_partition.locator + else None + ), + } + ) + if parent_partition.state == CommitState.COMMITTED + else None # only committed partitions can be resolved by alias ) @property diff --git a/deltacat/storage/model/stream.py b/deltacat/storage/model/stream.py index 2208a191..77ce7ec9 100644 --- a/deltacat/storage/model/stream.py +++ b/deltacat/storage/model/stream.py @@ -373,13 +373,17 @@ class StreamLocatorAlias(Locator, dict): def of( parent_stream: Stream, ) -> StreamLocatorAlias: - return StreamLocatorAlias( - { - "format": parent_stream.stream_format, - "parent": ( - parent_stream.locator.parent if parent_stream.locator else None - ), - } + return ( + StreamLocatorAlias( + { + "format": parent_stream.stream_format, + "parent": ( + parent_stream.locator.parent if parent_stream.locator else None + ), + } + ) + if parent_stream.state == CommitState.COMMITTED + else None # only committed streams can be resolved by alias ) @property diff --git a/deltacat/storage/model/table.py b/deltacat/storage/model/table.py index 37e0b225..27168aa9 100644 --- a/deltacat/storage/model/table.py +++ b/deltacat/storage/model/table.py @@ -29,12 +29,16 @@ def of( locator: Optional[TableLocator], description: Optional[str] = None, properties: Optional[TableProperties] = None, + latest_active_table_version: Optional[str] = None, + latest_table_version: Optional[str] = None, native_object: Optional[Any] = None, ) -> Table: table = Table() table.locator = locator table.description = description table.properties = properties + table.latest_active_table_version = latest_active_table_version + table.latest_table_version = latest_table_version table.native_object = 
diff --git a/deltacat/storage/model/table.py b/deltacat/storage/model/table.py
index 37e0b225..27168aa9 100644
--- a/deltacat/storage/model/table.py
+++ b/deltacat/storage/model/table.py
@@ -29,12 +29,16 @@ def of(
         locator: Optional[TableLocator],
         description: Optional[str] = None,
         properties: Optional[TableProperties] = None,
+        latest_active_table_version: Optional[str] = None,
+        latest_table_version: Optional[str] = None,
         native_object: Optional[Any] = None,
     ) -> Table:
         table = Table()
         table.locator = locator
         table.description = description
         table.properties = properties
+        table.latest_active_table_version = latest_active_table_version
+        table.latest_table_version = latest_table_version
         table.native_object = native_object
         return table
 
@@ -65,6 +69,28 @@ def properties(self) -> Optional[TableProperties]:
     def properties(self, properties: Optional[TableProperties]) -> None:
         self["properties"] = properties
 
+    @property
+    def latest_active_table_version(self) -> Optional[str]:
+        return self.get("latest_active_table_version")
+
+    @latest_active_table_version.setter
+    def latest_active_table_version(
+        self,
+        latest_active_table_version: Optional[str],
+    ) -> None:
+        self["latest_active_table_version"] = latest_active_table_version
+
+    @property
+    def latest_table_version(self) -> Optional[str]:
+        return self.get("latest_table_version")
+
+    @latest_table_version.setter
+    def latest_table_version(
+        self,
+        latest_table_version: Optional[str],
+    ) -> None:
+        self["latest_table_version"] = latest_table_version
+
     @property
     def native_object(self) -> Optional[Any]:
         return self.get("nativeObject")
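The two new fields give Table a denormalized record of its newest and newest-active versions, so readers can resolve a default version without listing every version. A small sketch of how the pointers relate (constructor arguments beyond those shown above are omitted; the values are illustrative):

from deltacat.storage import Table, TableLocator

locator = TableLocator.at(namespace="ns", table_name="t")
table = Table.of(
    locator=locator,
    latest_table_version="v3",  # newest version, active or not
    latest_active_table_version="v2",  # newest active version
)
assert table.latest_table_version == "v3"
assert table.latest_active_table_version == "v2"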
diff --git a/deltacat/storage/model/table_version.py b/deltacat/storage/model/table_version.py
index 1f8901a3..5b1d02af 100644
--- a/deltacat/storage/model/table_version.py
+++ b/deltacat/storage/model/table_version.py
@@ -1,7 +1,9 @@
 # Allow classes to use self-referencing Type hints in Python 3.7.
 from __future__ import annotations
 
+import re
 import posixpath
+import uuid
 from typing import Any, Dict, List, Optional
 
 import pyarrow
@@ -43,6 +45,7 @@ def of(
         schemas: Optional[SchemaList] = None,
         partition_schemes: Optional[partition.PartitionSchemeList] = None,
         sort_schemes: Optional[SortSchemeList] = None,
+        previous_table_version: Optional[str] = None,
         native_object: Optional[Any] = None,
     ) -> TableVersion:
         table_version = TableVersion()
@@ -58,6 +61,7 @@
         table_version.schemas = schemas
         table_version.partition_schemes = partition_schemes
         table_version.sort_schemes = sort_schemes
+        table_version.previous_table_version = previous_table_version
         table_version.native_object = native_object
         return table_version
 
@@ -167,6 +171,14 @@ def description(self) -> Optional[str]:
     def description(self, description: Optional[str]) -> None:
         self["description"] = description
 
+    @property
+    def previous_table_version(self) -> Optional[str]:
+        return self.get("previous_table_version")
+
+    @previous_table_version.setter
+    def previous_table_version(self, previous_table_version: Optional[str]) -> None:
+        self["previous_table_version"] = previous_table_version
+
     @property
     def properties(self) -> Optional[TableVersionProperties]:
         return self.get("properties")
@@ -263,16 +275,19 @@ def from_serializable(
         filesystem: Optional[pyarrow.fs.FileSystem] = None,
     ) -> TableVersion:
         self["schema"] = (
-            Schema.deserialize(pa.py_buffer(self["schema"])) if self["schema"] else None
+            Schema.deserialize(pa.py_buffer(self["schema"]))
+            if self.get("schema")
+            else None
         )
         self.schemas = (
             [Schema.deserialize(pa.py_buffer(_)) for _ in self["schemas"]]
-            if self["schemas"]
+            if self.get("schemas")
             else None
         )
-        # force list-to-tuple conversion of sort keys via property invocation
-        self.sort_scheme.keys
-        [sort_scheme.keys for sort_scheme in self.sort_schemes]
+        if self.sort_scheme:
+            # force list-to-tuple conversion of sort keys via property invocation
+            self.sort_scheme.keys
+            [sort_scheme.keys for sort_scheme in self.sort_schemes]
         # restore the table locator from its mapped immutable metafile ID
         if self.table_locator and self.table_locator.table_name == self.id:
             parent_rev_dir_path = Metafile._parent_metafile_rev_dir_path(
@@ -298,6 +313,23 @@
             self.locator.table_locator = table.locator
         return self
 
+    @staticmethod
+    def next_version(previous_version: Optional[str] = None) -> str:
+        """
+        Assigns the next table version string given the previous table version.
+        Attempts to use the convention of 1-based incrementing integers of
+        the form "v1", "v2", etc. or of the form "1", "2", etc.
+        If the previous table version is not of this form, then assigns a UUID
+        to the next table version.
+        """
+        if previous_version is not None:
+            version_match = re.match(r"^(v?)(\d+)$", previous_version)
+            if version_match:
+                prefix, version_number = version_match.groups()
+                new_version_number = int(version_number) + 1
+                return f"{prefix}{new_version_number}"
+        return str(uuid.uuid4())
+
 
 class TableVersionLocatorName(LocatorName):
     def __init__(self, locator: TableVersionLocator):
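TableVersion.next_version preserves whichever integer convention the previous version already uses, and only falls back to a UUID when the previous version does not parse; for example:

from deltacat.storage.model.table_version import TableVersion

assert TableVersion.next_version("v1") == "v2"  # "v"-prefixed convention
assert TableVersion.next_version("41") == "42"  # bare-integer convention
TableVersion.next_version("version1")  # unparseable -> fresh UUID string
TableVersion.next_version()  # no previous version -> also a UUID string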
diff --git a/deltacat/storage/model/transaction.py b/deltacat/storage/model/transaction.py
index 83521dd8..4a03b0b9 100644
--- a/deltacat/storage/model/transaction.py
+++ b/deltacat/storage/model/transaction.py
@@ -81,7 +81,9 @@ class TransactionSystemTimeProvider(TransactionTimeProvider):
     def start_time(self) -> int:
         """
-        Gets the current system time in nanoseconds since the epoch.
+        Gets the current system time in nanoseconds since the epoch. Ensures
+        that the start time returned is greater than the last known end time
+        recorded at the time this method is invoked.
 
         :return: Current epoch time in nanoseconds.
         """
         # ensure serial transactions in a single process have start times after
@@ -112,7 +114,9 @@ def start_time(self) -> int:
     def end_time(self) -> int:
         """
-        Gets the current system time in nanoseconds since the epoch.
+        Gets the current system time in nanoseconds since the epoch. Ensures
+        that the end time returned is no less than the last known start time
+        recorded at the time this method is invoked.
 
         :return: Current epoch time in nanoseconds.
         """
         # ensure serial transactions in a single process have end times no less
@@ -444,6 +448,17 @@ def to_serializable(self) -> Transaction:
         # reduce file size (they can be reconstructed from their corresponding
         # files as required).
         for operation in serializable.operations:
+            # sanity check that IDs exist on source and dest metafiles
+            if operation.dest_metafile and operation.dest_metafile.id is None:
+                raise ValueError(
+                    f"Transaction operation {operation} dest metafile does "
+                    f"not have an ID: {operation.dest_metafile}"
+                )
+            if operation.src_metafile and operation.src_metafile.id is None:
+                raise ValueError(
+                    f"Transaction operation {operation} src metafile does "
+                    f"not have an ID: {operation.src_metafile}"
+                )
             operation.dest_metafile = {
                 "id": operation.dest_metafile.id,
                 "locator": operation.dest_metafile.locator,
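The strengthened TransactionSystemTimeProvider docstrings above describe a per-process ordering guarantee: a transaction's start time lands strictly after the last recorded end time, and its end time is never less than the last recorded start time. A rough single-threaded sketch of that contract (an illustration only, not the provider's actual implementation):

import time

class MonotonicTimePairSketch:
    """Illustrates the start/end ordering contract described above."""

    def __init__(self) -> None:
        self._last_start = 0
        self._last_end = 0

    def start_time(self) -> int:
        # spin until the clock passes the last recorded end time
        now = time.time_ns()
        while now <= self._last_end:
            now = time.time_ns()
        self._last_start = now
        return now

    def end_time(self) -> int:
        # never report an end time earlier than the last recorded start time
        now = max(time.time_ns(), self._last_start)
        self._last_end = now
        return now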
diff --git a/deltacat/tests/storage/main/test_main_storage.py b/deltacat/tests/storage/main/test_main_storage.py
index ee472185..9d2ba04f 100644
--- a/deltacat/tests/storage/main/test_main_storage.py
+++ b/deltacat/tests/storage/main/test_main_storage.py
@@ -1,41 +1,364 @@
+import shutil
+import tempfile
+
+import pytest
+
+from deltacat import Schema, Field
 from deltacat.storage import (
     metastore,
     Namespace,
     NamespaceLocator,
+    Table,
+    TableVersion,
+    TableVersionLocator,
+    StreamFormat,
 )
+from deltacat.catalog.main.impl import PropertyCatalog
+import pyarrow as pa
 
 
-class TestMainStorage:
-    def test_list_namespaces(self, temp_catalog):
-        # given a namespace to create
-        namespace_name = "test_namespace"
+@pytest.fixture
+def schema():
+    return Schema.of(
+        [
+            Field.of(
+                field=pa.field("some_string", pa.string(), nullable=False),
+                field_id=1,
+                is_merge_key=True,
+            ),
+            Field.of(
+                field=pa.field("some_int32", pa.int32(), nullable=False),
+                field_id=2,
+                is_merge_key=True,
+            ),
+            Field.of(
+                field=pa.field("some_float64", pa.float64()),
+                field_id=3,
+                is_merge_key=False,
+            ),
+        ]
+    )
 
-        # when the namespace is created
-        created_namespace = metastore.create_namespace(
-            namespace=namespace_name,
-            catalog=temp_catalog,
+
+class TestNamespace:
+    @classmethod
+    def setup_class(cls):
+        cls.tmpdir = tempfile.mkdtemp()
+        cls.catalog = PropertyCatalog(cls.tmpdir)
+        cls.namespace1 = metastore.create_namespace(
+            namespace="namespace1",
+            catalog=cls.catalog,
+        )
+        cls.namespace2 = metastore.create_namespace(
+            namespace="namespace2",
+            catalog=cls.catalog,
         )
 
+    @classmethod
+    def teardown_class(cls):
+        shutil.rmtree(cls.tmpdir)
+
+    def test_list_namespaces(self):
         # expect the namespace returned to match the input namespace to create
-        namespace_locator = NamespaceLocator.of(namespace=namespace_name)
+        namespace_locator = NamespaceLocator.of(namespace="namespace1")
         expected_namespace = Namespace.of(locator=namespace_locator)
-        assert expected_namespace.equivalent_to(created_namespace)
+        assert expected_namespace.equivalent_to(self.namespace1)
 
         # expect the namespace to exist
         assert metastore.namespace_exists(
-            namespace=namespace_name,
-            catalog=temp_catalog,
+            namespace="namespace1",
+            catalog=self.catalog,
         )
 
         # expect the namespace to also be returned when listing namespaces
-        list_result = metastore.list_namespaces(catalog=temp_catalog)
-        namespaces = list_result.all_items()
-        assert len(namespaces) == 1
-        assert namespaces[0].equivalent_to(expected_namespace)
+        list_result = metastore.list_namespaces(catalog=self.catalog)
+        namespaces_by_name = {n.locator.namespace: n for n in list_result.all_items()}
+        assert len(namespaces_by_name) == 2
+        assert namespaces_by_name["namespace1"].equivalent_to(self.namespace1)
+        assert namespaces_by_name["namespace2"].equivalent_to(self.namespace2)
 
+    def test_get_namespace(self):
         # expect the namespace to also be returned when explicitly retrieved
         read_namespace = metastore.get_namespace(
-            namespace=namespace_name,
-            catalog=temp_catalog,
+            namespace="namespace1",
+            catalog=self.catalog,
+        )
+        assert read_namespace and read_namespace.equivalent_to(self.namespace1)
+
+    def test_namespace_exists_existing(self):
+        assert metastore.namespace_exists(
+            "namespace1",
+            catalog=self.catalog,
+        )
+
+    def test_namespace_exists_nonexisting(self):
+        assert not metastore.namespace_exists(
+            "foobar",
+            catalog=self.catalog,
+        )
+
+
+class TestTable:
+    @classmethod
+    def setup_class(cls):
+        cls.tmpdir = tempfile.mkdtemp()
+        cls.catalog = PropertyCatalog(cls.tmpdir)
+        # Create a namespace to hold our tables
+        cls.namespace_obj = metastore.create_namespace(
+            namespace="test_table_ns",
+            catalog=cls.catalog,
+        )
+        # Create two tables
+        cls.stream1 = metastore.create_table_version(
+            namespace="test_table_ns",
+            table_name="table1",
+            table_version="v1",
+            catalog=cls.catalog,
+        )
+        cls.stream2 = metastore.create_table_version(
+            namespace="test_table_ns",
+            table_name="table2",
+            table_version="v1",
+            catalog=cls.catalog,
+        )
+
+    @classmethod
+    def teardown_class(cls):
+        shutil.rmtree(cls.tmpdir)
+
+    def test_list_tables(self):
+        # list the tables under our namespace
+        list_result = metastore.list_tables(
+            "test_table_ns",
+            catalog=self.catalog,
+        )
+        all_tables = list_result.all_items()
+
+        # we expect 2 distinct tables
+        table_names = {t.table_name for t in all_tables if isinstance(t, Table)}
+        assert "table1" in table_names
+        assert "table2" in table_names
+
+    def test_get_table(self):
+        # test we can retrieve table1 by name
+        tbl = metastore.get_table(
+            "test_table_ns",
+            "table1",
+            catalog=self.catalog,
+        )
+        assert tbl is not None
+        # TODO(pdames): replace with tbl.equivalent_to(expected)
+        assert tbl.table_name == "table1"
+
+    def test_table_exists_existing(self):
+        # table1 should exist
+        assert metastore.table_exists(
+            "test_table_ns",
+            "table1",
+            catalog=self.catalog,
+        )
+
+    def test_table_exists_nonexisting(self):
+        assert not metastore.table_exists(
+            "test_table_ns",
+            "no_such_table",
+            catalog=self.catalog,
+        )
+
+
+class TestTableVersion:
+    @classmethod
+    def setup_class(cls):
+        cls.tmpdir = tempfile.mkdtemp()
+        cls.catalog = PropertyCatalog(cls.tmpdir)
+        # Create a namespace and single table
+        cls.namespace_obj = metastore.create_namespace(
+            namespace="test_tv_ns",
+            catalog=cls.catalog,
+        )
+        # Create a "base" table to attach versions to
+        metastore.create_table_version(
+            namespace="test_tv_ns",
+            table_name="mytable",
+            table_version="v1",
+            catalog=cls.catalog,
+        )
+        # Now create an additional version
+        cls.stream2 = metastore.create_table_version(
+            namespace="test_tv_ns",
+            table_name="mytable",
+            table_version="v2",
+            catalog=cls.catalog,
+        )
+
+    @classmethod
+    def teardown_class(cls):
+        shutil.rmtree(cls.tmpdir)
+
+    def test_list_table_versions(self):
+        list_result = metastore.list_table_versions(
+            "test_tv_ns",
+            "mytable",
+            catalog=self.catalog,
+        )
+        tvs = list_result.all_items()
+        # we expect v1 and v2
+        version_ids = []
+        for tv in tvs:
+            if isinstance(tv, TableVersion):
+                version_ids.append(tv.table_version)
+        assert set(version_ids) == {"v1", "v2"}
+
+    def test_get_table_version(self):
+        tv = metastore.get_table_version(
+            "test_tv_ns",
+            "mytable",
+            "v1",
+            catalog=self.catalog,
+        )
+        assert tv is not None
+        assert tv.table_version == "v1"
+
+    def test_table_version_exists(self):
+        # v2 should exist
+        assert metastore.table_version_exists(
+            "test_tv_ns",
+            "mytable",
+            "v2",
+            catalog=self.catalog,
+        )
+
+    def test_table_version_exists_nonexisting(self):
+        # "v999" should not exist
+        assert not metastore.table_version_exists(
+            "test_tv_ns",
+            "mytable",
+            "v999",
+            catalog=self.catalog,
+        )
+
+    def test_creation_fails_if_already_exists(self):
+        # Assert that creating the same table version again raises a ValueError
+        with pytest.raises(ValueError):
+            metastore.create_table_version(
+                namespace="test_tv_ns",
+                table_name="mytable",
+                table_version="v1",
+                catalog=self.catalog,
+            )
+
+
+class TestStream:
+    @classmethod
+    def setup_class(cls):
+        cls.tmpdir = tempfile.mkdtemp()
+        cls.catalog = PropertyCatalog(cls.tmpdir)
+        metastore.create_namespace(
+            "test_stream_ns",
+            catalog=cls.catalog,
+        )
+        # Create a table version.
+        # This call should automatically create a default DeltaCAT stream.
+        metastore.create_table_version(
+            namespace="test_stream_ns",
+            table_name="mystreamtable",
+            table_version="v1",
+            catalog=cls.catalog,
+        )
+        # Retrieve the auto-created default stream.
+        cls.default_stream = metastore.get_stream(
+            namespace="test_stream_ns",
+            table_name="mystreamtable",
+            table_version="v1",
+            catalog=cls.catalog,
+        )
+        # Ensure that the default stream was auto-created.
+        assert cls.default_stream is not None, "Default stream not found."
+
+    @classmethod
+    def teardown_class(cls):
+        shutil.rmtree(cls.tmpdir)
+
+    def test_list_streams(self):
+        list_result = metastore.list_streams(
+            "test_stream_ns", "mystreamtable", "v1", catalog=self.catalog
+        )
+        streams = list_result.all_items()
+        # We expect exactly one stream (the default "deltacat" stream).
+        assert len(streams) == 1
+
+    def test_get_stream(self):
+        stream = metastore.get_stream(
+            namespace="test_stream_ns",
+            table_name="mystreamtable",
+            table_version="v1",
+            catalog=self.catalog,
+        )
+        assert stream is not None
+        # The stream's format should be the default "deltacat" format.
+        assert stream.stream_format.lower() == StreamFormat.DELTACAT.value.lower()
+
+    def test_list_stream_partitions_empty(self):
+        # no partitions yet
+        list_result = metastore.list_stream_partitions(
+            self.default_stream,
+            catalog=self.catalog,
+        )
+        partitions = list_result.all_items()
+        assert len(partitions) == 0
+
+    def test_delete_stream(self):
+        # We can delete the stream
+        metastore.delete_stream(
+            "test_stream_ns",
+            "mystreamtable",
+            "v1",
+            catalog=self.catalog,
+        )
+        # Now get_stream should return None
+        stream = metastore.get_stream(
+            "test_stream_ns",
+            "mystreamtable",
+            "v1",
+            catalog=self.catalog,
+        )
+        assert stream is None
+
+    def test_stage_and_commit_stream_replacement(self):
+        # Stage & commit a stream
+        stream = metastore.stage_stream(
+            namespace="test_stream_ns",
+            table_name="mystreamtable",
+            table_version="v1",
+            catalog=self.catalog,
+        )
+        fetched_stream = metastore.get_staged_stream(
+            table_version_locator=TableVersionLocator.at(
+                namespace="test_stream_ns",
+                table_name="mystreamtable",
+                table_version="v1",
+            ),
+            stream_id=stream.stream_id,
+            catalog=self.catalog,
+        )
+        assert fetched_stream.equivalent_to(stream)
+        committed_stream = metastore.commit_stream(
+            stream=stream,
+            catalog=self.catalog,
+        )
+        fetched_stream = metastore.get_stream(
+            namespace="test_stream_ns",
+            table_name="mystreamtable",
+            table_version="v1",
+            catalog=self.catalog,
+        )
+        assert fetched_stream.equivalent_to(committed_stream)
+        list_result = metastore.list_streams(
+            "test_stream_ns",
+            "mystreamtable",
+            "v1",
+            catalog=self.catalog,
         )
-        assert read_namespace and read_namespace.equivalent_to(expected_namespace)
+        streams = list_result.all_items()
+        # This will list both the staged stream and the committed stream
+        assert len(streams) == 2
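The replacement test above exercises the full stage-then-commit lifecycle; outside of tests the same flow looks roughly like this (a sketch against the metastore API used above, with a caller-supplied catalog object):

from deltacat.storage import metastore, TableVersionLocator

def replace_stream(catalog):
    # stage a new stream for the table version (not yet visible to readers)
    staged = metastore.stage_stream(
        namespace="test_stream_ns",
        table_name="mystreamtable",
        table_version="v1",
        catalog=catalog,
    )
    # staged streams are only reachable by ID until committed
    assert metastore.get_staged_stream(
        table_version_locator=TableVersionLocator.at(
            namespace="test_stream_ns",
            table_name="mystreamtable",
            table_version="v1",
        ),
        stream_id=staged.stream_id,
        catalog=catalog,
    )
    # committing atomically replaces the previously committed stream
    return metastore.commit_stream(stream=staged, catalog=catalog)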
diff --git a/deltacat/tests/storage/model/test_metafile_io.py b/deltacat/tests/storage/model/test_metafile_io.py
index 751a9cdc..451ecb41 100644
--- a/deltacat/tests/storage/model/test_metafile_io.py
+++ b/deltacat/tests/storage/model/test_metafile_io.py
@@ -2723,3 +2723,7 @@ def test_python_type_serde(self, temp_dir):
         # expect the table created to otherwise match the table given
         table.properties = expected_properties
         assert table.equivalent_to(deserialized_table)
+
+    def test_metafile_read_bad_path(self, temp_dir):
+        with pytest.raises(FileNotFoundError):
+            Delta.read("foobar")
diff --git a/deltacat/tests/storage/model/test_table_version.py b/deltacat/tests/storage/model/test_table_version.py
new file mode 100644
index 00000000..cdec505a
--- /dev/null
+++ b/deltacat/tests/storage/model/test_table_version.py
@@ -0,0 +1,26 @@
+import pytest
+import uuid
+from deltacat.storage.model.table_version import TableVersion
+
+
+@pytest.mark.parametrize(
+    "previous_version, expected_new_version",
+    [
+        (None, None),
+        ("v1", "v2"),
+        ("1", "2"),
+        (
+            "version1",
+            None,
+        ),
+        ("v999", "v1000"),
+    ],
+)
+def test_new_version(previous_version, expected_new_version):
+    new_version = TableVersion.next_version(previous_version)
+    assert isinstance(new_version, str)
+    if expected_new_version:
+        assert new_version == expected_new_version
+    else:
+        # ensure that the new version is a valid UUID
+        uuid.UUID(new_version)
diff --git a/setup.py b/setup.py
index 226e7853..c0077f81 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ def find_version(*paths):
     name="deltacat",
     version=find_version("deltacat", "__init__.py"),
     author="Ray Team",
-    description="A scalable, fast, ACID-compliant Data Catalog powered by Ray.",
+    description="A portable, scalable, fast, and Pythonic Data Lakehouse for AI.",
     long_description=long_description,
     long_description_content_type="text/markdown",
     url="https://github.com/ray-project/deltacat",
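Finally, note that Metafile.read now dispatches on the locator key embedded in the serialized payload via Metafile.get_class, so a caller no longer needs to know the concrete metafile type up front; a rough sketch of the behavior:

from deltacat.storage.model.metafile import Metafile

# get_class inspects the serialized dict's locator key to pick the child type
assert Metafile.get_class({"namespaceLocator": {}}).__name__ == "Namespace"
assert Metafile.get_class({"deltaLocator": {}}).__name__ == "Delta"

# Metafile.read(<revision file path>) therefore returns a Table, Stream,
# Delta, etc. as appropriate, and raises FileNotFoundError for a bad path
# (see test_metafile_read_bad_path above).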