feat: integrate deltacat namespace/table/table_version/stream/partition into rivulet #478

Merged (18 commits) on Feb 4, 2025


anshumankomawar
Collaborator

@anshumankomawar anshumankomawar commented Jan 30, 2025

Summary

Integrated additional DeltaCAT metadata into Rivulet to better align with DeltaCAT's metastore model.

Rationale

This update ensures compatibility with DeltaCAT's metadata storage, allowing Rivulet to leverage DeltaCAT transactions, manifests, and partitions.

Changes

  • Implemented DeltaCAT-compatible metadata using transactions:
    • Introduced temporary constants for:
      • NAMESPACE = "namespace"
      • TABLE_NAME = dataset_name
      • TABLE_VERSION = "table_version"
      • STREAM_ID = "stream"
      • PARTITION_ID = "partition"
    • Ensured deltas are written under a sentinel stream and partition.
.riv-meta-contacts
├── {namespace_id}
│   ├── {dataset_id}
│   │   ├── rev
│   │   │   └── {revision_id}.mpk
│   │   └── v1
│   │       ├── {manifest_id}
│   │       │   └── {manifest_file}.stream
│   │       ├── rev
│   │       │   └── {revision_id}.mpk
│   │       └── stream
│   │           ├── {partition_id}
│   │           │   ├── {delta_1}
│   │           │   │   └── rev
│   │           │   │       └── {revision_id}.mpk
│   │           │   ├── {delta_2}
│   │           │   │   └── rev
│   │           │   │       └── {revision_id}.mpk
│   │           │   └── rev
│   │           │       └── {revision_id}.mpk
│   │           ├── data
│   │           │   └── *.parquet
│   │           ├── metadata
│   │           │   └── ssts
│   │           │       └── 0
│   │           │           ├── {sst_timestamp_1}.json
│   │           │           ├── {sst_timestamp_2}.json
│   │           │           └── {sst_timestamp_N}.json
│   │           ├── partition
│   │           │   ├── {timestamp_1}
│   │           │   │   └── rev
│   │           │   │       └── {revision_id}.mpk
│   │           │   ├── {timestamp_2}
│   │           │   │   └── rev
│   │           │   │       └── {revision_id}.mpk
│   │           │   └── rev
│   │           │       └── {revision_id}.mpk
│   │           └── rev
│   │               └── {revision_id}.mpk
│   ├── {hash_id}
│   │   └── {dataset_id_reference}
│   └── rev
│       └── {revision_id}.mpk
├── {transaction_id}
│   └── {dataset_reference}
└── txn
    ├── failed
    ├── running
    └── success
        ├── {success_id_1}
        │   └── {delta_commit_timestamp_1}
        ├── {success_id_2}
        │   └── {delta_commit_timestamp_2}
        └── {success_id_N}
            └── {delta_commit_timestamp_N}
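
As a rough illustration of the nesting described above, the sketch below joins the temporary constants into a namespace/table/table_version/stream/partition path. The `partition_path` function, the `root` argument, and the "contacts" dataset name are hypothetical, and the real on-disk layout uses generated ids (as in the tree above) rather than these literal names.

```python
import posixpath

# Temporary placeholder values, mirroring the constants listed in this PR.
NAMESPACE = "namespace"
TABLE_VERSION = "table_version"
STREAM_ID = "stream"
PARTITION_ID = "partition"


def partition_path(root: str, dataset_name: str) -> str:
    """Join the logical hierarchy levels into one path (hypothetical helper)."""
    table_name = dataset_name  # TABLE_NAME is simply the dataset name
    return posixpath.join(
        root, NAMESPACE, table_name, TABLE_VERSION, STREAM_ID, PARTITION_ID
    )


print(partition_path(".riv-meta-contacts", "contacts"))
```

This only shows the logical order of the five levels; it is not how DeltaCAT resolves metafile locations.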

Impact

  • Makes it easier to integrate with the DeltaCAT storage and catalog interfaces.

Testing

  • Manually verified metadata structure conforms to DeltaCAT expectations.
  • make test

Future Work

  • Dynamic support for table names, versions, and partitions (currently hardcoded).
  • Refine the partitioning strategy: evaluate whether Rivulet needs, or can support, partitioning.
  • Further integrate the DeltaCAT storage API once it is available, replacing temporary workarounds.

Checklist

  • Unit tests covering the changes have been added
    • If this is a bugfix, regression tests have been added
  • E2E testing has been performed

@@ -32,6 +51,15 @@
ALL = "all"
DEFAULT = "default"

# Hardcoded deltacat catalog values
Collaborator Author

Need to discuss this with the team: what the default values should be, and whether we should be hardcoding them at all.

Collaborator

We will have to put these in the storage interface. Can you at least put these in storage/main/ and add some TODOs explaining how to reconcile the defaulting with the main storage interface?
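
One way the temporary defaults could be grouped until the storage interface owns them is a small immutable value object. This is only a sketch: `CatalogDefaults` and its field names are hypothetical and not part of DeltaCAT or Rivulet.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class CatalogDefaults:
    """Hypothetical holder for the hardcoded catalog values."""

    # TODO: reconcile these defaults with the main storage interface.
    namespace: str = "namespace"
    table_version: str = "table_version"
    stream_id: str = "stream"
    partition_id: str = "partition"


defaults = CatalogDefaults()
# Callers can override a single value without mutating the shared defaults.
custom = replace(defaults, namespace="analytics")
```

Keeping the object frozen means the defaults cannot drift at runtime, which makes them easier to swap out later.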

self.sst_reader = sst_reader or JsonSstReader()
self.locator = locator

def _find_first_child_with_rev(self, parent_path: str, filesystem: pyarrow.fs.FileSystem) -> str:
Collaborator Author

This is definitely not scalable and needs to be replaced by the storage interface. I'll try to update it with the helper methods that Patrick has.
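
For context, a helper of this kind presumably scans a directory for the first child that contains a `rev` subdirectory. The sketch below shows that idea with the standard library's `os` module instead of `pyarrow.fs`, so its signature differs from the PR's `_find_first_child_with_rev` and it is not the PR's implementation. It also illustrates why the approach scales poorly: every lookup lists the whole parent directory.

```python
import os
import tempfile
from typing import Optional


def find_first_child_with_rev(parent_path: str) -> Optional[str]:
    """Return the first child directory (by name) that has a 'rev' subdirectory."""
    for name in sorted(os.listdir(parent_path)):
        child = os.path.join(parent_path, name)
        if os.path.isdir(os.path.join(child, "rev")):
            return child
    return None


# Build a tiny throwaway tree to exercise the helper.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "abc123", "rev"))
os.makedirs(os.path.join(root, "zzz-no-rev"))
found = find_first_child_with_rev(root)
```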

@anshumankomawar anshumankomawar force-pushed the dev/anshumankomawar branch 2 times, most recently from c07a2ec to bdbb15b on January 31, 2025 22:21
# Take lexicographical max to find the latest revision
latest_revision = max(revisions, key=lambda f: f.path)

return RivuletDelta.of(Delta.read(latest_revision.path)) if latest_revision else None

def generate_manifests(self) -> Generator[ManifestAccessor, None, None]:
Collaborator Author

This will only work with the current fixed directory structure. All of this should be removed in favor of storage interface helper methods.
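
The lexicographical max in the snippet above only matches the newest revision when file names sort in creation order (for example, zero-padded numbers or fixed-width timestamps). Note also that `max` raises `ValueError` on an empty sequence unless a `default` is given. A minimal sketch of a guarded version, using plain path strings in place of `pyarrow.fs.FileInfo` objects; `latest_revision` and the example file names are hypothetical:

```python
from typing import Optional, Sequence


def latest_revision(paths: Sequence[str]) -> Optional[str]:
    """Return the lexicographically greatest path, or None if there are none."""
    # default=None avoids the ValueError that max() raises on an empty sequence.
    return max(paths, default=None)


revisions = [
    "rev/00000000000000000001.mpk",
    "rev/00000000000000000010.mpk",
    "rev/00000000000000000002.mpk",
]
print(latest_revision(revisions))  # rev/00000000000000000010.mpk

# Without zero padding, string order and numeric order disagree.
print(latest_revision(["rev/9.mpk", "rev/10.mpk"]))  # rev/9.mpk
```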

@@ -1,137 +0,0 @@
import shutil
Collaborator Author

@anshumankomawar anshumankomawar Jan 31, 2025

We only use FileStore through the FileProvider and the tests included in FileProvider cover most use cases. FileStore will also be removed once the storage interface is added.

@@ -0,0 +1,68 @@
import posixpath
Collaborator Author

These can be removed once the storage interface is added.

Collaborator

Can you give this file a more descriptive name?

@anshumankomawar anshumankomawar marked this pull request as ready for review January 31, 2025 22:50

# TODO: Rely on deltacat storage interface to determine current location.
# Consider a wrapper to hold active values.
self._locator = DeltaLocator.at(
Collaborator

Hmm...it's not clear to me why we need to instantiate a delta locator on dataset creation. Isn't the point of a locator to be a reference to a specific metafile, i.e. a specific delta? What is the purpose of storing one at the dataset class level?

Collaborator Author

You're right. I needed an object to pass around the deltacat values. We could make a custom Rivulet class, but I wanted to keep it simple for now. I'll change this to a PartitionLocator (it should hold any values we need).

self._metadata_path, self._locator, self._file_store
)

self._metastore = DatasetMetastore(
Collaborator

Note: over the course of the deltacat/rivulet metadata integration epic we should be removing dataset metastore, file provider, and other internal classes which will become irrelevant.

We can't do that yet, but I'm just making sure you understand the end goal.

except Exception as e:
# TODO: Have deltacat storage interface handle transaction errors.
error_message = str(e).lower()
if "already exists" in error_message:
Collaborator

Note: in the future the storage interface should have a graceful way to only create e.g. a namespace if it doesn't already exist
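
Until such an API exists, one alternative to matching on the error message text is to catch a dedicated exception type. The sketch below uses an in-memory stand-in; `AlreadyExistsError`, `FakeCatalog`, and `create_namespace_if_absent` are all hypothetical names, not DeltaCAT APIs.

```python
class AlreadyExistsError(Exception):
    """Hypothetical error raised when a metafile already exists."""


class FakeCatalog:
    """In-memory stand-in for a catalog; not a DeltaCAT class."""

    def __init__(self) -> None:
        self.namespaces = set()

    def create_namespace(self, name: str) -> None:
        if name in self.namespaces:
            raise AlreadyExistsError(f"namespace '{name}' already exists")
        self.namespaces.add(name)


def create_namespace_if_absent(catalog: FakeCatalog, name: str) -> bool:
    """Create the namespace; return False if it was already there."""
    try:
        catalog.create_namespace(name)
        return True
    except AlreadyExistsError:
        # Catching a specific type avoids fragile string matching.
        return False


catalog = FakeCatalog()
first = create_namespace_if_absent(catalog, "namespace")
second = create_namespace_if_absent(catalog, "namespace")
```

The second call is a no-op rather than an error, which is the "create only if it doesn't already exist" behavior described above.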

if schema:
self.add_schema(schema, schema_name=schema_name)

def _create_metadata_directories(self) -> None:
Collaborator

Just for the sake of unit testing, it may be a good idea to have this return the directories that get created (i.e. pass through output of tx commit)
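
A minimal sketch of that suggestion, using plain directories in place of a DeltaCAT transaction commit: the function returns every path it creates so a test can assert on them directly. The function name, its arguments, and the level names are illustrative assumptions.

```python
import os
import tempfile
from typing import List


def create_metadata_directories(root: str, levels: List[str]) -> List[str]:
    """Create one nested directory per level and return the created paths."""
    created = []
    current = root
    for level in levels:
        current = os.path.join(current, level)
        os.makedirs(current, exist_ok=True)
        created.append(current)
    return created


root = tempfile.mkdtemp()
paths = create_metadata_directories(
    root, ["namespace", "table", "table_version", "stream", "partition"]
)
```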

@@ -40,7 +43,8 @@ def provide_data_file(self, extension: str) -> OutputFile:
param: extension: File extension (e.g., "parquet").
returns: OutputFile instance pointing to the created data file.
"""
- uri = f"{self.uri}/data/{int(time.time_ns())}.{extension}"
+ partition_path = _find_partition_path(self.uri, self._locator)
+ uri = f"{partition_path}/data/{int(time.time_ns())}.{extension}"
Collaborator

Please use posixpath library or filesystem utils to do path joining

There's a discussion in multimodal wg about this...we can use posixpath once we have normalized directories. I'm not 100% sure how it should work in this case

Collaborator

Ditto for the other path-joining cases later in the class.

Collaborator Author

Will update it to use posixpath.join. The _find_partition_path helper method normalizes paths, so it should be okay to use.
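
A sketch of the `posixpath.join` version of the URI construction. The function name and the `now_ns` parameter (added so the timestamp can be fixed in tests) are hypothetical, and `partition_path` is assumed to be already normalized, as the reply above says `_find_partition_path` ensures.

```python
import posixpath
import time
from typing import Optional


def data_file_uri(
    partition_path: str, extension: str, now_ns: Optional[int] = None
) -> str:
    """Build '<partition_path>/data/<timestamp>.<extension>' with posixpath.join."""
    timestamp = time.time_ns() if now_ns is None else now_ns
    return posixpath.join(partition_path, "data", f"{timestamp}.{extension}")


print(data_file_uri("s3://bucket/ds/partition", "parquet", now_ns=42))
# s3://bucket/ds/partition/data/42.parquet
```

Unlike the f-string, `posixpath.join` does not produce a doubled separator when `partition_path` already ends with a slash.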

@@ -40,7 +43,8 @@ def provide_data_file(self, extension: str) -> OutputFile:
param: extension: File extension (e.g., "parquet").
returns: OutputFile instance pointing to the created data file.
"""
- uri = f"{self.uri}/data/{int(time.time_ns())}.{extension}"
+ partition_path = _find_partition_path(self.uri, self._locator)
+ uri = f"{partition_path}/data/{int(time.time_ns())}.{extension}"
Collaborator

Can you add a TODO around making sure the storage interface has a way to create data files?

It's good practice to put TODOs in the code base for code that will need refactoring in the next 1-3 months

Collaborator Author

will do!


- def _get_delta(self, delta_dir: str, filesystem: FileSystem) -> RivuletDelta:
+ def _get_delta(
Collaborator

Noting that this is a bit messy but it is all temporary throwaway so it doesn't matter

Collaborator Author

Agreed, this is a very temporary workaround under the assumption that it will be replaced by storage interface methods.

@@ -51,6 +51,15 @@ def test_invalid_dataset_initialization():
Dataset(dataset_name="")


def test_dataset_creation_metadata_structure(tmp_path):
Collaborator

The test coverage here is a bit weak. It would be nice if you, e.g., read back the different directories into their metafile objects (similar to how test_metafile asserts correct metafile creation) to ensure that the metafiles get created with the correct metadata in them.

Collaborator Author

I added an assertion that reads the partition path using my _find_partition_path helper. This also asserts that the namespace->table->table_version->stream->partition directory structure exists.

Collaborator

@mcember mcember left a comment

Approved assuming you will take action on comments

@anshumankomawar anshumankomawar changed the base branch from dev/rivulet/integrate-deltacat-metastore to 2.0 February 4, 2025 19:51
@anshumankomawar anshumankomawar merged commit e2fc00d into 2.0 Feb 4, 2025
3 checks passed
@anshumankomawar anshumankomawar deleted the dev/anshumankomawar branch February 4, 2025 21:47

3 participants