Skip to content

Commit f34e54d

Browse files
committed
Document dataflow pattern
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 7e3bac4 commit f34e54d

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

src/compute/src/logging/watchdog.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,27 @@ pub(super) fn construct<S: Scope<Timestamp = mz_repr::Timestamp>>(
4545
dataflows_exceeding_heap_size_limit,
4646
} = streams;
4747

48+
// The following dataflow computes:
49+
// ```
50+
// arrangement_heap_size batcher_heap_size operator_to_dataflow heap_size_limits
51+
// | | | |
52+
// |----->concat<------ | arrange arrange
53+
// | | |
54+
// arrange: op_to_heap_size | |
55+
// | | |
56+
// |-->join: dataflow_to_heap_size<-| |
57+
// | |
58+
// arrange |
59+
// | |
60+
// count_total_core |
61+
// | |
62+
// arrange |
63+
// | |
64+
// |------------>join<-------------------|
65+
// |
66+
// result
67+
// ```
68+
4869
let operator_to_heap_size = arrangement_heap_size.concat(&batcher_heap_size);
4970
let operator_to_heap_size = operator_to_heap_size
5071
.as_collection()
@@ -58,22 +79,20 @@ pub(super) fn construct<S: Scope<Timestamp = mz_repr::Timestamp>>(
5879
"operator_to_dataflow",
5980
);
6081

61-
let heap_size_limits = heap_size_limits
62-
.as_collection()
63-
.mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>("heap_size_limits");
64-
6582
let dataflow_to_heap_size = operator_to_heap_size
6683
.join_core(&operator_to_dataflow, |_op, (), dataflow| {
6784
Some((*dataflow, ()))
6885
});
6986

70-
let dataflow_to_heap_size = dataflow_to_heap_size
87+
let heap_size_limits = heap_size_limits
88+
.as_collection()
89+
.mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>("heap_size_limits");
90+
91+
let dataflow_to_heap_size_limit = dataflow_to_heap_size
7192
.mz_arrange::<KeyBatcher<_, _, _>, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>(
7293
"dataflow_to_heap_size",
7394
)
74-
.count_total_core::<mz_repr::Diff>();
75-
76-
let dataflow_to_heap_size_limit = dataflow_to_heap_size
95+
.count_total_core::<mz_repr::Diff>()
7796
.mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>(
7897
"dataflow_to_heap_size",
7998
)

0 commit comments

Comments
 (0)