perf(window): Skip RowContainer round-trip in streaming window build #17558
zhli1142015 wants to merge 6 commits into
✅ Deploy Preview for meta-velox canceled.
|
Build Impact Analysis
Selective Build Targets (building these covers all 319 affected)
Total affected: 319/582 targets
Affected targets (319)
- Directly changed (15)
- Transitively affected (304)
Slow path • Graph generated from PR branch
Force-pushed from f8d8aa8 to 7aecf7d
@JkSelf and @mbasmanova Could you please help review?
mbasmanova
left a comment
@zhli1142015, Thank you for the contribution! The benchmark numbers look promising.
Before diving into the code, please update the PR title and description.
The title "Optimize RowsStreamingWindowBuild with vectors" is vague — optimize how? Consider something like "Skip RowContainer round-trip in streaming window build" or similar.
The benchmark table is hard to interpret — please add a summary of the key takeaways, e.g.:
- What's the optimization? (eliminate RowContainer c2r/r2c round-trip for streaming window)
- When does it help most? (simple functions like rank: ~55% CPU reduction; diminishing returns for heavier functions like sum: ~33%; minimal impact with many functions: ~6-8%)
- What's the tradeoff? (retains input vectors in memory instead of copying into RowContainer)
The raw table is useful for validation, but the reader needs the story first.
mbasmanova
left a comment
@zhli1142015, Thank you for the contribution! A few concerns:
- Significant code duplication. `VectorWindowPartition` (~527 lines) reimplements `extractColumn`, `extractNulls`, `computePeerBuffers`, `computeKRangeFrameBounds`, `searchFrameValue`, `linearSearchFrameValue`, `updateKRangeFrameBounds`, `isInvalidNanFrameBound`, and `isNanAt`, all of which have parallel implementations in `WindowPartition`. Bug fixes to one will need to be manually applied to the other. Can the shared algorithmic logic (peer computation, frame bound search, NaN handling) be extracted into helpers that work with an abstract row accessor, so both implementations share the same algorithms?
- `previousRef_` pins entire input vectors. `RowReference` holds a `RowVectorPtr`, which keeps the entire input vector alive even though only one row is needed for cross-batch comparisons. For large input vectors this wastes memory. Consider copying just the needed row values into a small vector instead.
- `loadedVector()` on all children.

      for (auto& child : input->children()) {
        child->loadedVector();
      }

  This materializes all lazy columns, even those not used by the window function. Should this be limited to columns actually needed?
- PR size. +1890 lines is large for a single review. Could the `WindowBuild`/`WindowPartition` refactoring (making methods virtual, moving `RowContainer` init to subclasses) be split into a preparatory PR? That would make the core optimization easier to review.
|
@zhli1142015 Thanks for your great work. Just curious about the overall impact here—could you provide some end-to-end performance benchmarks, such as the TPC-DS Q67 results before and after this fix? |
@mbasmanova I updated the PR title and description to make the optimization and tradeoff clearer.

The description now starts with the main story: RowsStreamingWindowBuild avoids the RowContainer column-to-row and row-to-column round trip by retaining input RowVector ranges directly. I also added key takeaways before the benchmark table to explain when the change helps most and why the end-to-end impact depends on how much of the query time is spent in this removed conversion overhead.
@mbasmanova I addressed the review comments and also split the preparatory WindowBuild/WindowPartition refactor into #17590. Could you please take a look at that change?

For the code duplication concern, #17590 extracts the shared peer-group and RANGE frame-bound algorithms into WindowPartitionAlgorithms behind storage accessors. The existing RowContainer-backed path is the first accessor. This PR then adds the vector-backed accessor, so the peer and frame-bound logic is shared instead of copied.

For previous-row retention, I replaced the RowVectorPtr-based previous-row reference with an owned one-row key snapshot. This keeps only the key values needed for cross-batch partition and peer comparisons, instead of pinning a processed input vector.

For lazy vectors, RowsStreamingWindowBuild no longer loads every child eagerly. It loads only the partition/order boundary columns needed to decide partition and peer boundaries. Function payload columns can remain lazy until they are actually read by window function evaluation.

For PR size, #17590 contains the prep refactor. After that lands, I will rebase this PR so the remaining diff focuses on the vector-backed RowsStreamingWindowBuild optimization.
@JkSelf In Q67, the Window is replaced by WindowGroupLimit, so this PR's RowsStreamingWindowBuild path is not exercised. We do have a real workload where this change improves end-to-end runtime by about 30%.

The direct improvement, as shown by the benchmark, is removing the RowContainer round trip: rows-streaming window no longer copies each row into RowContainer and then copies it back out to vectors for window function evaluation. The end-to-end impact depends on the query shape and on how much of total time is spent in this removed overhead. If window function evaluation or surrounding operators dominate, the overall gain will be smaller.

Partition-based StreamingWindow could also use this approach, but it is not included in this PR. We have spill support for partition-based StreamingWindow built on top of this optimization, and extracting only the vector-backed partition optimization for that path would require more work. I kept this PR focused on RowsStreamingWindowBuild.
Summary: Rows-streaming window can keep future partitions as input vectors instead of copying every row into a `RowContainer`. This PR extracts peer-group and k RANGE frame-bound code so both RowContainer-backed and future vector-backed storage layouts can share the same window partition algorithms.

### Before
- `WindowBuild` always initialized `RowContainer` and reusable `DecodedVector` state.
- `WindowPartition` owned RowContainer-backed row storage and also contained peer computation, frame-bound search, and NaN frame-bound handling directly coupled to `RowContainer`.

### After
- `WindowBuild` holds optional `RowContainer` and `DecodedVector` state; subclasses decide whether to initialize them.
- `WindowPartition` remains the RowContainer-backed partition representation used today, but delegates peer/frame algorithms through a RowContainer accessor.
- `RowAccessor` is a C++20 concept that defines the storage contract shared by the window partition algorithms.
- `PeerGroupComputation` contains storage-agnostic peer group bound logic.
- `KRangeFrameBound` contains storage-agnostic k RANGE frame-bound search and NaN frame-bound handling.
- `RowContainerAccessor` adapts the existing RowContainer storage to the shared `RowAccessor` concept.

### Current assembly
| WindowBuild subclass | Base RowContainer | DecodedVectors | Partition accessor |
|---|---|---|---|
| `SortWindowBuild` | yes | yes | RowContainer |
| `SubPartitionedSortWindowBuild` | no (manages its own per sub-partition) | yes | RowContainer |
| `PartitionStreamingWindowBuild` | yes | yes | RowContainer |
| `RowsStreamingWindowBuild` | yes | yes | RowContainer |

After #17558, `RowsStreamingWindowBuild` will skip RowContainer materialization and use vector-backed accessors. That is the only new storage combination this refactor is intended to enable.

### Notes
The storage-agnostic algorithms use the shared `RowAccessor` C++20 concept rather than a virtual interface so the hot peer/frame search paths remain inlineable. The current PR keeps all existing behavior RowContainer-backed; #17558 can add the vector-backed accessor implementation against the same concept.

Pull Request resolved: #17590

Reviewed By: kevinwilfong

Differential Revision: D106508428

Pulled By: kKPulla

fbshipit-source-id: e58097e58f2e5ed0f59d05fae426f3524fc8332c
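To make the accessor idea concrete, here is a minimal sketch of one peer-group algorithm shared by two storage layouts. It is not the Velox code: `RowAccessorSketch`, `ColumnAccessor`, `RowWiseAccessor`, and `peerGroupEnd` are hypothetical names, and the real `RowAccessor` contract is certainly richer. The concept is declared only when the compiler supports C++20 concepts, and the algorithm itself is a plain template so the sketch also builds under older standards.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Columnar stand-in for vector-backed storage: one ORDER BY key column.
struct ColumnAccessor {
  std::vector<int64_t> sortKey;
  int64_t numRows() const { return static_cast<int64_t>(sortKey.size()); }
  bool sortKeysEqual(int64_t i, int64_t j) const {
    return sortKey[i] == sortKey[j];
  }
};

// Row-wise stand-in for RowContainer-backed storage.
struct Row {
  int64_t sortKey;
  int64_t payload;
};
struct RowWiseAccessor {
  std::vector<Row> rows;
  int64_t numRows() const { return static_cast<int64_t>(rows.size()); }
  bool sortKeysEqual(int64_t i, int64_t j) const {
    return rows[i].sortKey == rows[j].sortKey;
  }
};

#if __cpp_concepts >= 201907L
#include <concepts>
// Hypothetical storage contract: report the row count and compare the
// ORDER BY keys of two rows. Checked at compile time, no virtual calls.
template <typename T>
concept RowAccessorSketch = requires(const T& a, int64_t i, int64_t j) {
  { a.numRows() } -> std::convertible_to<int64_t>;
  { a.sortKeysEqual(i, j) } -> std::convertible_to<bool>;
};
static_assert(RowAccessorSketch<ColumnAccessor>);
static_assert(RowAccessorSketch<RowWiseAccessor>);
#endif

// Returns the exclusive end of the peer group that starts at 'start'.
// Written once; instantiated per storage layout so the calls can inline.
template <typename Accessor>
int64_t peerGroupEnd(const Accessor& rows, int64_t start) {
  assert(start >= 0 && start < rows.numRows());
  int64_t end = start + 1;
  while (end < rows.numRows() && rows.sortKeysEqual(start, end)) {
    ++end;
  }
  return end;
}
```

Because the accessor type is a template parameter rather than a base class, each instantiation sees the concrete `sortKeysEqual` body, which is the inlining property the Notes section describes.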
Force-pushed from e916db8 to b261a07
@mbasmanova, could you please take another look when you have a chance? I rebased this PR after #17590 and addressed the review comments:
The current CI Spark expression fuzzer failure looks unrelated to this PR. I reproduced it locally and tracked it separately in #17697.
mbasmanova
left a comment
Thank you for the updates!
- `WindowPartitionKeys.h` puts all types in the `detail::` namespace, but they're used across multiple files. `detail` is for implementation details private to a single header. Move to the parent namespace.
- `WindowPartitionKeys.h` is a grab-bag of three unrelated types:
  - `WindowPartitionRowReference`: only used within `VectorWindowPartition.cpp`. Move to an anonymous namespace in the `.cpp` instead of exposing it in a shared header.
  - `WindowPartitionKeyRowSnapshot`: used by both `RowsStreamingWindowBuild` and `VectorWindowPartition`. Legitimately shared, but the name is generic: it copies a subset of columns from one row for later comparison, and there is nothing window-specific about it.
  - `WindowPartitionKeyChannels`: two static methods that loop and deduplicate column indices, called from two places. A class is over-engineered for this; a free function or inlining at the call sites would be simpler. `appendUnique` is a private static method; use a free function in an anonymous namespace in the `.cpp`.

There are now 22+ window-related files flat in `velox/exec/`. Please submit a separate preparatory PR to move existing window files into an `exec/window/` subdirectory with its own namespace, then rebase this PR on top. Adding 4 more window files to the flat `exec/` directory makes the organization worse.
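For illustration, the free-function alternative to a class with static methods could look like the sketch below. Only the name `appendUnique` comes from the review; `boundaryChannels`, the parameter names, and the use of `uint32_t` for column indices are assumptions made for the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

namespace {

// Appends 'channel' unless it is already present, preserving first-seen order.
// A linear scan is enough here because key lists are typically very short.
void appendUnique(std::vector<uint32_t>& channels, uint32_t channel) {
  if (std::find(channels.begin(), channels.end(), channel) == channels.end()) {
    channels.push_back(channel);
  }
}

} // namespace

// Hypothetical caller: collects the distinct input columns referenced by the
// partition keys and the sort keys.
std::vector<uint32_t> boundaryChannels(
    const std::vector<uint32_t>& partitionKeys,
    const std::vector<uint32_t>& sortKeys) {
  std::vector<uint32_t> channels;
  channels.reserve(partitionKeys.size() + sortKeys.size());
  for (auto channel : partitionKeys) {
    appendUnique(channels, channel);
  }
  for (auto channel : sortKeys) {
    appendUnique(channels, channel);
  }
  // Deduplication can only shrink the combined list.
  assert(channels.size() <= partitionKeys.size() + sortKeys.size());
  return channels;
}
```

Keeping the helper in an anonymous namespace limits its visibility to one translation unit, which is the point of the review comment.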
|
A few more:
|
|
Hi @mbasmanova, thank you so much for the thorough review! Following your suggestion, I've split the preparatory file relocation (moving the window files under I've also addressed your other comments here — moving the shared types out of the Thanks again for your time and guidance! |
Preparatory, mechanical refactor for facebookincubator#17558.

The window subsystem lived as a flat set of files under velox/exec/. This moves the internal implementation files into a dedicated velox/exec/window/ directory and facebook::velox::exec::window namespace, while keeping the public-facing files in velox/exec/ and the facebook::velox::exec namespace.

Stay in velox/exec/ (exec namespace):
- Window.h/.cpp -- the operator itself, alongside HashJoin, Aggregate, and TableScan.
- WindowFunction.h/.cpp -- the public API that window function authors implement against, like AggregateFunction.h.

Move under velox/exec/window/ (exec::window namespace):
- WindowBuild and its variants, WindowPartition, AggregateWindow, KRangeFrameBound, PeerGroupComputation, WindowPartitionAccessor -- internal implementation details.

There are no behavior changes; only file locations, the enclosing namespace of the internal files, and references to them change. The operator and the public API reference the internal types through the window:: namespace.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Summary: Preparatory, mechanical refactor for #17558.

The window subsystem lived as a flat set of files under `velox/exec/`. This moves the internal implementation files into a dedicated `velox/exec/window/` directory and `facebook::velox::exec::window` namespace, while keeping the public-facing files in `velox/exec/` and the `facebook::velox::exec` namespace.

**Stay in `velox/exec/` (`exec` namespace):**
- `Window.h`/`.cpp`: the operator itself, alongside `HashJoin`, `Aggregate`, and `TableScan`.
- `WindowFunction.h`/`.cpp`: the public API that window function authors implement against, like `AggregateFunction.h`.

**Move under `velox/exec/window/` (`exec::window` namespace):**
- `WindowBuild` and its variants, `WindowPartition`, `AggregateWindow`, `KRangeFrameBound`, `PeerGroupComputation`, `WindowPartitionAccessor`: internal implementation details.

There are no behavior changes; only file locations, the enclosing namespace of the internal files, and references to them change. The operator and the public API reference the internal types through the `window::` namespace.

Pull Request resolved: #17710

Reviewed By: apurva-meta

Differential Revision: D107420866

Pulled By: bikramSingh91

fbshipit-source-id: 7a9a306463fd5005d5b48db395d58afde2d32ea0
Force-pushed from e95ccdc to 07fc8d1
CI Failure Analysis
🟡 Window Fuzzer with Presto as source of truth: FUZZER failure
Fuzzer: Window Fuzzer (Presto as source of truth)
Root cause: The fuzzer's verification rate dropped below the required 50% threshold (49.25% < 50%). This was caused by too many Presto reference query failures (15 out of 134 iterations = 11.19%), which reduced the number of successfully verified iterations below the minimum. The fuzzer aborted with
Correlation with PR changes:
Known issues:
Reproduce locally:
# Build the window fuzzer
make debug
# Run with the same seed (requires a Presto instance as reference)
./_build/debug/velox/functions/prestosql/fuzzer/velox_window_fuzzer_test \
  --seed 501594481 \
  --duration_sec 300
Note: Reproduction requires a running Presto instance as the reference query runner, which makes local reproduction difficult without the full CI environment.
Recommended fix:
RowsStreamingWindowBuild handles row-streaming window input that is already sorted by the partition and order keys. Because the input is already in window order and the operator can stream rows through partial partitions, copying every row into RowContainer and then extracting the same rows back into vectors adds unnecessary c2r/r2c work.

This change keeps the existing RowsStreamingWindowBuild path, but retains input RowVector ranges directly and exposes them through a vector-backed WindowPartition. This removes the RowContainer materialization/extraction round trip for rows-streaming window execution while preserving partial-partition processing and range-frame peer/NaN semantics.

Add a row-streaming-window benchmark with pre-sorted Values input. Folly timing excludes cursor/task setup and stats collection via BENCHMARK_SUSPEND; windowCpu/windowWall report the Window operator's addInput + getOutput timings.

Benchmark setup:
- Input: p INTEGER, s INTEGER, v BIGINT, sorted by p, s.
- Sizes: 10K, 100K, 1M rows; 10K rows per input vector.
- Larger inputs use 25K rows per partition to cross vector boundaries.
- Cases: rank, sum(v), rank()+sum(v), and 7 funcs.
- 7 funcs: rank, dense_rank, row_number, sum, count, min, max.
- Output batch: 1K rows.
- Build: _build/release (-O3 -DNDEBUG), main 32e6cc4 vs this change.
- Command: `velox_rows_streaming_window_benchmark --bm_regex='rowsStreamingWindow.*' --bm_min_iters=5 --bm_min_usec=1000000`

Benchmark results. Each timing cell is total time / Window CPU.

| Case | Rows | Main | This change | CPU reduction |
| --- | ---: | ---: | ---: | ---: |
| rank | 10K | 638.83us / 589.03us | 303.48us / 258.87us | 56.1% |
| sum | 10K | 1.47ms / 1.42ms | 1.02ms / 951.05us | 33.0% |
| rank+sum | 10K | 1.50ms / 1.45ms | 1.01ms / 956.86us | 34.0% |
| 7 funcs | 10K | 3.13ms / 3.09ms | 2.85ms / 2.84ms | 8.1% |
| rank | 100K | 5.71ms / 5.52ms | 2.77ms / 2.57ms | 53.4% |
| sum | 100K | 14.28ms / 14.10ms | 9.67ms / 9.45ms | 33.0% |
| rank+sum | 100K | 14.24ms / 14.09ms | 9.75ms / 9.57ms | 32.1% |
| 7 funcs | 100K | 30.00ms / 30.21ms | 28.11ms / 28.30ms | 6.3% |
| rank | 1M | 56.18ms / 54.83ms | 27.95ms / 26.52ms | 51.6% |
| sum | 1M | 145.83ms / 140.15ms | 96.36ms / 95.24ms | 32.0% |
| rank+sum | 1M | 147.18ms / 141.30ms | 97.40ms / 96.25ms | 31.9% |
| 7 funcs | 1M | 299.70ms / 302.85ms | 282.31ms / 285.03ms | 5.9% |

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
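As a reading aid for the benchmark table in the commit message above, the "CPU reduction" column appears to be computed from the Window CPU value (the second number in each timing cell) as a relative change against main. That interpretation is an assumption, but it reproduces the published percentages for the rows checked here. The helper name `cpuReductionPercent` is hypothetical.

```cpp
#include <cassert>

// Relative reduction in Window CPU time, as a percentage of the baseline.
// Both arguments must use the same unit (for example microseconds).
double cpuReductionPercent(double baselineCpu, double changedCpu) {
  assert(baselineCpu > 0);
  return (baselineCpu - changedCpu) / baselineCpu * 100.0;
}
```

For the `rank` case at 10K rows, `cpuReductionPercent(589.03, 258.87)` evaluates to roughly 56.05, which rounds to the reported 56.1%. For `7 funcs` at 100K rows, `cpuReductionPercent(30.21, 28.30)` gives roughly 6.3.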
WindowPartitionKeys.h bundled three unrelated types in the detail namespace
even though they were used across translation units. Separate them by
responsibility:
- RowColumnsSnapshot (renamed from WindowPartitionKeyRowSnapshot) is the only
genuinely shared type. It copies a subset of a row's columns so the row can
be compared after its source vector is gone, which is not window-specific.
It moves to RowColumnsSnapshot.{h,cpp} in the exec namespace.
- The reference into a retained input vector is used only by
VectorWindowPartition, so it moves to an anonymous namespace in the .cpp
together with rowAt and rowsEqual.
- The key-channel deduplication becomes a small free function inlined into
each of the two callers instead of a class with static methods.
No behavior change.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Move the duplicated RowBlock struct into a single shared header velox/exec/window/RowBlock.h and validate its invariants in the constructor so all creation paths are consistent.
- Rename VectorWindowPartition::addBlock to addRows to match the existing add-rows vocabulary.
- Inline the trivial numRows and numRowsForProcessing getters in the header.
- Initialize blockPrefixSums_ inline instead of in the constructor body.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
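The sketch below shows one plausible way a range struct with constructor-validated invariants and an inline-initialized prefix-sum array fit together. It is an assumption about the design, not the PR's code: `RowRangeSketch`, `RangeIndexSketch`, `locate`, and the integer `vectorId` (standing in for a retained `RowVectorPtr`) are all hypothetical, as is the idea that the prefix sums exist to map a partition-relative row number back to a source vector row.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for a retained slice of one input vector.
struct RowRangeSketch {
  int32_t vectorId; // Stand-in for a retained RowVectorPtr.
  int64_t begin;    // First row of the slice inside the source vector.
  int64_t size;     // Number of rows in the slice.

  // Validating here keeps every creation path consistent.
  RowRangeSketch(int32_t id, int64_t beginRow, int64_t numRows)
      : vectorId(id), begin(beginRow), size(numRows) {
    if (begin < 0 || size <= 0) {
      throw std::invalid_argument("RowRangeSketch: invalid range");
    }
  }
};

class RangeIndexSketch {
 public:
  void addRows(const RowRangeSketch& range) {
    ranges_.push_back(range);
    prefixSums_.push_back(prefixSums_.back() + range.size);
  }

  int64_t numRows() const {
    return prefixSums_.back();
  }

  // Maps a partition-relative row to {range index, row in source vector}.
  std::pair<std::size_t, int64_t> locate(int64_t row) const {
    assert(row >= 0 && row < numRows());
    // First prefix sum strictly greater than 'row' ends the owning range.
    auto it = std::upper_bound(prefixSums_.begin(), prefixSums_.end(), row);
    auto index = static_cast<std::size_t>(it - prefixSums_.begin()) - 1;
    return {index, ranges_[index].begin + (row - prefixSums_[index])};
  }

 private:
  std::vector<RowRangeSketch> ranges_;
  // Initialized inline: prefixSums_[i] is the row count before range i.
  std::vector<int64_t> prefixSums_{0};
};
```

Because every range has a positive size, the prefix sums are strictly increasing, so the binary search finds a unique owning range in O(log n).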
Force-pushed from 07fc8d1 to 652168e
Thank you for the updates!
- `RowBlock`: consider renaming to `RowRange`. "Block" implies a data structure; this is a row range within a vector.
- `RowColumnsSnapshot`: consider `SingleRowValues`. Simple and direct: it stores values from a single row.
- The doc comment ("Copies a subset of columns from one row into self-contained one-row vectors so the row can be compared against later rows after its source vector is gone") is hard to parse. Simpler: "Stores copies of selected column values from a single row for later comparison."
- `isValid()`/`clear()`: use `hasValue()`/`reset()` to match `std::optional` vocabulary. The doc comment should describe the lifecycle: empty by default, `capture()` populates, `reset()` clears, `hasValue()` checks, like `std::optional`.
- `rowsEqual()`: redundant with the class name. Just `equals()`.
- `capture()` takes `channels` on every call, but the same channels are passed every time. `equals()` takes `keyInfo` and `inputChannels` on every call, also always the same. Make these constructor parameters. Then `capture(input, row)` and `equals(input, row)` become simple: the object knows what to capture and compare.
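A minimal sketch of the interface shape these comments ask for follows. It is not the Velox class: `SingleRowValuesSketch` and `Batch` are hypothetical, a `Batch` of `int64_t` columns stands in for a `RowVector`, and plain equality stands in for the key comparison. The columns are fixed at construction, so `capture` and `equals` take only an input and a row.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical columnar batch: Batch[column][row].
using Batch = std::vector<std::vector<int64_t>>;

// Stores copies of selected column values from a single row for later
// comparison. Lifecycle mirrors std::optional: empty by default, capture()
// populates, reset() clears, hasValue() checks.
class SingleRowValuesSketch {
 public:
  explicit SingleRowValuesSketch(std::vector<std::size_t> columns)
      : columns_(std::move(columns)) {}

  bool hasValue() const {
    return values_.has_value();
  }

  void reset() {
    values_.reset();
  }

  // Copies the configured columns of 'row', so 'input' need not stay alive.
  void capture(const Batch& input, std::size_t row) {
    std::vector<int64_t> copy;
    copy.reserve(columns_.size());
    for (auto column : columns_) {
      copy.push_back(input[column][row]);
    }
    values_ = std::move(copy);
  }

  // Compares the captured values with the same columns of another row.
  bool equals(const Batch& input, std::size_t row) const {
    assert(hasValue());
    for (std::size_t i = 0; i < columns_.size(); ++i) {
      if ((*values_)[i] != input[columns_[i]][row]) {
        return false;
      }
    }
    return true;
  }

 private:
  const std::vector<std::size_t> columns_;
  std::optional<std::vector<int64_t>> values_;
};
```

A streaming build would typically capture the last row of one batch and call `equals` on the first row of the next to decide whether a partition or peer group continues. Since only the copied key values are held, the earlier batch can be released.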
Rename RowBlock to RowRange and RowColumnsSnapshot to SingleRowValues, and rework SingleRowValues to bake its columns and pool into the constructor so capture/equals take only an input row. Adopt std::optional vocabulary (hasValue/reset) and rename the block-based members and helpers to range. RowsStreamingWindowBuild now keeps separate partition-key and sort-key value snapshots instead of one combined snapshot.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from 8384f11 to 13fe802
Thank you for the detailed review, @mbasmanova ! I've addressed all the points in commit 13fe802:
All window unit tests and the benchmark pass with no regression. Please take another look when you have a chance. |
mbasmanova
left a comment
Thank you for the thorough updates.
|
@bikramSingh91 has imported this pull request. If you are a Meta employee, you can view this in D107544696. |
RowsStreamingWindowBuild is used when input is already sorted by the partition and order keys. In this path, copying every row into `RowContainer` and then extracting the same rows back into vectors adds unnecessary column-to-row and row-to-column conversion work before window functions can run.

This PR removes that round trip while keeping the rows-streaming execution model and partial-partition behavior unchanged.

Key takeaways:
- [...] `RowContainer` and reading them back into vectors, rows-streaming window now retains input `RowVector` ranges directly and exposes them through a vector-backed `WindowPartition`.
- [...] `rank` improves Window CPU by 44-47%. The `rank + sum` case still improves, but less (18-20%) because function evaluation dominates more of the runtime. With 7 funcs, the improvement is smaller (about 7%) because the removed conversion cost is amortized across more function work.
- [...] `RowContainer`. To reduce avoidable retention, the cross-batch previous-row state copies only the needed one-row key snapshot, so it does not pin a processed input vector just for boundary comparison.

Benchmark setup:
- [...] `origin/main` plus this PR's benchmark harness only.