|  | 
| 106 | 106 | import static org.apache.flink.api.common.JobStatus.RUNNING; | 
| 107 | 107 | import static org.apache.flink.util.Preconditions.checkState; | 
| 108 | 108 | import static org.assertj.core.api.Assertions.assertThat; | 
|  | 109 | +import static org.junit.jupiter.api.Assertions.assertTrue; | 
| 109 | 110 | 
 | 
| 110 | 111 | /** IT tests for {@link MySqlSource}. */ | 
| 111 | 112 | @Timeout(value = 300, unit = TimeUnit.SECONDS) | 
| @@ -417,7 +418,7 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk | 
| 417 | 418 |         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); | 
| 418 | 419 | 
 | 
| 419 | 420 |         // The sleeping source will sleep awhile after send per record | 
| 420 |  | -        MySqlSource<RowData> sleepingSource = buildSleepingSource(tableName, chunkColumnName); | 
|  | 421 | +        MySqlSource<RowData> sleepingSource = buildSleepingSource(tableName, chunkColumnName, null); | 
| 421 | 422 |         DataStreamSource<RowData> source = | 
| 422 | 423 |                 env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); | 
| 423 | 424 | 
 | 
| @@ -711,6 +712,153 @@ void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { | 
| 711 | 712 |         assertEqualsInAnyOrder(expectedRecords, records); | 
| 712 | 713 |     } | 
| 713 | 714 | 
 | 
|  | 715 | +    @ParameterizedTest | 
|  | 716 | +    @MethodSource("parameters") | 
|  | 717 | +    @SuppressWarnings({"rawtypes", "unchecked"}) | 
|  | 718 | +    public void testSnapshotFilters(String tableName, String chunkColumnName) throws Exception { | 
|  | 719 | +        customDatabase.createAndInitialize(); | 
|  | 720 | +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | 
|  | 721 | +        env.setParallelism(DEFAULT_PARALLELISM); | 
|  | 722 | +        env.enableCheckpointing(5000L); | 
|  | 723 | +        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); | 
|  | 724 | + | 
|  | 725 | +        // Filter user with `id > 200` | 
|  | 726 | +        // The sleeping source will sleep awhile after send per record | 
|  | 727 | +        MySqlSource<RowData> sleepingSource = | 
|  | 728 | +                buildSleepingSource(tableName, chunkColumnName, "id > 200"); | 
|  | 729 | +        DataStreamSource<RowData> source = | 
|  | 730 | +                env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); | 
|  | 731 | + | 
|  | 732 | +        String[] expectedSnapshotData = | 
|  | 733 | +                new String[] { | 
|  | 734 | +                    "+I[1009, user_10, Shanghai, 123567891234]", | 
|  | 735 | +                    "+I[1010, user_11, Shanghai, 123567891234]", | 
|  | 736 | +                    "+I[1011, user_12, Shanghai, 123567891234]", | 
|  | 737 | +                    "+I[1012, user_13, Shanghai, 123567891234]", | 
|  | 738 | +                    "+I[1013, user_14, Shanghai, 123567891234]", | 
|  | 739 | +                    "+I[1014, user_15, Shanghai, 123567891234]", | 
|  | 740 | +                    "+I[1015, user_16, Shanghai, 123567891234]", | 
|  | 741 | +                    "+I[1016, user_17, Shanghai, 123567891234]", | 
|  | 742 | +                    "+I[1017, user_18, Shanghai, 123567891234]", | 
|  | 743 | +                    "+I[1018, user_19, Shanghai, 123567891234]", | 
|  | 744 | +                    "+I[1019, user_20, Shanghai, 123567891234]", | 
|  | 745 | +                    "+I[2000, user_21, China:Shanghai, 123567891234]" | 
|  | 746 | +                }; | 
|  | 747 | +        TypeSerializer<RowData> serializer = | 
|  | 748 | +                source.getTransformation().getOutputType().createSerializer(env.getConfig()); | 
|  | 749 | +        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); | 
|  | 750 | +        CollectSinkOperatorFactory<RowData> factory = | 
|  | 751 | +                new CollectSinkOperatorFactory(serializer, accumulatorName); | 
|  | 752 | +        CollectSinkOperator<RowData> operator = (CollectSinkOperator) factory.getOperator(); | 
|  | 753 | +        CollectResultIterator<RowData> iterator = | 
|  | 754 | +                new CollectResultIterator( | 
|  | 755 | +                        operator.getOperatorIdFuture(), | 
|  | 756 | +                        serializer, | 
|  | 757 | +                        accumulatorName, | 
|  | 758 | +                        env.getCheckpointConfig(), | 
|  | 759 | +                        10000L); | 
|  | 760 | +        CollectStreamSink<RowData> sink = new CollectStreamSink(source, factory); | 
|  | 761 | +        sink.name("Data stream collect sink"); | 
|  | 762 | +        env.addOperator(sink.getTransformation()); | 
|  | 763 | +        JobClient jobClient = env.executeAsync("snapshotSplitTest"); | 
|  | 764 | +        iterator.setJobClient(jobClient); | 
|  | 765 | +        JobID jobId = jobClient.getJobID(); | 
|  | 766 | + | 
|  | 767 | +        // Trigger failover once some snapshot records has been sent by sleeping source | 
|  | 768 | +        if (iterator.hasNext()) { | 
|  | 769 | +            triggerFailover( | 
|  | 770 | +                    FailoverType.JM, | 
|  | 771 | +                    jobId, | 
|  | 772 | +                    miniClusterResource.get().getMiniCluster(), | 
|  | 773 | +                    () -> sleepMs(100)); | 
|  | 774 | +        } | 
|  | 775 | + | 
|  | 776 | +        // Check all snapshot records are sent with exactly-once semantics | 
|  | 777 | +        assertEqualsInAnyOrder( | 
|  | 778 | +                Arrays.asList(expectedSnapshotData), | 
|  | 779 | +                fetchRowData(iterator, expectedSnapshotData.length)); | 
|  | 780 | +        assertTrue(!hasNextData(iterator)); | 
|  | 781 | +        jobClient.cancel().get(); | 
|  | 782 | +    } | 
|  | 783 | + | 
|  | 784 | +    @ParameterizedTest | 
|  | 785 | +    @MethodSource("parameters") | 
|  | 786 | +    @SuppressWarnings({"rawtypes", "unchecked"}) | 
|  | 787 | +    public void testSnapshotFiltersEscape(String tableName, String chunkColumnName) | 
|  | 788 | +            throws Exception { | 
|  | 789 | +        customDatabase.createAndInitialize(); | 
|  | 790 | +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | 
|  | 791 | +        env.setParallelism(DEFAULT_PARALLELISM); | 
|  | 792 | +        env.enableCheckpointing(5000L); | 
|  | 793 | +        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); | 
|  | 794 | + | 
|  | 795 | +        // Filter user with `id > 200` | 
|  | 796 | +        // The sleeping source will sleep awhile after send per record | 
|  | 797 | +        MySqlSource<RowData> sleepingSource = | 
|  | 798 | +                buildSleepingSource(tableName, chunkColumnName, "address != 'China:Shanghai'"); | 
|  | 799 | +        DataStreamSource<RowData> source = | 
|  | 800 | +                env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); | 
|  | 801 | + | 
|  | 802 | +        String[] expectedSnapshotData = | 
|  | 803 | +                new String[] { | 
|  | 804 | +                    "+I[101, user_1, Shanghai, 123567891234]", | 
|  | 805 | +                    "+I[102, user_2, Shanghai, 123567891234]", | 
|  | 806 | +                    "+I[103, user_3, Shanghai, 123567891234]", | 
|  | 807 | +                    "+I[109, user_4, Shanghai, 123567891234]", | 
|  | 808 | +                    "+I[110, user_5, Shanghai, 123567891234]", | 
|  | 809 | +                    "+I[111, user_6, Shanghai, 123567891234]", | 
|  | 810 | +                    "+I[118, user_7, Shanghai, 123567891234]", | 
|  | 811 | +                    "+I[121, user_8, Shanghai, 123567891234]", | 
|  | 812 | +                    "+I[123, user_9, Shanghai, 123567891234]", | 
|  | 813 | +                    "+I[1009, user_10, Shanghai, 123567891234]", | 
|  | 814 | +                    "+I[1010, user_11, Shanghai, 123567891234]", | 
|  | 815 | +                    "+I[1011, user_12, Shanghai, 123567891234]", | 
|  | 816 | +                    "+I[1012, user_13, Shanghai, 123567891234]", | 
|  | 817 | +                    "+I[1013, user_14, Shanghai, 123567891234]", | 
|  | 818 | +                    "+I[1014, user_15, Shanghai, 123567891234]", | 
|  | 819 | +                    "+I[1015, user_16, Shanghai, 123567891234]", | 
|  | 820 | +                    "+I[1016, user_17, Shanghai, 123567891234]", | 
|  | 821 | +                    "+I[1017, user_18, Shanghai, 123567891234]", | 
|  | 822 | +                    "+I[1018, user_19, Shanghai, 123567891234]", | 
|  | 823 | +                    "+I[1019, user_20, Shanghai, 123567891234]" | 
|  | 824 | +                }; | 
|  | 825 | +        TypeSerializer<RowData> serializer = | 
|  | 826 | +                source.getTransformation().getOutputType().createSerializer(env.getConfig()); | 
|  | 827 | +        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); | 
|  | 828 | +        CollectSinkOperatorFactory<RowData> factory = | 
|  | 829 | +                new CollectSinkOperatorFactory(serializer, accumulatorName); | 
|  | 830 | +        CollectSinkOperator<RowData> operator = (CollectSinkOperator) factory.getOperator(); | 
|  | 831 | +        CollectResultIterator<RowData> iterator = | 
|  | 832 | +                new CollectResultIterator( | 
|  | 833 | +                        operator.getOperatorIdFuture(), | 
|  | 834 | +                        serializer, | 
|  | 835 | +                        accumulatorName, | 
|  | 836 | +                        env.getCheckpointConfig(), | 
|  | 837 | +                        10000L); | 
|  | 838 | +        CollectStreamSink<RowData> sink = new CollectStreamSink(source, factory); | 
|  | 839 | +        sink.name("Data stream collect sink"); | 
|  | 840 | +        env.addOperator(sink.getTransformation()); | 
|  | 841 | +        JobClient jobClient = env.executeAsync("snapshotSplitTest"); | 
|  | 842 | +        iterator.setJobClient(jobClient); | 
|  | 843 | +        JobID jobId = jobClient.getJobID(); | 
|  | 844 | + | 
|  | 845 | +        // Trigger failover once some snapshot records has been sent by sleeping source | 
|  | 846 | +        if (iterator.hasNext()) { | 
|  | 847 | +            triggerFailover( | 
|  | 848 | +                    FailoverType.JM, | 
|  | 849 | +                    jobId, | 
|  | 850 | +                    miniClusterResource.get().getMiniCluster(), | 
|  | 851 | +                    () -> sleepMs(100)); | 
|  | 852 | +        } | 
|  | 853 | + | 
|  | 854 | +        // Check all snapshot records are sent with exactly-once semantics | 
|  | 855 | +        assertEqualsInAnyOrder( | 
|  | 856 | +                Arrays.asList(expectedSnapshotData), | 
|  | 857 | +                fetchRowData(iterator, expectedSnapshotData.length)); | 
|  | 858 | +        assertTrue(!hasNextData(iterator)); | 
|  | 859 | +        jobClient.cancel().get(); | 
|  | 860 | +    } | 
|  | 861 | + | 
| 714 | 862 |     private List<String> testBackfillWhenWritingEvents( | 
| 715 | 863 |             boolean skipSnapshotBackfill, | 
| 716 | 864 |             int fetchSize, | 
| @@ -1020,7 +1168,8 @@ private <T> CollectResultIterator<T> addCollector( | 
| 1020 | 1168 |         return iterator; | 
| 1021 | 1169 |     } | 
| 1022 | 1170 | 
 | 
| 1023 |  | -    private MySqlSource<RowData> buildSleepingSource(String tableName, String chunkColumnName) { | 
|  | 1171 | +    private MySqlSource<RowData> buildSleepingSource( | 
|  | 1172 | +            String tableName, String chunkColumnName, String snapshotFilter) { | 
| 1024 | 1173 |         ResolvedSchema physicalSchema = | 
| 1025 | 1174 |                 new ResolvedSchema( | 
| 1026 | 1175 |                         Arrays.asList( | 
|  | 
0 commit comments