design: Columnar Encodings for Persist #26175
Conversation
Force-pushed from 9b8e77f to 6c91284.
Exciting!
> Represented by encoding the `months`, `days`, and `micros` fields encoded as little endian.
How do these sort in postgres? We might be able to get meaningful sorting if we encode it in big endian instead (right? isn't that how that works?)
You're totally right, if we want them to sort by month, day, micros then big endian is how we would want to encode these, updated! I'll double check they sort in Postgres.
Checked on this, and if we encode the values as big endian, and apply an offset for negative numbers, we do retain the same order. Here's a draft PR #26799
Also worth noting that Intervals do not currently sort correctly in Materialize (https://github.com/MaterializeInc/materialize/issues/10854). I don't think this matters for this project, but it is worth calling out.
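As a rough illustration of the big-endian-plus-offset idea discussed above (a sketch only; the function names are hypothetical and this is not the actual Persist code): flipping the sign bit maps a signed integer's range onto the unsigned range while preserving order, and big-endian bytes then compare the same way the numbers do.

```rust
/// Sketch: encode an i64 so that a lexicographic comparison of the bytes
/// matches the numeric ordering of the original values.
fn encode_sortable_i64(x: i64) -> [u8; 8] {
    // Flipping the sign bit maps i64::MIN..=i64::MAX onto 0..=u64::MAX
    // while preserving order, i.e. it applies an offset of 2^63.
    let shifted = (x as u64) ^ (1 << 63);
    // Big-endian bytes compare lexicographically in the same order as the
    // unsigned integers themselves.
    shifted.to_be_bytes()
}

fn decode_sortable_i64(bytes: [u8; 8]) -> i64 {
    (u64::from_be_bytes(bytes) ^ (1 << 63)) as i64
}

fn main() {
    let mut values = vec![42i64, -7, 0, i64::MIN, i64::MAX];
    let mut encoded: Vec<[u8; 8]> = values.iter().map(|&v| encode_sortable_i64(v)).collect();
    values.sort();
    encoded.sort();
    let decoded: Vec<i64> = encoded.into_iter().map(decode_sortable_i64).collect();
    assert_eq!(values, decoded); // byte order matches numeric order
}
```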
Force-pushed from e241e83 to 66a1377.
this is great! I'd love to be able to re-use some of the datum -> arrow/parquet encoding as part of the copy-to-s3 work.
Copy-to-s3 would prefer encoding as native arrow/parquet types rather than opaque rust-type blobs, whereas for persist we are optimizing for encoding/decoding perf and the ability to support projection pushdown. If we can structure the code in a way that it's easy to swap out implementations per type that would be great.
Hmm, @rjobanp I'm inclined to have any sort of reuse with copy-to-s3 as a non-goal. This is all going to be tricky enough without additional constraints on the implementation. Let's discuss offline! (Maybe in the storage hangout next week?)
@danhhz sounds good, let's discuss next week!
Exciting!
> For all of these scenarios I believe we can handle consolidation by first decoding to the Rust type (e.g. `Row`) and consolidating based on that. It will be relatively inefficient, but none of these scenarios should happen too often.
This will leave data in an order that doesn't match either the existing consolidated output (based on bytes) or the eventual-future output (based on the structured representation).
I'd pitch just sorting based on the `Codec` representation while we have it! The two notes below seem to handle things well.
> I'd pitch just sorting based on the `Codec` representation while we have it! The two notes below seem to handle things well.

Totally agree! These are scenarios in which we don't have `Codec` data, or we're trying to consolidate `Codec` with `Arrow`. Although thinking about this and streaming consolidation made me realize the best approach might be decoding to the Rust type `K` and then encoding into the latest Arrow format. Will need to think on this some more and get back to you.
Ah, yeah this is tricky. If we're sorting and consolidating based on the decoded types anywhere, we probably need to do that everywhere to make stuff like streaming consolidation work. That said, if we're making the decode path as cheap as possible, then maybe that's fine?
If everything was always kept sorted by the decoded data, would that remove the deterministic requirement on the format? Seems initially to me like yes, and it might be pretty nice if so.
> …sorted consolidating an Arrow array is relatively easy and fast! To delete a value all we need to do is unset the bit in the [validity bitmap](https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps) for that value.
Could we say a little bit about streaming consolidation -- would we support it, and if so how would it work?
Need to think on this a bit, will get back to you!
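To make the deletion-by-validity-bit idea from the quoted section above concrete, here is a minimal sketch that doesn't depend on any particular Arrow library; it just assumes the Arrow spec's packed validity bitmap, where bits are numbered least-significant-first within each byte.

```rust
/// Sketch: clear the validity bit for `index`, marking that slot as
/// null/deleted without rewriting the values buffer. Arrow packs validity
/// bits LSB-first within each byte.
fn unset_validity_bit(validity: &mut [u8], index: usize) {
    validity[index / 8] &= !(1u8 << (index % 8));
}

fn is_valid(validity: &[u8], index: usize) -> bool {
    validity[index / 8] & (1u8 << (index % 8)) != 0
}

fn main() {
    // A bitmap covering 16 values, all initially valid.
    let mut validity = vec![0xffu8; 2];
    // "Consolidate away" value 3, e.g. because its diff summed to zero.
    unset_validity_bit(&mut validity, 3);
    assert!(!is_valid(&validity, 3));
    assert!(is_valid(&validity, 4));
}
```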
Force-pushed from 48bc0dc to cb349d0.
> …of writing there are only two accepted [canonical extension types](https://arrow.apache.org/docs/format/CanonicalExtensions.html#official-list) but we are free to create our own.
>
> As part of our mapping from `Datum` to Arrow column we could include an…
This idea is cool, but I worry about the effect on the size of the resulting blobs. Inline writes will perhaps change this math a bit (speaking of which, we should hash out how inline writes work with this columnar encoding shift), but we currently have a pretty silly overhead of data we write to s3 to encode small batches of data (e.g. 1 row). Given that an overall format version (that you describe below) could easily have a mapping in code to the versions of each column type, I'm not sure what duplicating that into the individual column metadata buys us.
That's fair! We can shorten this then to something like `mz.p.<v>.<datum type tag>`, e.g. assuming String has a type tag of 12 it would look like `mz.p.1.12`.

I like having the `Datum`-specific version because it allows us to evolve each `Datum` encoding independently, which allows us to avoid unnecessary migrations. For example, say we change the encoding of `Interval`: if we bumped the overall format version then all batches, regardless of whether they contain an `Interval`, would need to migrate during consolidation. But with a `Datum`-specific version we would only have to migrate batches that contain `Interval`s.
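A hedged sketch of what such a per-`Datum` extension-type name could look like; the `mz.p.<v>.<datum type tag>` scheme comes from the comment above, but the helper names and the type-tag value used here are hypothetical.

```rust
/// Hypothetical helper: build the extension-type name for a column,
/// embedding a per-Datum encoding version and a Datum type tag,
/// e.g. "mz.p.1.12" for version 1 of the String encoding (tag 12 assumed).
fn extension_name(encoding_version: u32, datum_type_tag: u32) -> String {
    format!("mz.p.{encoding_version}.{datum_type_tag}")
}

/// Hypothetical helper: parse the name back into (version, type tag).
fn parse_extension_name(name: &str) -> Option<(u32, u32)> {
    let rest = name.strip_prefix("mz.p.")?;
    let (version, tag) = rest.split_once('.')?;
    Some((version.parse().ok()?, tag.parse().ok()?))
}

fn main() {
    let name = extension_name(1, 12);
    assert_eq!(name, "mz.p.1.12");
    assert_eq!(parse_extension_name(&name), Some((1, 12)));
}
```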
> …impact cost, and requires `ConsolidatingIter` to handle a larger number of Parts.
>
> Neither option is great, but I am leaning towards [1] since it puts the…
This is the biggest open question for me.
Another big factor is memory pressure. I worry a bit about larger part sizes getting decoded simultaneously and OOMing where it wouldn't have before. It'd be good to collect some data here on the actual sizes we see from your prototyping (e.g. try (1) with tpch data in staging and see how big the parts are with just k v, and then also with k v k_s v_s).
Maybe we can get some of this back by enabling parquet compression on all of k, v, k_s, v_s (I don't think we currently do any 😳) but only when we're writing both (to see if we can keep sizes the ~same during the transition). That might let us "hide" some of the extra bytes. We could then be careful about lazily decoding things into only one of k, v OR k_s, v_s in the important codepaths (persist_source)
> Another big factor is memory pressure.

Very true! Added a note about Prometheus metrics to track the size of blobs we read from S3.

> Maybe we can get some of this back by enabling parquet compression on all of k, v, k_s, v_s

This would be great! Parquet supports a number of compression types, I'll need to read up but I think either LZ4 or Snappy would be good here? AFAIU they're fairly fast compression algos. Another win we could get is enabling run-length encoding on the `diff` and `time` columns: for some of our largest blobs, which are physically compacted (I think?), all of the `time` and `diff` fields should be the same(?).
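For reference, a minimal sketch of what turning on Parquet compression could look like with the Rust `parquet` crate (the column names and the choice of Snappy here are illustrative assumptions, not what Persist actually configures):

```rust
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn writer_properties() -> WriterProperties {
    WriterProperties::builder()
        // Snappy is cheap to compress and decompress; LZ4 is a similar trade-off.
        .set_compression(Compression::SNAPPY)
        // Dictionary encoding (RLE_DICTIONARY) helps columns with many repeated
        // values, like `time` and `diff` in physically compacted blobs.
        .set_column_dictionary_enabled(ColumnPath::from("time"), true)
        .set_column_dictionary_enabled(ColumnPath::from("diff"), true)
        .build()
}

fn main() {
    let props = writer_properties();
    println!("compression: {:?}", props.compression(&ColumnPath::from("k")));
}
```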
Force-pushed from bf9ea07 to eba9e09.
This PR implements "packed" encodings for `Interval` and `Time` that will be used for writing structured data in Persist. The packed encodings are designed to be as fast as possible and have the same sort order as the original types. They're based on discussion from #26175.

I added benchmarks that measure throughput and locally I get the following results:

| type     | encode              | decode              |
|----------|---------------------|---------------------|
| interval | ~1.5 billion/second | ~1.6 billion/second |
| time     | ~2.5 billion/second | ~1.3 billion/second |

I believe the implementations can be optimized further with SIMD, but right now that requires `inline_asm!` or the `Nightly` compiler.

Note: The code placement feels a bit weird since these impls should only be used by Persist, so I was thinking of putting the impl behind a trait like `mz_persist_types::ColumnarCodec`, but was curious what other folks thought.

### Motivation

Progress towards https://github.com/MaterializeInc/materialize/issues/24830

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] This PR includes the following [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note):
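As a rough idea of what a "packed", sort-preserving `Interval` encoding could look like, here is a sketch that combines the sign-bit-flip trick from earlier in the thread with a fixed 16-byte layout. The field widths, ordering, and names are assumptions for illustration, not the actual code in that PR.

```rust
/// Sketch: pack an interval's fields into 16 bytes such that a byte-wise
/// comparison sorts by (months, days, micros). Each signed field has its
/// sign bit flipped and is written big-endian so byte order matches
/// numeric order.
fn pack_interval(months: i32, days: i32, micros: i64) -> [u8; 16] {
    let mut out = [0u8; 16];
    out[0..4].copy_from_slice(&((months as u32) ^ (1 << 31)).to_be_bytes());
    out[4..8].copy_from_slice(&((days as u32) ^ (1 << 31)).to_be_bytes());
    out[8..16].copy_from_slice(&((micros as u64) ^ (1 << 63)).to_be_bytes());
    out
}

fn unpack_interval(buf: [u8; 16]) -> (i32, i32, i64) {
    let months = (u32::from_be_bytes(buf[0..4].try_into().unwrap()) ^ (1 << 31)) as i32;
    let days = (u32::from_be_bytes(buf[4..8].try_into().unwrap()) ^ (1 << 31)) as i32;
    let micros = (u64::from_be_bytes(buf[8..16].try_into().unwrap()) ^ (1 << 63)) as i64;
    (months, days, micros)
}

fn main() {
    let a = pack_interval(1, 0, 0);
    let b = pack_interval(1, 2, -5);
    let c = pack_interval(-1, 30, 0);
    assert!(c < a && a < b); // byte order follows (months, days, micros)
    assert_eq!(unpack_interval(b), (1, 2, -5));
}
```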
Force-pushed from eba9e09 to 051c616.
Rendered

Motivation

Design for https://github.com/MaterializeInc/database-issues/issues/7411

Checklist

- If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.