Skip to content

Commit

Permalink
initial triggers pass w/accumulation
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Feb 5, 2025
1 parent b69b703 commit 6efb180
Show file tree
Hide file tree
Showing 11 changed files with 471 additions and 101 deletions.
30 changes: 10 additions & 20 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,27 @@ def sickbayTests = [
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',

// Triggers / Accumulation modes not yet implemented in prism.
// ProcessingTime triggers not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers',
'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
'org.apache.beam.sdk.testing.TestStreamTest.testElementsAtAlmostPositiveInfinity',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating',
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // Uses processing time trigger for early firings.

// Triggered Side Inputs not yet implemented in Prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',

// Prism doesn't support multiple TestStreams.
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',

// GroupIntoBatchesTest tests that fail:
// Teststream has bad KV encodings due to using an outer context.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
Expand Down Expand Up @@ -143,9 +137,6 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing',
'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows',

// Possibly a different error being hidden behind the main error.
// org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String
Expand All @@ -154,11 +145,10 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',

// Prism isn't handling Java's side input views properly.
// Prism isn't handling Java's side input views properly, likely related to triggered side inputs.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
// Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// ava.lang.IllegalArgumentException: Duplicate values for a
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type StateData struct {
Multimap map[string][][]byte

Trigger map[Trigger]triggerState
Pane typex.PaneInfo
}

func (s *StateData) getTriggerState(key Trigger) triggerState {
Expand Down
Loading

0 comments on commit 6efb180

Please sign in to comment.