
feat: streamed execution in MERGE #3145

Merged
merged 1 commit into delta-io:main from lazy_table_provider on Jan 21, 2025

Conversation

@ion-elgreco (Collaborator) commented Jan 19, 2025

Description

This adds support for streamed execution in MERGE, with one caveat:

  • Initially, MERGE derives statistics about the source to prune the target more effectively and to allow for more concurrent writers. The problem is that deriving statistics about the source means we have to consume the source, and caching it that early defeats the purpose of streamed execution. So I've disabled this part of the early filter construction when streaming mode is on (a sketch of this guard follows below).

In the future we can look into the work of influxdb or lancedb, to maybe solve getting the statistics first and then pushing this filter down to the delta scan directly.
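
To make the caveat concrete, here is a minimal sketch of the decision this introduces. All names below are illustrative, not the crate's actual identifiers: when the source is a stream, deriving statistics would drain it, so early target pruning is skipped.

    // Minimal sketch of the streaming guard described above; `early_target_filter`
    // and `streamed_exec` are illustrative names, not the crate's actual API.
    fn early_target_filter(streamed_exec: bool) -> Option<String> {
        if streamed_exec {
            // Deriving min/max statistics would consume the source stream
            // before the writer sees it, so skip early pruning entirely.
            None
        } else {
            // Eager path: materialize the source, derive statistics, and push
            // a pruning predicate down to the target's delta scan.
            Some("target.id BETWEEN 10 AND 42".to_string())
        }
    }

    fn main() {
        assert_eq!(early_target_filter(true), None);
        assert!(early_target_filter(false).is_some());
    }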

@github-actions github-actions bot added binding/python Issues for the Python package binding/rust Issues for the Rust crate labels Jan 19, 2025
@ion-elgreco ion-elgreco marked this pull request as draft January 19, 2025 20:55
Signed-off-by: Ion Koutsouris <[email protected]>
@ion-elgreco ion-elgreco marked this pull request as ready for review January 21, 2025 12:06
codecov bot commented Jan 21, 2025

Codecov Report

Attention: Patch coverage is 26.37363% with 134 lines in your changes missing coverage. Please review.

Project coverage is 71.77%. Comparing base (82ef6bf) to head (92a8f7b).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
crates/core/src/delta_datafusion/mod.rs 0.00% 69 Missing ⚠️
python/src/merge.rs 0.00% 42 Missing ⚠️
crates/core/src/operations/merge/filter.rs 68.51% 17 Missing ⚠️
crates/core/src/operations/merge/mod.rs 73.33% 4 Missing ⚠️
python/src/lib.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3145      +/-   ##
==========================================
- Coverage   71.96%   71.77%   -0.19%     
==========================================
  Files         135      135              
  Lines       43581    43717     +136     
  Branches    43581    43717     +136     
==========================================
+ Hits        31361    31378      +17     
- Misses      10181    10293     +112     
- Partials     2039     2046       +7     

☔ View full report in Codecov by Sentry.

@ion-elgreco ion-elgreco enabled auto-merge January 21, 2025 14:10
@roeap (Collaborator) left a comment

took me a bit to read through all of this - very exciting stuff!

Do I read that correctly, that this right now mainly affects tables that are coming from the Python side, or cases where someone specifies a table externally in Rust that is not a Delta table (or at least not ours? :)

So the flow where this makes most sense would be where someone has a RecordBatchReader or Dataset in python and wants to merge that with our table? Our tables being datasets right now?

Also excited about this b/c once we have kernelized things, this fits very well into the execution model (i think).

@roeap (Collaborator) commented Jan 21, 2025

This reminds me - at some point we were discussing that we may no longer be able to use the Dataset abstraction from pyarrow, when deletion vectors land... which are one of the main things I hope to finally unblock with the kernel migration :)

Is that correct, or am I remembering something wrong?

In any case we should be able to provide a RecordBatchReader implementation if we can no longer use that API.

@rtyler (Member) left a comment

I want to land this and test it out with some bigger integration tests. I think this is really important work! 🥳

@ion-elgreco ion-elgreco added this pull request to the merge queue Jan 21, 2025
@rtyler rtyler added this to the v0.24 milestone Jan 21, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 21, 2025
@ion-elgreco (Collaborator, Author) commented Jan 21, 2025

> took me a bit to read through all of this - very exciting stuff!
>
> Do I read that correctly, that this right now mainly affects tables that are coming from the Python side, or cases where someone specifies a table externally in Rust that is not a Delta table (or at least not ours? :)

Yes, at least in an automatic fashion. Rust users can use the lines below to get the same result and then pass the resulting DataFrame into our operation. They do have to enable the flag streaming=True though, otherwise the early filter construction will consume the stream before the writer receives it:

    // Assuming roughly these imports (exact paths may vary across versions):
    //   std::sync::{Arc, Mutex}; arrow::ffi_stream::ArrowArrayStreamReader;
    //   datafusion::datasource::TableProvider; plus LazyBatchGenerator,
    //   LazyTableProvider, ArrowStreamBatchGenerator (and their RwLock).

    // Wrap the FFI stream so it can be shared across threads (Send + Sync).
    let arrow_stream: Arc<Mutex<ArrowArrayStreamReader>> = Arc::new(Mutex::new(source));
    let arrow_stream_batch_generator: Arc<RwLock<dyn LazyBatchGenerator>> =
        Arc::new(RwLock::new(ArrowStreamBatchGenerator::new(arrow_stream)));

    // Expose the generator as a lazy TableProvider and read it into a DataFrame.
    let table_provider: Arc<dyn TableProvider> = Arc::new(LazyTableProvider::try_new(
        schema.clone(),
        vec![arrow_stream_batch_generator],
    )?);
    let df = ctx.read_table(table_provider)?;
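
(The Mutex here is what satisfies the Send + Sync requirement noted in the comment: execution may pull batches from another thread, and each pull locks the stream for the duration of one batch.)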

I am not sure if we can automatically enable streaming when we see a certain TableProvider. I believe we can do downcast_ref on TableProviders, but I am not sure how to do this once it's in a logical plan. Especially when people have their own implementation of a LazyTableProvider it will become tricky, but maybe you have some ideas?
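
For reference, the downcast itself is simple at the provider level; a sketch (the import path for LazyTableProvider is approximate, and is_lazy_provider is a hypothetical helper):

    use std::sync::Arc;
    use datafusion::datasource::TableProvider;
    // Path approximate; the type lives in crates/core/src/delta_datafusion/mod.rs.
    use deltalake_core::delta_datafusion::LazyTableProvider;

    /// Hypothetical check: detect our lazy provider via `as_any()` downcasting.
    /// The open question is reaching the provider once it sits inside a
    /// LogicalPlan::TableScan (datafusion's `source_as_provider` may help there).
    fn is_lazy_provider(provider: &Arc<dyn TableProvider>) -> bool {
        provider.as_any().downcast_ref::<LazyTableProvider>().is_some()
    }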

> So the flow where this makes most sense would be where someone has a RecordBatchReader or Dataset in python and wants to merge that with our table? Our tables being datasets right now?
>
> Also excited about this b/c once we have kernelized things, this fits very well into the execution model (i think).

Exactly, anything that provides a RecordBatchStream will be consumed and marked as streaming mode. I am currently also refactoring the schema evolution into a logical node for write (https://github.com/ion-elgreco/delta-rs/tree/feat/lazy_schema_evolution), which brings us closer to streamed execution there as well.

> This reminds me - at some point we were discussing that we may no longer be able to use the Dataset abstraction from pyarrow, when deletion vectors land... which are one of the main things I hope to finally unblock with the kernel migration :)
>
> Is that correct, or am I remembering something wrong?
>
> In any case we should be able to provide a RecordBatchReader implementation if we can no longer use that API.

Yeah, that is a tricky one, because the Dataset construct doesn't provide an interface to filter down dataset fragments. RecordBatchReaders are quite feasible, but we should have some interface to push filters and projections down into the read, maybe even using pyarrow expressions.
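
Purely to illustrate the shape such an interface could take (nothing like this exists in the crate; the trait and method names below are made up, loosely mirroring what TableProvider::scan already accepts on the Rust side):

    /// Hypothetical pushdown-aware reader interface. The `String` predicate is
    /// a stand-in for whatever expression type we'd settle on (e.g. a
    /// serialized pyarrow expression).
    trait PushdownBatchReader {
        /// Restrict the columns materialized by the underlying read.
        fn with_projection(self, columns: Vec<String>) -> Self
        where
            Self: Sized;
        /// Push a filter into the read so source fragments can be skipped early.
        fn with_filter(self, predicate: String) -> Self
        where
            Self: Sized;
    }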

@ion-elgreco ion-elgreco added this pull request to the merge queue Jan 21, 2025
Merged via the queue into delta-io:main with commit 4193cb0 Jan 21, 2025
32 checks passed
@ion-elgreco ion-elgreco deleted the lazy_table_provider branch January 21, 2025 16:51