-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor rivulet manifest files to use deltacat #464
Conversation
…deltacat-metastore
…deltacat-metastore
@@ -35,6 +35,9 @@ lint: install | |||
test: install | |||
venv/bin/pytest -m "not integration" | |||
|
|||
unit-test: install |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just added this to run tests more rapidly. Note that some of the unit tests for compaction take a while so we should consider moving them to benchmarks
@@ -135,7 +135,7 @@ def of( | |||
f"'{entry_content_type}'" | |||
) | |||
raise ValueError(msg) | |||
entry_content_encoding = meta["content_encoding"] | |||
entry_content_encoding = meta.get("content_encoding", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was raising a ValueError for manifests with no content encoding (like in Rivulet...). We don't populate lots of fields currently for rivulet deltas/manifests so we will have to update code to populate them (see: #476)
looks for the latest revision | ||
""" | ||
rev_directory = os.path.join(delta_dir, "rev") | ||
revisions = filesystem.get_file_info(fs.FileSelector(rev_directory)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we use the fs utils method here? I believe it adds some error handling around get_file_info.
I can include it as part of my changes as well.
root_path, filesystem = resolve_path_and_filesystem(self.delta_root_uri) | ||
root_children = filesystem.get_file_info(fs.FileSelector(root_path)) | ||
delta_directories = [ | ||
child | ||
for child in root_children | ||
if not child.is_file and child.base_name not in excluded_dir_names | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will have to change how we do this once I introduce the namespace -> table -> table_version -> stream -> partition hierarchy. But their probably isn't a better solution until we start filling out the storage interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is a deltcat writer expected to assign stream position
Should the writer be providing a stream position in all scenarios? Is it valid for the writer to ask to append a transaction (and simply take whatever (valid) stream position that gets generated)?
validate_with_full_scan, | ||
assert_data_file_extension, | ||
create_dataset_for_method, | ||
assert_data_file_extension, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside: What happened here? Is there a generally-accepted linting rule/convention to apply a consistent ordering to imports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm not sure how this happened, if it was the linter or IDE or what
# TODO have rivulet writer populate these values | ||
# see: https://github.com/ray-project/deltacat/issues/476 | ||
meta=ManifestMeta.of( | ||
record_count=None, # or known |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside: What does this comment mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it means that e.g. when writing data rivulet writer will know the record count and propagate that to manifests. Not updating code since I think the comment/linked ticket are sufficient
# Using microsecond precision timestamp as stream position | ||
# TODO consider having storage interface auto assign stream position | ||
stream_position=time.time_ns(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: this appears to be nanosecond precision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea. Not updating for now since will move automatic stream position generation behind storage/catalog interface
assert ( | ||
len(paths) == 1 | ||
), "expected delta commit transaction to write exactly 1 metafile" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, the transaction has been committed. What (w|sh)ould be the right way to handle this AssertionError? (Is there a way for the caller to recover from such a scenario?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like a sanity check type exception - we only added one metafile and if the transaction committed more than one than something is seriously wrong
We are abstracting the low level Tx handling behind storage interface in near future
yield ManifestAccessor(manifest, self.file_provider, self.sst_reader) | ||
# Rivulet data and SST files written to /data and /metadata | ||
# Deltacat transactions written to /txn | ||
excluded_dir_names = ["data", "metadata", "txn"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: wouldn't it be more resilient to only target the manifest directory (until the deltacat storage API is available)?
def _get_delta(self, delta_dir: str, filesystem: FileSystem) -> RivuletDelta: | ||
""" | ||
Given a DeltaCat delta directory, find latest delta file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: might I suggest _get_latest_delta
?
latest_revision = None | ||
for revision in revisions: | ||
latest_revision = ( | ||
revision if not latest_revision else max(latest_revision, revision.path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be
revision if not latest_revision else max(latest_revision, revision.path) | |
revision.path if not latest_revision else max(latest_revision, revision.path) |
Otherwise you wouldn't be comparing the paths themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revision in this case is a path since it is generated by filesystem list
Summary
See milestone https://github.com/ray-project/deltacat/milestone/5
Note that this PR uses a metastore format like
$root/
$metadata/
$data/
<delta_id_n>/
/rev
This is NOT the final structure for using the deltacat metastore. In part this is waiting on the storage interface from #455
Discussion points:
Testing
rivulet unit tests