From 6efb180ac20cc332507775e0a85c428ba4bcb765 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 5 Feb 2025 10:36:16 -0800 Subject: [PATCH] initial triggers pass w/accumulation --- runners/prism/java/build.gradle | 30 +- .../runners/prism/internal/engine/data.go | 1 + .../prism/internal/engine/elementmanager.go | 310 +++++++++++++++--- .../prism/internal/engine/engine_test.go | 43 +++ .../runners/prism/internal/engine/strategy.go | 1 + .../prism/internal/engine/teststream.go | 2 +- .../beam/runners/prism/internal/execute.go | 50 +++ .../runners/prism/internal/handlecombine.go | 50 ++- .../prism/internal/handlecombine_test.go | 8 +- .../prism/internal/jobservices/management.go | 52 ++- .../prism/internal/unimplemented_test.go | 25 +- 11 files changed, 471 insertions(+), 101 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 252d9a1fa1c1..aa245c71bf4f 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -86,33 +86,27 @@ def sickbayTests = [ 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics', 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics', - // Triggers / Accumulation modes not yet implemented in prism. + // ProcessingTime triggers not yet implemented in Prism. // https://github.com/apache/beam/issues/31438 - 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers', - 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode', - 'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows', 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime', 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly', - 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle', - 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', - 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate', - 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode', - 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow', - 'org.apache.beam.sdk.testing.TestStreamTest.testElementsAtAlmostPositiveInfinity', - 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', - 'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams', 'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger', + 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // Uses processing time trigger for early firings. + + // Triggered Side Inputs not yet implemented in Prism. + // https://github.com/apache/beam/issues/31438 + 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', + // Prism doesn't support multiple TestStreams. + 'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams', + // GroupIntoBatchesTest tests that fail: // Teststream has bad KV encodings due to using an outer context. 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode', // ShardedKey not yet implemented. 
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', - // Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected - 'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage', - // Java side dying during execution. // https://github.com/apache/beam/issues/32930 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', @@ -143,9 +137,6 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes', - 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing', - 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing', - 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows', // Possibly a different error being hidden behind the main error. // org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String @@ -154,11 +145,10 @@ def sickbayTests = [ // TODO(https://github.com/apache/beam/issues/31231) 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata', - // Prism isn't handling Java's side input views properly. + // Prism isn't handling Java's side input views properly, likely related to triggered side inputs. // https://github.com/apache/beam/issues/32932 // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. // Consider using Combine.globally().asSingleton() to combine the PCollection into a single value - 'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput', // java.util.NoSuchElementException: Empty PCollection accessed as a singleton view. 
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput', // ava.lang.IllegalArgumentException: Duplicate values for a diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go index 682e8144ef37..371e3f00b895 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go @@ -40,6 +40,7 @@ type StateData struct { Multimap map[string][][]byte Trigger map[Trigger]triggerState + Pane typex.PaneInfo } func (s *StateData) getTriggerState(key Trigger) triggerState { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index d220a8ea6a0c..6f007f84c0d4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "log/slog" + "runtime/debug" "sort" "strings" "sync" @@ -70,7 +71,7 @@ func (e element) String() string { if e.IsTimer() { return fmt.Sprintf("{Timer - Window %v, EventTime %v, Hold %v, %q %q %q %q}", e.window, e.timestamp, e.holdTimestamp, e.transform, e.family, e.tag, e.keyBytes) } - return fmt.Sprintf("{Data - Window %v, EventTime %v, Element %v}", e.window, e.timestamp, e.elmBytes) + return fmt.Sprintf("{Data - Window %v, EventTime %v, Pane: %v Element %v - %q}", e.window, e.timestamp, e.pane, e.elmBytes, string(e.elmBytes)) } type elements struct { @@ -315,7 +316,7 @@ func (em *ElementManager) Impulse(stageID string) { for _, sID := range consumers { consumer := em.stages[sID] - count := consumer.AddPending(newPending) + count := consumer.AddPending(em, newPending) em.addPending(count) } refreshes := stage.updateWatermarks(em) @@ -355,7 +356,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. defer func() { // In case of panics in bundle generation, fail and cancel the job. if e := recover(); e != nil { - upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v", e)) + upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e, string(debug.Stack()))) } }() defer close(runStageCh) @@ -385,6 +386,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. em.changedStages.merge(changedByProcessingTime) } // Run any injected bundles first. + // TODO: Migrate these to the per-stage mechanism for consistency with triggers. for len(em.injectedBundles) > 0 { rb := em.injectedBundles[0] em.injectedBundles = em.injectedBundles[1:] @@ -406,14 +408,32 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. // Check each advanced stage, to see if it's able to execute based on the watermark. 
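The Bundles watermark-evaluation goroutine now attaches the stack trace when it reports a panic through the upstream cancel function. A minimal, self-contained sketch of that guard pattern, with a placeholder cancel function and worker body rather than Prism's actual ones:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// guarded runs work in a goroutine and, if it panics, reports the panic and
// its stack through cancel so the rest of the system can shut down with context.
func guarded(cancel func(error), work func()) {
	go func() {
		defer func() {
			if e := recover(); e != nil {
				cancel(fmt.Errorf("panic in worker goroutine: %v\n%s", e, debug.Stack()))
			}
		}()
		work()
	}()
}

func main() {
	done := make(chan struct{})
	cancel := func(err error) {
		fmt.Println("cancelled:", err)
		close(done)
	}
	guarded(cancel, func() { panic("boom") })
	<-done
}
```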
for stageID := range advanced { ss := em.stages[stageID] - watermark, ready, ptimeEventsReady := ss.bundleReady(em, emNow) + watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow) + if injectedReady { + ss.mu.Lock() + injected := ss.bundlesToInject + ss.bundlesToInject = nil + ss.mu.Unlock() + for _, rb := range injected { + em.refreshCond.L.Unlock() + select { + case <-ctx.Done(): + return + case runStageCh <- rb: + } + em.refreshCond.L.Lock() + } + } if ready { - bundleID, ok, reschedule := ss.startEventTimeBundle(watermark, nextBundID) + bundleID, ok, reschedule, pendingAdjustment := ss.startEventTimeBundle(watermark, nextBundID) // Handle the reschedule even when there's no bundle. if reschedule { em.changedStages.insert(stageID) } if ok { + if pendingAdjustment > 0 { + em.addPending(pendingAdjustment) + } rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark} em.inprogressBundles.insert(rb.BundleID) @@ -527,7 +547,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error { stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n", em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents)) } else { - stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events)) + stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles)) } sort.Strings(ids) for _, id := range ids { @@ -539,7 +559,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error { if upS == "" { upS = "IMPULSE " // (extra spaces to allow print to align better.) } - stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire)) + stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject", ss.bundlesToInject)) var outputConsumers, sideConsumers []string for _, col := range ss.outputIDs { @@ -764,6 +784,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle) // PersistBundle takes in the stage ID, ID of the bundle associated with the pending // input elements, and the committed output elements. 
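The injected-bundle dispatch above releases the refresh condition's lock around the potentially blocking send on runStageCh and re-acquires it afterwards. A simplified sketch of that hand-off pattern, using a plain sync.Mutex and string items in place of the engine's condition variable and RunBundle values:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// dispatch hands queued items to out without holding mu across the
// (potentially blocking) channel send, re-acquiring it afterwards,
// mirroring how injected bundles are handed to the bundle-processing loop.
func dispatch(ctx context.Context, mu *sync.Mutex, queue *[]string, out chan<- string) {
	mu.Lock()
	defer mu.Unlock()
	for len(*queue) > 0 {
		item := (*queue)[0]
		*queue = (*queue)[1:]

		mu.Unlock() // Don't block other producers while waiting on the send.
		select {
		case <-ctx.Done():
			mu.Lock() // Restore the invariant expected by the deferred Unlock.
			return
		case out <- item:
		}
		mu.Lock()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	var mu sync.Mutex
	queue := []string{"bundle-1", "bundle-2"}
	out := make(chan string, 2)
	dispatch(ctx, &mu, &queue, out)
	close(out)
	for b := range out {
		fmt.Println("ran", b)
	}
}
```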
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals) { + stage := em.stages[rb.StageID] var seq int for output, data := range d.Raw { info := col2Coders[output] @@ -797,6 +818,11 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol keyBytes = info.KeyDec(kbuf) // TODO: Optimize unnecessary copies. This is tripleteeing? } for _, w := range ws { + if stage.aggregate { + stage.mu.Lock() + pn = stage.state[LinkID{}][w][string(keyBytes)].Pane + stage.mu.Unlock() + } newPending = append(newPending, element{ window: w, @@ -815,7 +841,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers) for _, sID := range consumers { consumer := em.stages[sID] - count := consumer.AddPending(newPending) + count := consumer.AddPending(em, newPending) em.addPending(count) } for _, link := range sideConsumers { @@ -824,8 +850,6 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol } } - stage := em.stages[rb.StageID] - // Triage timers into their time domains for scheduling. // EventTime timers are handled with normal elements, // ProcessingTime timers need to be scheduled into the processing time based queue. @@ -838,7 +862,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol // Add unprocessed back to the pending stack. if len(unprocessedElements) > 0 { // TODO actually reschedule based on the residuals delay... - count := stage.AddPending(unprocessedElements) + count := stage.AddPending(em, unprocessedElements) em.addPending(count) } // Clear out the inprogress elements associated with the completed bundle. @@ -970,7 +994,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag } if len(pendingEventTimers) > 0 { - count := stage.AddPending(pendingEventTimers) + count := stage.AddPending(em, pendingEventTimers) em.addPending(count) } changedHolds := map[mtime.Time]int{} @@ -989,7 +1013,10 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag // FailBundle clears the extant data allowing the execution to shut down. func (em *ElementManager) FailBundle(rb RunBundle) { stage := em.stages[rb.StageID] - stage.mu.Lock() + if !stage.mu.TryLock() { + slog.Error("failing bundle: engine state is corrupted with a locked mutex.", "bundle", rb) + return + } completed := stage.inprogress[rb.BundleID] em.addPending(-len(completed.es)) delete(stage.inprogress, rb.BundleID) @@ -1006,7 +1033,7 @@ func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputI unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb) if len(unprocessedElements) > 0 { slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements)) - count := stage.AddPending(unprocessedElements) + count := stage.AddPending(em, unprocessedElements) em.addPending(count) } em.markStagesAsChanged(singleSet(rb.StageID)) @@ -1122,6 +1149,7 @@ type stageState struct { inprogressKeysByBundle map[string]set[string] // bundle to key assignments. 
state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey stateTypeLen map[LinkID]func([]byte) int // map from state to a function that will produce the total length of a single value in bytes. + bundlesToInject []RunBundle // bundlesToInject are triggered bundles that will be injected by the watermark loop to avoid premature pipeline termination. // Accounting for handling watermark holds for timers. // We track the count of timers with the same hold, and clear it from @@ -1177,7 +1205,7 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st } // AddPending adds elements to the pending heap. -func (ss *stageState) AddPending(newPending []element) int { +func (ss *stageState) AddPending(em *ElementManager, newPending []element) int { ss.mu.Lock() defer ss.mu.Unlock() if ss.aggregate { @@ -1188,15 +1216,13 @@ func (ss *stageState) AddPending(newPending []element) int { origPending := make([]element, 0, ss.pending.Len()) for _, e := range newPending { if ss.strat.EarliestCompletion(e.window) < threshold { - // TODO: figure out Pane and trigger firings. continue } origPending = append(origPending, e) } newPending = origPending } - //slog.Warn("AddPending", "stage", ss.ID, "pending", newPending) - if ss.stateful { + if ss.stateful || ss.aggregate { if ss.pendingByKeys == nil { ss.pendingByKeys = map[string]*dataAndTimers{} } @@ -1236,6 +1262,39 @@ func (ss *stageState) AddPending(newPending []element) int { // Mark the hold in the heap. ss.watermarkHolds.Add(e.holdTimestamp, 1) + } else if ss.aggregate { + // Check on triggers for this key. + // We use an empty linkID as the key into state for aggregations. + if ss.state == nil { + ss.state = make(map[LinkID]map[typex.Window]map[string]StateData) + } + lv, ok := ss.state[LinkID{}] + if !ok { + lv = make(map[typex.Window]map[string]StateData) + ss.state[LinkID{}] = lv + } + wv, ok := lv[e.window] + if !ok { + wv = make(map[string]StateData) + lv[e.window] = wv + } + state := wv[string(e.keyBytes)] + endOfWindowReached := e.window.MaxTimestamp() < ss.input + ready := ss.strat.IsTriggerReady(triggerInput{ + newElementCount: 1, + endOfWindowReached: endOfWindowReached, + }, &state) + + if ready { + state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) + } + // Store the state as triggers may have changed it. + ss.state[LinkID{}][e.window][string(e.keyBytes)] = state + + // If we're ready, it's time to fire! + if ready { + count += ss.buildTriggeredBundle(em, e.keyBytes, e.window) + } } } return count @@ -1246,6 +1305,113 @@ func (ss *stageState) AddPending(newPending []element) int { return len(newPending) } +// computeNextTriggeredPane produces the correct pane relative to the previous pane, +// and the end of window state. +func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool) typex.PaneInfo { + // This is the first firing, since index and first are both + // set to their zero values. + if pane.Index == 0 && !pane.IsFirst { + pane.IsFirst = true + } else { + pane.Index++ + } + if endOfWindowReached { + pane.Timing = typex.PaneLate + pane.NonSpeculativeIndex++ + } else { + pane.Timing = typex.PaneEarly + pane.NonSpeculativeIndex = -1 + } + return pane +} + +// computeNextWatermarkPane computes the next pane given the previous pane, +// when the watermark passes either the End of Window, or End of Window plus +// the allowed lateness. 
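For aggregation stages, AddPending now keeps trigger state per (window, key) under the empty LinkID, asks the windowing strategy whether the trigger is ready, advances the pane, and fires a triggered bundle. A heavily simplified sketch of that bookkeeping, using a count-based trigger and local stand-ins for typex.Window, StateData, and PaneInfo; the real code defers to WinStrat.IsTriggerReady and computeNextTriggeredPane and also distinguishes early from late firings with the end-of-window check:

```go
package main

import "fmt"

type paneTiming int

const (
	paneEarly paneTiming = iota
	paneOnTime
	paneLate
)

type pane struct {
	timing  paneTiming
	index   int
	isFirst bool
}

type triggerState struct {
	seen int  // elements seen since the last firing
	pane pane // last pane emitted for this (window, key)
}

// addElement records one element for key in window and reports whether a
// count-based trigger should fire, advancing the pane when it does.
func addElement(state map[string]map[string]*triggerState, window, key string, elementCount int) bool {
	wv, ok := state[window]
	if !ok {
		wv = map[string]*triggerState{}
		state[window] = wv
	}
	ts, ok := wv[key]
	if !ok {
		ts = &triggerState{}
		wv[key] = ts
	}
	ts.seen++
	if ts.seen < elementCount {
		return false
	}
	ts.seen = 0
	// First firing keeps index 0 and marks isFirst; later firings increment.
	if ts.pane.index == 0 && !ts.pane.isFirst {
		ts.pane.isFirst = true
	} else {
		ts.pane.index++
	}
	ts.pane.timing = paneEarly // the watermark has not yet passed the window
	return true
}

func main() {
	state := map[string]map[string]*triggerState{}
	for i := 1; i <= 6; i++ {
		if addElement(state, "w1", "k1", 3) {
			fmt.Printf("fire pane %+v after element %d\n", state["w1"]["k1"].pane, i)
		}
	}
}
```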
+func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo { + // The pane state is still early: this is the OnTime firing. + switch pane.Timing { + case typex.PaneEarly: + // We haven't fired ontime yet. + pane.Timing = typex.PaneOnTime + pane.NonSpeculativeIndex = 0 + + case typex.PaneOnTime: + // This must be the closing pane after an ontime pane. + pane.Timing = typex.PaneLate + pane.NonSpeculativeIndex++ + case typex.PaneLate: + // We have had some other late pane. + pane.NonSpeculativeIndex++ + } + // This is the first firing, since index and first are both + // set to their zero values. + if pane.Index == 0 && !pane.IsFirst { + pane.IsFirst = true + } else { + pane.Index++ + } + return pane +} + +// buildTriggeredBundle must be called with the stage.mu lock held. +// When in discaring mode, returns 0. +// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. +func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { + var toProcess []element + dnt := ss.pendingByKeys[string(key)] + var notYet []element + + rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} + + // Look at all elements for this key, and only for this window. + for dnt.elements.Len() > 0 { + e := heap.Pop(&dnt.elements).(element) + if e.window != win { + notYet = append(notYet, e) + continue + } + toProcess = append(toProcess, e) + } + + // accumulationDiff adjusts the pending elements count to include + // the accumulated elements, which would be the new elements, but + // also all previous elements as well, which are duplicated at + // at every pane. + accumulationDiff := 0 + if ss.strat.Accumulating { + // When accumulating, we need to retain all elements until the last pane firing. + for _, e := range toProcess { + heap.Push(&dnt.elements, e) + } + accumulationDiff += len(toProcess) + } + dnt.elements = append(dnt.elements, notYet...) + if dnt.elements.Len() == 0 { + delete(ss.pendingByKeys, string(key)) + } else { + // Ensure the heap invariants are maintained. + heap.Init(&dnt.elements) + } + + if ss.inprogressKeys == nil { + ss.inprogressKeys = set[string]{} + } + ss.makeInProgressBundle( + func() string { return rb.BundleID }, + toProcess, + ss.input, + singleSet(string(key)), + nil, + ) + ss.bundlesToInject = append(ss.bundlesToInject, rb) + // Bundle is marked in progress here to prevent a race condition. + em.refreshCond.L.Lock() + em.inprogressBundles.insert(rb.BundleID) + em.refreshCond.L.Unlock() + return accumulationDiff +} + // AddPendingSide adds elements to be consumed as side inputs. func (ss *stageState) AddPendingSide(newPending []element, tID, inputID string) { ss.mu.Lock() @@ -1332,7 +1498,8 @@ var ( // startBundle initializes a bundle with elements if possible. // A bundle only starts if there are elements at all, and if it's // an aggregation stage, if the windowing stratgy allows it. -func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func() string) (string, bool, bool) { +// Returns a non-zero adjustment to the pending elements count if the stage is accumulating. 
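buildTriggeredBundle drains the key's pending heap into the triggered bundle and, in accumulating mode, pushes the fired elements straight back so later panes see them again, returning the number of re-added elements so the global pending count stays accurate. A self-contained sketch of that accumulation adjustment with a minimal heap of timestamps; the real code additionally filters by window and re-queues elements from other windows:

```go
package main

import (
	"container/heap"
	"fmt"
)

// elementHeap is a minimal min-heap of event timestamps, standing in for the
// per-key pending element heap that the triggered bundle drains.
type elementHeap []int

func (h elementHeap) Len() int           { return len(h) }
func (h elementHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h elementHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *elementHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *elementHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// fire drains the pending heap into a bundle. In accumulating mode the fired
// elements are pushed back so later panes see them again, and the number of
// re-added elements is returned so the overall pending count stays correct.
func fire(pending *elementHeap, accumulating bool) (bundle []int, pendingAdjustment int) {
	for pending.Len() > 0 {
		bundle = append(bundle, heap.Pop(pending).(int))
	}
	if accumulating {
		for _, e := range bundle {
			heap.Push(pending, e)
		}
		pendingAdjustment = len(bundle)
	}
	return bundle, pendingAdjustment
}

func main() {
	pending := &elementHeap{3, 1, 2}
	heap.Init(pending)
	bundle, adj := fire(pending, true)
	fmt.Println("fired:", bundle, "re-added for later panes:", adj, "still pending:", pending.Len())
}
```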
+func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func() string) (string, bool, bool, int) { defer func() { if e := recover(); e != nil { panic(fmt.Sprintf("generating bundle for stage %v at watermark %v panicked\n%v", ss.ID, watermark, e)) @@ -1341,15 +1508,9 @@ func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func( ss.mu.Lock() defer ss.mu.Unlock() - var toProcess, notYet []element - for _, e := range ss.pending { - if !ss.aggregate || ss.aggregate && ss.strat.EarliestCompletion(e.window) < watermark { - toProcess = append(toProcess, e) - } else { - notYet = append(notYet, e) - } - } - ss.pending = notYet + var toProcess = ss.pending + ss.pending = nil + heap.Init(&ss.pending) if ss.inprogressKeys == nil { ss.inprogressKeys = set[string]{} @@ -1367,6 +1528,7 @@ func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func( // timers might have held back the minimum pending watermark. timerCleared := false + accumulatingPendingAdjustment := 0 keysPerBundle: for k, dnt := range ss.pendingByKeys { if ss.inprogressKeys.present(k) { @@ -1380,6 +1542,8 @@ keysPerBundle: dataInBundle := false + var toProcessForKey []element + // Can we pre-compute this bit when adding to pendingByKeys? // startBundle is in run in a single scheduling goroutine, so moving per-element code // to be computed by the bundle parallel goroutines will speed things up a touch. @@ -1389,11 +1553,36 @@ keysPerBundle: // If the bundle already contains data, then it's before the timer // by the heap invariant, and must be processed before we can fire a timer. // AKA, keep them seperate. - if len(toProcess) > 0 && // If we have already picked some elements AND + if len(toProcessForKey) > 0 && // If we have already picked some elements AND ((dataInBundle && dnt.elements[0].IsTimer()) || // we're about to add a timer to a Bundle that already has data OR (!dataInBundle && !dnt.elements[0].IsTimer())) { // we're about to add data to a bundle that already has a time break } + // If this is an aggregation, only include elements for this key + // if we're after the end of window, or after the window expiry deadline. + if ss.aggregate { + // We will only ever trigger aggregations by watermark at most twice, once the watermark passes the window ends for OnTime completion, + // and once for when the window is closing. + elm := dnt.elements[0] + if watermark <= elm.window.MaxTimestamp() { + // The watermark hasn't passed the end of the window yet, we do nothing. + break + } + // Watermark is past the end of this window. Have we fired an OnTime pane yet? + state := ss.state[LinkID{}][elm.window][string(elm.keyBytes)] + // If this is not the ontime firing for this key. + + if state.Pane.Timing != typex.PaneEarly && watermark <= ss.strat.EarliestCompletion(elm.window) { + // The watermark is still before the earliest final completion for this window. + // Do not add further data for this firing. + // If this is the Never trigger, we also don't fire OnTime until after the earliest completion. + break + } + if ss.strat.IsNeverTrigger() && watermark <= ss.strat.EarliestCompletion(elm.window) { + // The NeverTrigger only has a single firing at the end of window + allowed lateness. + break + } + } e := heap.Pop(&dnt.elements).(element) if e.IsData() { dataInBundle = true @@ -1411,11 +1600,53 @@ keysPerBundle: // Clear the "fired" timer so subsequent matches can be ignored. 
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) } - toProcess = append(toProcess, e) - if OneElementPerKey { + toProcessForKey = append(toProcessForKey, e) + if !ss.aggregate && OneElementPerKey { + // For aggregations, a single key is a single element. break } } + + // Get the pane for the aggregation correct, only mutate it once per key and window. + if ss.aggregate { + handledWindows := set[typex.Window]{} + for _, elm := range toProcessForKey { + state := ss.state[LinkID{}][elm.window][string(elm.keyBytes)] + if handledWindows.present(elm.window) { + // The pane is already correct for this key + window + firing. + if ss.strat.Accumulating && !state.Pane.IsLast { + // If this isn't the last pane, then we must add the element back to the pending store for subsequent firings. + heap.Push(&dnt.elements, elm) + accumulatingPendingAdjustment++ + } + continue + } + handledWindows.insert(elm.window) + + state.Pane = computeNextWatermarkPane(state.Pane) + // Determine if this is the last pane. + // Check if this is the post closing firing, which will be the last one. + // Unless it's the ontime pane, at which point it can never be last. + if watermark > ss.strat.EarliestCompletion(elm.window) && state.Pane.Timing != typex.PaneOnTime { + state.Pane.IsLast = true + } + if ss.strat.AllowedLateness == 0 || ss.strat.IsNeverTrigger() { + // If the allowed lateness is zero, then this will be the last pane. + // If this is the NeverTrigger, it's the last pane. + state.Pane.IsLast = true + } + ss.state[LinkID{}][elm.window][string(elm.keyBytes)] = state + + // The pane is already correct for this key + window + firing. + if ss.strat.Accumulating && !state.Pane.IsLast { + // If this isn't the last pane, then we must add the element back to the pending store for subsequent firings. + heap.Push(&dnt.elements, elm) + accumulatingPendingAdjustment++ + } + } + } + toProcess = append(toProcess, toProcessForKey...) + if dnt.elements.Len() == 0 { delete(ss.pendingByKeys, k) } @@ -1424,18 +1655,19 @@ keysPerBundle: } } stillSchedulable := true - if len(ss.pendingByKeys) == 0 && !timerCleared { + if ss.aggregate || (len(ss.pendingByKeys) == 0 && !timerCleared) { + // If this is an aggregate, we need a watermark change in order to reschedule // If we're out of data, and timers were not cleared then the watermark is accurate. stillSchedulable = false } if len(toProcess) == 0 { - // If we have nothing - return "", false, stillSchedulable + // If we have nothing, there's nothing to progress. + return "", false, stillSchedulable, accumulatingPendingAdjustment } bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) - return bundID, true, stillSchedulable + return bundID, true, stillSchedulable, accumulatingPendingAdjustment } func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.Time, genBundID func() string) (string, bool, bool) { @@ -1541,7 +1773,6 @@ func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess [] ss.inprogressKeysByBundle[bundID] = newKeys ss.inprogressKeys.merge(newKeys) ss.inprogressHoldsByBundle[bundID] = holdsInBundle - //slog.Warn("makeInProgressBundle", "stage", ss.ID, "toProcess", toProcess) return bundID } @@ -1765,11 +1996,12 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele // bundleReady returns the maximum allowed watermark for this stage, and whether // it's permitted to execute by side inputs. 
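Watermark-driven firings advance the pane at most twice per (window, key): once when the watermark passes the end of the window (the OnTime pane) and once when it passes the end of the window plus allowed lateness (a closing Late pane, marked IsLast). A small sketch of that progression mirroring computeNextWatermarkPane, with a local pane struct standing in for typex.PaneInfo:

```go
package main

import "fmt"

type timing string

// paneInfo mirrors the fields the engine manipulates: the first watermark
// firing after early panes is OnTime, every later one is Late, and the
// NonSpeculativeIndex only counts non-early (OnTime/Late) panes.
type paneInfo struct {
	Timing              timing
	Index               int
	NonSpeculativeIndex int
	IsFirst, IsLast     bool
}

func nextWatermarkPane(p paneInfo) paneInfo {
	switch p.Timing {
	case "EARLY":
		p.Timing = "ONTIME"
		p.NonSpeculativeIndex = 0
	case "ONTIME", "LATE":
		p.Timing = "LATE"
		p.NonSpeculativeIndex++
	}
	if p.Index == 0 && !p.IsFirst {
		p.IsFirst = true
	} else {
		p.Index++
	}
	return p
}

func main() {
	// Two early (speculative) firings already happened for this key and window.
	p := paneInfo{Timing: "EARLY", Index: 1, NonSpeculativeIndex: -1, IsFirst: true}
	p = nextWatermarkPane(p) // watermark passes the end of the window: OnTime pane
	fmt.Printf("%+v\n", p)
	p = nextWatermarkPane(p) // watermark passes end of window + allowed lateness: closing pane
	p.IsLast = true          // no further firings are possible for this window
	fmt.Printf("%+v\n", p)
}
```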
-func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool) { +func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool, bool) { ss.mu.Lock() defer ss.mu.Unlock() ptimeEventsReady := ss.processingTimeTimers.Peek() <= emNow || emNow == mtime.MaxTimestamp + injectedReady := len(ss.bundlesToInject) > 0 // If the upstream watermark and the input watermark are the same, // then we can't yet process this stage. @@ -1781,7 +2013,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T slog.Group("watermark", slog.Any("upstream", upstreamW), slog.Any("input", inputW))) - return mtime.MinTimestamp, false, ptimeEventsReady + return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady } ready := true for _, side := range ss.sides { @@ -1798,7 +2030,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T ready = false } } - return upstreamW, ready, ptimeEventsReady + return upstreamW, ready, ptimeEventsReady, injectedReady } // ProcessingTimeNow gives the current processing time for the runner. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go index b6e4412c3a83..0ad5b8c68c5c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go @@ -222,6 +222,49 @@ func TestTestStream(t *testing.T) { } } +func TestTriggers(t *testing.T) { + initRunner(t) + + tests := []struct { + pipeline func(s beam.Scope) + }{ + {pipeline: primitives.TriggerNever}, + {pipeline: primitives.TriggerAfterAll}, + {pipeline: primitives.TriggerAfterAny}, + {pipeline: primitives.TriggerAfterEach}, + {pipeline: primitives.TriggerAfterEndOfWindow}, + {pipeline: primitives.TriggerRepeat}, + } + + configs := []struct { + name string + OneElementPerKey, OneKeyPerBundle bool + }{ + {"Greedy", false, false}, + {"AllElementsPerKey", false, true}, + {"OneElementPerKey", true, false}, + {"OneElementPerBundle", true, true}, + } + for _, config := range configs { + for _, test := range tests { + t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) { + t.Cleanup(func() { + engine.OneElementPerKey = false + engine.OneKeyPerBundle = false + }) + engine.OneElementPerKey = config.OneElementPerKey + engine.OneKeyPerBundle = config.OneKeyPerBundle + p, s := beam.NewPipelineWithRoot() + test.pipeline(s) + _, err := executeWithT(context.Background(), t, p) + if err != nil { + t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err) + } + }) + } + } +} + // TestProcessingTime is the suite for validating behaviors around ProcessingTime. // Separate from the TestStream, Timers, and Triggers tests due to the unique nature // of the time domain. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index da858b280d15..5446d3edd3c0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -46,6 +46,7 @@ import ( // stage's input PCollection. type WinStrat struct { AllowedLateness time.Duration // Used to extend duration + Accumulating bool // If true, elements remain pending until the last firing. Trigger Trigger // Evaluated during execution. 
} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index 533cd5a0fc40..0af4e7dc41f0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -186,7 +186,7 @@ func (ev tsElementEvent) Execute(em *ElementManager) { // Update the consuming state. for _, sID := range em.consumers[t.pcollection] { ss := em.stages[sID] - added := ss.AddPending(pending) + added := ss.AddPending(em, pending) em.addPending(added) em.changedStages.insert(sID) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 68b817a513d9..51848944c0c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -36,6 +36,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) @@ -226,6 +227,8 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic ws := windowingStrategy(comps, tid) em.StageAggregates(stage.ID, engine.WinStrat{ AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond, + Accumulating: pipepb.AccumulationMode_ACCUMULATING == ws.GetAccumulationMode(), + Trigger: buildTrigger(ws.GetTrigger()), }) case urns.TransformImpulse: impulses = append(impulses, stage.ID) @@ -399,3 +402,50 @@ func getOnlyValue[K comparable, V any](in map[K]V) V { _, v := getOnlyPair(in) return v } + +// buildTrigger converts the protocol buffer representation of a trigger +// to the engine representation. 
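When execute.go configures an aggregation stage it now carries the accumulation mode and trigger from the proto windowing strategy into engine.WinStrat, alongside the existing allowed-lateness conversion. A sketch of that mapping with simplified local types standing in for pipepb.WindowingStrategy and engine.WinStrat; the Trigger field is a placeholder string here, where the real code calls buildTrigger recursively:

```go
package main

import (
	"fmt"
	"time"
)

// protoStrategy is a simplified stand-in for the windowing strategy fields
// consumed when configuring an aggregation stage.
type protoStrategy struct {
	AllowedLatenessMillis int64
	AccumulationMode      string // "DISCARDING" or "ACCUMULATING"
	Trigger               string // placeholder for the trigger proto
}

type winStrat struct {
	AllowedLateness time.Duration
	Accumulating    bool
	Trigger         string
}

func toWinStrat(ws protoStrategy) winStrat {
	return winStrat{
		// Proto lateness is in milliseconds; the engine works in time.Duration.
		AllowedLateness: time.Duration(ws.AllowedLatenessMillis) * time.Millisecond,
		Accumulating:    ws.AccumulationMode == "ACCUMULATING",
		Trigger:         ws.Trigger, // the real code builds an engine.Trigger here
	}
}

func main() {
	fmt.Printf("%+v\n", toWinStrat(protoStrategy{
		AllowedLatenessMillis: 60000,
		AccumulationMode:      "ACCUMULATING",
		Trigger:               "after_end_of_window",
	}))
}
```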
+func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { + switch at := tpb.GetTrigger().(type) { + case *pipepb.Trigger_AfterAll_: + subTriggers := make([]engine.Trigger, 0, len(at.AfterAll.GetSubtriggers())) + for _, st := range at.AfterAll.GetSubtriggers() { + subTriggers = append(subTriggers, buildTrigger(st)) + } + return &engine.TriggerAfterAll{SubTriggers: subTriggers} + case *pipepb.Trigger_AfterAny_: + subTriggers := make([]engine.Trigger, 0, len(at.AfterAny.GetSubtriggers())) + for _, st := range at.AfterAny.GetSubtriggers() { + subTriggers = append(subTriggers, buildTrigger(st)) + } + return &engine.TriggerAfterAny{SubTriggers: subTriggers} + case *pipepb.Trigger_AfterEach_: + subTriggers := make([]engine.Trigger, 0, len(at.AfterEach.GetSubtriggers())) + for _, st := range at.AfterEach.GetSubtriggers() { + subTriggers = append(subTriggers, buildTrigger(st)) + } + return &engine.TriggerAfterEach{SubTriggers: subTriggers} + case *pipepb.Trigger_AfterEndOfWindow_: + return &engine.TriggerAfterEndOfWindow{ + Early: buildTrigger(at.AfterEndOfWindow.GetEarlyFirings()), + Late: buildTrigger(at.AfterEndOfWindow.GetLateFirings()), + } + case *pipepb.Trigger_Always_: + return &engine.TriggerAlways{} + case *pipepb.Trigger_ElementCount_: + return &engine.TriggerElementCount{ElementCount: int(at.ElementCount.GetElementCount())} + case *pipepb.Trigger_Never_: + return &engine.TriggerNever{} + case *pipepb.Trigger_OrFinally_: + return &engine.TriggerOrFinally{ + Main: buildTrigger(at.OrFinally.GetMain()), + Finally: buildTrigger(at.OrFinally.GetFinally()), + } + case *pipepb.Trigger_Repeat_: + return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())} + case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb))) + default: + return &engine.TriggerDefault{} + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go index 5d6d6dee6bf4..6b336043b8c9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go @@ -59,17 +59,51 @@ func (*combine) PrepareUrns() []string { // PrepareTransform returns lifted combines and removes the leaves if enabled. Otherwise returns nothing. func (h *combine) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult { - // If we aren't lifting, the "default impl" for combines should be sufficient. 
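Both buildTrigger and the combine handler's hasElementCount walk the trigger proto as a tree, recursing into subtriggers, early/late firings, and repeated or or-finally children. A generic sketch of that recursive scan over a pared-down local trigger tree; the real code switches on the pipepb.Trigger oneof types instead:

```go
package main

import "fmt"

// trigger is a pared-down tree used to illustrate the recursive scans the
// handlers perform, e.g. deciding whether a combine can be lifted or whether
// a job uses an unsupported trigger.
type trigger struct {
	kind string // e.g. "elementCount", "afterEndOfWindow", "repeat", "default"
	subs []*trigger
}

// anyTrigger reports whether pred holds for the node or any subtrigger.
func anyTrigger(t *trigger, pred func(*trigger) bool) bool {
	if t == nil {
		return false
	}
	if pred(t) {
		return true
	}
	for _, s := range t.subs {
		if anyTrigger(s, pred) {
			return true
		}
	}
	return false
}

func main() {
	tr := &trigger{kind: "afterEndOfWindow", subs: []*trigger{
		{kind: "repeat", subs: []*trigger{{kind: "elementCount"}}},
	}}
	hasElementCount := anyTrigger(tr, func(t *trigger) bool { return t.kind == "elementCount" })
	fmt.Println("contains an element-count trigger:", hasElementCount) // lifting would be disabled
}
```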
- if !h.config.EnableLifting { - return prepareResult{ - SubbedComps: &pipepb.Components{ - Transforms: map[string]*pipepb.PTransform{ - tid: t, - }, - }, + + onlyInput := getOnlyValue(t.GetInputs()) + combineInput := comps.GetPcollections()[onlyInput] + ws := comps.GetWindowingStrategies()[combineInput.GetWindowingStrategyId()] + + var hasElementCount func(tpb *pipepb.Trigger) bool + + hasElementCount = func(tpb *pipepb.Trigger) bool { + elCount := false + switch at := tpb.GetTrigger().(type) { + case *pipepb.Trigger_ElementCount_: + return true + case *pipepb.Trigger_AfterAll_: + for _, st := range at.AfterAll.GetSubtriggers() { + elCount = elCount || hasElementCount(st) + } + return elCount + case *pipepb.Trigger_AfterAny_: + for _, st := range at.AfterAny.GetSubtriggers() { + elCount = elCount || hasElementCount(st) + } + return elCount + case *pipepb.Trigger_AfterEach_: + for _, st := range at.AfterEach.GetSubtriggers() { + elCount = elCount || hasElementCount(st) + } + return elCount + case *pipepb.Trigger_AfterEndOfWindow_: + return hasElementCount(at.AfterEndOfWindow.GetEarlyFirings()) || + hasElementCount(at.AfterEndOfWindow.GetLateFirings()) + case *pipepb.Trigger_OrFinally_: + return hasElementCount(at.OrFinally.GetMain()) || + hasElementCount(at.OrFinally.GetFinally()) + case *pipepb.Trigger_Repeat_: + return hasElementCount(at.Repeat.GetSubtrigger()) + default: + return false } } + // If we aren't lifting, the "default impl" for combines should be sufficient. + if !h.config.EnableLifting || hasElementCount(ws.GetTrigger()) { + return prepareResult{} // Strip the composite layer when lifting is disabled. + } + // To lift a combine, the spec should contain a CombinePayload. // That contains the actual FunctionSpec for the DoFn, and the // id for the accumulator coder. diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlecombine_test.go b/sdks/go/pkg/beam/runners/prism/internal/handlecombine_test.go index 92fd8d78d46d..7b38daa295ef 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlecombine_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlecombine_test.go @@ -103,13 +103,7 @@ func TestHandleCombine(t *testing.T) { Pcollections: basePCollectionMap, Coders: baseCoderMap, }, - want: prepareResult{ - SubbedComps: &pipepb.Components{ - Transforms: map[string]*pipepb.PTransform{ - undertest: combineTransform, - }, - }, - }, + want: prepareResult{}, }, { name: "lifted", lifted: true, diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index a4307b706fa3..7092ae8da685 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -228,7 +228,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { // Both Closing behaviors are identical without additional trigger firings. 
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS) - check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) + check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING, pipepb.AccumulationMode_ACCUMULATING) if ws.GetWindowFn().GetUrn() != urns.WindowFnSession { check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING) } @@ -239,20 +239,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * // Tests actually using the set behavior will fail. check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW, pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE) - // Non default triggers should fail. - if ws.GetTrigger().GetDefault() == nil { - dt := &pipepb.Trigger{ - Trigger: &pipepb.Trigger_Default_{}, - } - // Allow Never and Always triggers to unblock iteration on Java and Python SDKs. - // Without multiple firings, these will be very similar to the default trigger. - nt := &pipepb.Trigger{ - Trigger: &pipepb.Trigger_Never_{}, - } - at := &pipepb.Trigger{ - Trigger: &pipepb.Trigger_Always_{}, - } - check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String(), at.String()) + + if hasUnsupportedTriggers(ws.GetTrigger()) { + check("WindowingStrategy.Trigger", ws.GetTrigger().String()) } } } @@ -272,6 +261,39 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * }, nil } +func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool { + unsupported := false + switch at := tpb.GetTrigger().(type) { + case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + return true + case *pipepb.Trigger_AfterAll_: + for _, st := range at.AfterAll.GetSubtriggers() { + unsupported = unsupported || hasUnsupportedTriggers(st) + } + return unsupported + case *pipepb.Trigger_AfterAny_: + for _, st := range at.AfterAny.GetSubtriggers() { + unsupported = unsupported || hasUnsupportedTriggers(st) + } + return unsupported + case *pipepb.Trigger_AfterEach_: + for _, st := range at.AfterEach.GetSubtriggers() { + unsupported = unsupported || hasUnsupportedTriggers(st) + } + return unsupported + case *pipepb.Trigger_AfterEndOfWindow_: + return hasUnsupportedTriggers(at.AfterEndOfWindow.GetEarlyFirings()) || + hasUnsupportedTriggers(at.AfterEndOfWindow.GetLateFirings()) + case *pipepb.Trigger_OrFinally_: + return hasUnsupportedTriggers(at.OrFinally.GetMain()) || + hasUnsupportedTriggers(at.OrFinally.GetFinally()) + case *pipepb.Trigger_Repeat_: + return hasUnsupportedTriggers(at.Repeat.GetSubtrigger()) + default: + return false + } +} + func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJobResponse, error) { s.mu.Lock() job := s.jobs[req.GetPreparationId()] diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index f8917c72ccde..185940eada14 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -43,17 +43,18 @@ func TestUnimplemented(t *testing.T) { }{ // {pipeline: primitives.Drain}, // Can't test drain automatically yet. - // Triggers (Need teststream and are unimplemented.) 
- {pipeline: primitives.TriggerAfterAll}, - {pipeline: primitives.TriggerAfterAny}, - {pipeline: primitives.TriggerAfterEach}, - {pipeline: primitives.TriggerAfterEndOfWindow}, - {pipeline: primitives.TriggerAfterProcessingTime}, - {pipeline: primitives.TriggerAfterSynchronizedProcessingTime}, + // Implemented but the Go SDK doesn't fully handle panes and + // their associated valid behaviors for these triggers, leading + // to variable results. + // See https://github.com/apache/beam/issues/31153. {pipeline: primitives.TriggerElementCount}, {pipeline: primitives.TriggerOrFinally}, - {pipeline: primitives.TriggerRepeat}, {pipeline: primitives.TriggerAlways}, + + // Currently unimplemented triggers. + // https://github.com/apache/beam/issues/31438 + {pipeline: primitives.TriggerAfterSynchronizedProcessingTime}, + {pipeline: primitives.TriggerAfterProcessingTime}, } for _, test := range tests { @@ -85,11 +86,13 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.ReshuffleKV}, {pipeline: primitives.ParDoProcessElementBundleFinalizer}, - // The following have been "allowed" to unblock further development - // But it's not clear these tests truly validate the expected behavior - // of the triggers or panes. {pipeline: primitives.TriggerNever}, {pipeline: primitives.Panes}, + {pipeline: primitives.TriggerAfterAll}, + {pipeline: primitives.TriggerAfterAny}, + {pipeline: primitives.TriggerAfterEach}, + {pipeline: primitives.TriggerAfterEndOfWindow}, + {pipeline: primitives.TriggerRepeat}, } for _, test := range tests {
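A sketch of the split the tests above maintain: the same trigger pipelines are listed either under TestUnimplemented, where execution must fail, or TestImplemented, where it must pass, so implementing a feature in Prism surfaces as an entry that has to move between the lists. The run function below stands in for building the pipeline and calling the executeWithT helper:

```go
package demo

import "testing"

// checkExpectations runs each named pipeline and asserts the outcome that its
// list implies: unimplemented pipelines must still error, implemented ones
// must succeed.
func checkExpectations(t *testing.T, run func(name string) error, unimplemented, implemented []string) {
	for _, name := range unimplemented {
		t.Run("unimplemented_"+name, func(t *testing.T) {
			if run(name) == nil {
				t.Fatal("pipeline succeeded, but feature should be unimplemented in Prism")
			}
		})
	}
	for _, name := range implemented {
		t.Run("implemented_"+name, func(t *testing.T) {
			if err := run(name); err != nil {
				t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err)
			}
		})
	}
}
```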