From 2699ea2138f0c0b1f7748e68771fd0499e53971d Mon Sep 17 00:00:00 2001 From: Abhi Gupta Date: Thu, 17 Jul 2025 17:21:59 +0530 Subject: [PATCH] [FLINK-36296][Connectors/DynamoDB] Add support for incremental shard discovery for DynamoDB Streams Source --- .../flink-connector-dynamodb/pom.xml | 2 + .../DynamoDbTypeInformedElementConverter.java | 2 +- .../source/DynamoDbStreamsSource.java | 45 ++++++---- .../DynamodbStreamsSourceConfigConstants.java | 7 ++ .../DynamoDbStreamsSourceEnumerator.java | 24 ++++-- .../enumerator/event/SplitsFinishedEvent.java | 15 ++-- .../event/SplitsFinishedEventContext.java | 28 ++++++ .../enumerator/tracker/SplitTracker.java | 4 + .../source/proxy/DynamoDbStreamsProxy.java | 43 ++++++++++ .../dynamodb/source/proxy/StreamProxy.java | 3 + .../reader/DynamoDbStreamsSourceReader.java | 51 ++++++++--- ...ollingDynamoDbStreamsShardSplitReader.java | 84 +++++++++++++++--- .../split/DynamoDbStreamsShardSplit.java | 40 ++++++++- .../DynamoDbStreamsShardSplitSerializer.java | 51 ++++++++++- .../split/DynamoDbStreamsShardSplitState.java | 10 +++ .../DynamoDbStreamsSourceEnumeratorTest.java | 32 +++++-- .../proxy/DynamoDbStreamsProxyTest.java | 85 ++++++++++++++++++- .../DynamoDbStreamsSourceReaderTest.java | 16 +++- ...ngDynamoDbStreamsShardSplitReaderTest.java | 16 +++- ...namoDbStreamsShardSplitSerializerTest.java | 14 +++ .../util/DynamoDbStreamsClientProvider.java | 29 +++++++ .../util/DynamoDbStreamsProxyProvider.java | 21 +++++ .../dynamodb/source/util/TestUtil.java | 35 ++++++++ 23 files changed, 581 insertions(+), 76 deletions(-) create mode 100644 flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml index 06ae5cd4d..e4cab321e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/pom.xml +++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml @@ -71,10 +71,12 @@ under the License. 
<dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>dynamodb</artifactId> + <version>2.32.0</version> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>dynamodb-enhanced</artifactId> + <version>2.32.0</version> </dependency>
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java index 4978f4664..e2307fcf1 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java @@ -176,7 +176,7 @@ private TableSchema createTableSchemaFromPojo( tableSchemaBuilder, propertyDescriptor.getName(), BeanAttributeGetter.create( - typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()), + typeInfo.getTypeClass(), propertyDescriptor.getReadMethod(), null), fieldInfo); }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java index 5b20bc13c..2998b30a1 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java @@ -55,10 +55,13 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.retries.AdaptiveRetryStrategy; import software.amazon.awssdk.retries.api.BackoffStrategy; +import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.utils.AttributeMap; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -145,14 +148,18 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader( final Duration getRecordsIdlePollingTimeBetweenEmptyPolls = sourceConfig.get(DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS); + Map<String, List<Shard>> childShardMap = new ConcurrentHashMap<>(); // We create a new stream proxy for each split reader since they have their own independent // lifecycle.
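+ // The split reader clients use the SDK default retry strategy; the enumerator client
+ // built in createEnumerator below is configured with an adaptive retry strategy and
+ // exponential backoff tuned for DescribeStream shard discovery.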
Supplier splitReaderSupplier = () -> new PollingDynamoDbStreamsShardSplitReader( - createDynamoDbStreamsProxy(sourceConfig), + createDynamoDbStreamsProxy( + sourceConfig, + SdkDefaultRetryStrategy.defaultRetryStrategy()), getRecordsIdlePollingTimeBetweenNonEmptyPolls, getRecordsIdlePollingTimeBetweenEmptyPolls, + childShardMap, shardMetricGroupMap); DynamoDbStreamsRecordEmitter recordEmitter = new DynamoDbStreamsRecordEmitter<>(deserializationSchema); @@ -162,6 +169,7 @@ public SourceReader createReader( recordEmitter, sourceConfig, readerContext, + childShardMap, shardMetricGroupMap); } @@ -178,11 +186,25 @@ public SourceReader createReader( SplitEnumeratorContext enumContext, DynamoDbStreamsSourceEnumeratorState checkpoint) throws Exception { + int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT); + Duration minDescribeStreamDelay = + sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY); + Duration maxDescribeStreamDelay = + sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY); + BackoffStrategy backoffStrategy = + BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay); + AdaptiveRetryStrategy adaptiveRetryStrategy = + SdkDefaultRetryStrategy.adaptiveRetryStrategy() + .toBuilder() + .maxAttempts(maxApiCallAttempts) + .backoffStrategy(backoffStrategy) + .throttlingBackoffStrategy(backoffStrategy) + .build(); return new DynamoDbStreamsSourceEnumerator( enumContext, streamArn, sourceConfig, - createDynamoDbStreamsProxy(sourceConfig), + createDynamoDbStreamsProxy(sourceConfig, adaptiveRetryStrategy), dynamoDbStreamsShardAssigner, checkpoint); } @@ -199,7 +221,8 @@ public SimpleVersionedSerializer getSplitSerializer() new DynamoDbStreamsShardSplitSerializer()); } - private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) { + private DynamoDbStreamsProxy createDynamoDbStreamsProxy( + Configuration consumerConfig, RetryStrategy retryStrategy) { SdkHttpClient httpClient = AWSGeneralUtil.createSyncHttpClient( AttributeMap.builder().build(), ApacheHttpClient.builder()); @@ -215,26 +238,12 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties); AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties); - int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT); - Duration minDescribeStreamDelay = - sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY); - Duration maxDescribeStreamDelay = - sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY); - BackoffStrategy backoffStrategy = - BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay); - AdaptiveRetryStrategy adaptiveRetryStrategy = - SdkDefaultRetryStrategy.adaptiveRetryStrategy() - .toBuilder() - .maxAttempts(maxApiCallAttempts) - .backoffStrategy(backoffStrategy) - .throttlingBackoffStrategy(backoffStrategy) - .build(); DynamoDbStreamsClient dynamoDbStreamsClient = AWSClientUtil.createAwsSyncClient( dynamoDbStreamsClientProperties, httpClient, DynamoDbStreamsClient.builder(), - ClientOverrideConfiguration.builder().retryStrategy(adaptiveRetryStrategy), + ClientOverrideConfiguration.builder().retryStrategy(retryStrategy), DynamodbStreamsSourceConfigConstants .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT, DynamodbStreamsSourceConfigConstants.DDB_STREAMS_CLIENT_USER_AGENT_PREFIX); diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java index b16833e51..7d031cf9a 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java @@ -76,6 +76,9 @@ public enum InitialPosition { public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT = "Apache Flink %s (%s) DynamoDb Streams Connector"; + public static final String DYNAMODB_STREAMS_THROTTLING_EXCEPTION_ERROR_CODE = + "ThrottlingException"; + public static final ConfigOption DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS = ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay") @@ -91,6 +94,10 @@ public enum InitialPosition { .withDescription( "The default idle time between non-empty polls for DynamoDB Streams GetRecords API"); + public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5; + public static final Duration CHILD_SHARD_DISCOVERY_MIN_DELAY = Duration.ofMillis(100); + public static final Duration CHILD_SHARD_DISCOVERY_MAX_DELAY = Duration.ofMillis(1000); + /** DynamoDb Streams identifier for user agent prefix. */ public static final String DDB_STREAMS_CLIENT_USER_AGENT_PREFIX = "aws.dynamodbstreams.client.user-agent-prefix"; diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java index 0c2f00b11..fd4d56558 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent; +import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext; import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitGraphInconsistencyTracker; import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitTracker; import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException; @@ -138,7 +139,20 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { /** When we mark a split as finished, we will only assign its child splits to the subtasks. 
*/ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { - splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + Set<String> finishedSplitIds = + splitsFinishedEvent.getFinishedSplits().stream() + .map(SplitsFinishedEventContext::getSplitId) + .collect(Collectors.toSet()); + splitTracker.markAsFinished(finishedSplitIds); + List<Shard> childrenOfFinishedSplits = + splitsFinishedEvent.getFinishedSplits().stream() + .flatMap(finishedSplitEvent -> finishedSplitEvent.getChildSplits().stream()) + .collect(Collectors.toList()); + LOG.info("Adding children of finished splits to split tracker: {}", childrenOfFinishedSplits); + splitTracker.addChildSplits(childrenOfFinishedSplits); Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId); // during recovery, splitAssignment may return null since there might be no split assigned @@ -152,13 +166,12 @@ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinis + "Child shard discovery might be delayed until we have enough readers. " + "Finished split ids: {}", subtaskId, - splitsFinishedEvent.getFinishedSplitIds()); + finishedSplitIds); return; } - splitsAssignment.removeIf( - split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId())); - assignChildSplits(splitsFinishedEvent.getFinishedSplitIds()); + splitsAssignment.removeIf(split -> finishedSplitIds.contains(split.splitId())); + assignChildSplits(finishedSplitIds); } private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { @@ -230,6 +243,7 @@ private void assignAllAvailableSplits() { private void assignChildSplits(Set<String> finishedSplitIds) { List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment = splitTracker.getUnassignedChildSplits(finishedSplitIds); + LOG.info("Unassigned child splits: {}", splitsAvailableForAssignment); assignSplits(splitsAvailableForAssignment); }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java index 0da5f01a5..22659c46b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java @@ -22,26 +22,27 @@ import org.apache.flink.api.connector.source.SourceEvent; import java.util.Set; +import java.util.stream.Collectors; /** Source event used by source reader to communicate that splits are finished to enumerator.
*/ @Internal public class SplitsFinishedEvent implements SourceEvent { private static final long serialVersionUID = 1; - private final Set<String> finishedSplitIds; + private final Set<SplitsFinishedEventContext> finishedSplits; - public SplitsFinishedEvent(Set<String> finishedSplitIds) { - this.finishedSplitIds = finishedSplitIds; + public SplitsFinishedEvent(Set<SplitsFinishedEventContext> finishedSplits) { + this.finishedSplits = finishedSplits; } - public Set<String> getFinishedSplitIds() { - return finishedSplitIds; + public Set<SplitsFinishedEventContext> getFinishedSplits() { + return finishedSplits; } @Override public String toString() { return "SplitsFinishedEvent{" - + "finishedSplitIds=[" - + String.join(",", finishedSplitIds) + + "finishedSplits=[" + + finishedSplits.stream().map(Object::toString).collect(Collectors.joining(",")) + "]}"; } }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java new file mode 100644 index 000000000..1f2535877 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java @@ -0,0 +1,28 @@ +package org.apache.flink.connector.dynamodb.source.enumerator.event; + +import org.apache.flink.annotation.Internal; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.io.Serializable; +import java.util.List; + +/** Context which contains the split id and the child shards for a finished split event. */ +@Internal +public class SplitsFinishedEventContext implements Serializable { + private final String splitId; + private final List<Shard> childSplits; + + public SplitsFinishedEventContext(String splitId, List<Shard> childSplits) { + this.splitId = splitId; + this.childSplits = childSplits; + } + + public String getSplitId() { + return splitId; + } + + public List<Shard> getChildSplits() { + return childSplits; + } +}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java index e655d4cde..042a216f7 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java @@ -102,6 +102,10 @@ public void addSplits(Collection<Shard> shardsToAdd) { addSplitsForLatest(shardsToAdd); } + public void addChildSplits(Collection<Shard> childShardsToAdd) { + addSplitsForTrimHorizon(childShardsToAdd); + } + private void addSplitsForLatest(Collection<Shard> shardsToAdd) { List<Shard> openShards = shardsToAdd.stream()
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java index 537b1bf59..74e7390f5 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java @@ -32,6
+32,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; import software.amazon.awssdk.services.dynamodb.model.StreamStatus; import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; @@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh return listShardsResult; } + @Override + public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) { + LOG.info("Listing child shards with filter for shardId: {}", shardFilter.shardId()); + ListShardsResult listShardsResult = new ListShardsResult(); + + try { + DescribeStreamResponse describeStreamResponse = + this.describeStream(streamArn, shardFilter); + listShardsResult.addShards(describeStreamResponse.streamDescription().shards()); + listShardsResult.setStreamStatus( + describeStreamResponse.streamDescription().streamStatus()); + } catch (Exception e) { + LOG.error("DescribeStream with filter threw an exception", e); + } + LOG.info("Child shards result for shardId {}: {}", shardFilter.shardId(), listShardsResult); + return listShardsResult; + } + @Override public GetRecordsResponse getRecords( String streamArn, String shardId, StartingPosition startingPosition) { @@ -170,6 +189,30 @@ private DescribeStreamResponse describeStream(String streamArn, @Nullable String return describeStreamResponse; } + private DescribeStreamResponse describeStream(String streamArn, ShardFilter shardFilter) { + final DescribeStreamRequest describeStreamRequest = + DescribeStreamRequest.builder() + .streamArn(streamArn) + .shardFilter(shardFilter) + .build(); + + DescribeStreamResponse describeStreamResponse = + dynamoDbStreamsClient.describeStream(describeStreamRequest); + + StreamStatus streamStatus = describeStreamResponse.streamDescription().streamStatus(); + if (streamStatus.equals(StreamStatus.ENABLING)) { + if (LOG.isWarnEnabled()) { + LOG.warn( + String.format( + "The status of stream %s is %s; result of the current " + + "describeStream operation will not contain any shard information.", + streamArn, streamStatus)); + } + } + + return describeStreamResponse; + } + private String getShardIterator( String streamArn, String shardId, StartingPosition startingPosition) { GetShardIteratorRequest.Builder requestBuilder =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java index fa9bf7c9f..9de83de99 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.dynamodb.source.util.ListShardsResult; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; import javax.annotation.Nullable; @@ -41,6 +42,8 @@ public interface StreamProxy extends Closeable { */ ListShardsResult listShards(String streamArn, @Nullable String
lastSeenShardId); + ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter); + /** * Retrieves records from the stream. * diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java index 90a105dd5..b60e3d03e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent; +import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext; import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics; import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState; @@ -32,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.Record; +import software.amazon.awssdk.services.dynamodb.model.Shard; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +43,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; /** * Coordinates the reading from assigned splits. Runs on the TaskManager. 
@@ -55,6 +58,7 @@ public class DynamoDbStreamsSourceReader private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class); private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap; private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents; + private final Map<String, List<Shard>> childShardIdMap; private long currentCheckpointId; public DynamoDbStreamsSourceReader( @@ -62,10 +66,12 @@ public DynamoDbStreamsSourceReader( RecordEmitter<Record, T, DynamoDbStreamsShardSplitState> recordEmitter, Configuration config, SourceReaderContext context, + Map<String, List<Shard>> childShardIdMap, Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) { super(splitFetcherManager, recordEmitter, config, context); this.shardMetricGroupMap = shardMetricGroupMap; this.splitFinishedEvents = new TreeMap<>(); + this.childShardIdMap = childShardIdMap; this.currentCheckpointId = Long.MIN_VALUE; } @@ -83,19 +89,40 @@ protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finis splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>()); finishedSplitIds.values().stream() .map( - finishedSplit -> - new DynamoDbStreamsShardSplit( - finishedSplit.getStreamArn(), - finishedSplit.getShardId(), - finishedSplit.getNextStartingPosition(), - finishedSplit - .getDynamoDbStreamsShardSplit() - .getParentShardId(), - true)) + finishedSplit -> { + List<Shard> childSplits = new ArrayList<>(); + String finishedSplitId = finishedSplit.getSplitId(); + if (childShardIdMap.containsKey(finishedSplitId)) { + List<Shard> childShardsOfFinishedSplit = + childShardIdMap.get(finishedSplitId); + childSplits.addAll(childShardsOfFinishedSplit); + } + return new DynamoDbStreamsShardSplit( + finishedSplit.getStreamArn(), + finishedSplit.getShardId(), + finishedSplit.getNextStartingPosition(), + finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(), + true, + childSplits); + }) .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split)); + Set<SplitsFinishedEventContext> splitsFinishedEventContexts = + finishedSplitIds.values().stream() + .map( + finishedSplit -> { + List<Shard> childSplits = new ArrayList<>(); + String finishedSplitId = finishedSplit.getSplitId(); + if (childShardIdMap.containsKey(finishedSplitId)) { + childSplits.addAll(childShardIdMap.remove(finishedSplitId)); + } + return new SplitsFinishedEventContext( + finishedSplitId, childSplits); + }) + .collect(Collectors.toSet()); + LOG.info("Sending splits finished event to coordinator: {}", splitsFinishedEventContexts); context.sendSourceEventToCoordinator( - new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet()))); + new SplitsFinishedEvent(splitsFinishedEventContexts)); finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup); } @@ -121,8 +148,10 @@ public void addSplits(List<DynamoDbStreamsShardSplit> splits) { // buffer. If the next checkpoint doesn't complete, // we would go back to the previous checkpointed // state which will again replay these split finished events.
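+                    // Forward the child shards that were checkpointed with this finished
+                    // split so the enumerator can track and assign them from TRIM_HORIZON.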
+ SplitsFinishedEventContext splitsFinishedEventContext = + new SplitsFinishedEventContext(split.splitId(), split.getChildSplits()); context.sendSourceEventToCoordinator( - new SplitsFinishedEvent(Collections.singleton(split.splitId()))); + new SplitsFinishedEvent(Collections.singleton(splitsFinishedEventContext))); } else { dynamoDbStreamsShardSplits.add(split); } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java index 8a516bed2..3da012d58 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java @@ -27,11 +27,16 @@ import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState; import org.apache.flink.connector.dynamodb.source.split.StartingPosition; +import org.apache.flink.connector.dynamodb.source.util.ListShardsResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.Record; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; import javax.annotation.Nullable; @@ -43,10 +48,14 @@ import java.util.Deque; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import static java.util.Collections.singleton; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MAX_DELAY; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MIN_DELAY; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS; /** * An implementation of the SplitReader that periodically polls the DynamoDb stream to retrieve @@ -64,6 +73,7 @@ public class PollingDynamoDbStreamsShardSplitReader private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls; private final Deque assignedSplits; + private final Map> childShardMap; private final Map shardMetricGroupMap; private final Set pausedSplitIds; private static final Logger LOG = @@ -73,6 +83,7 @@ public PollingDynamoDbStreamsShardSplitReader( StreamProxy dynamodbStreamsProxy, Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls, Duration getRecordsIdlePollingTimeBetweenEmptyPolls, + Map> childShardMap, Map shardMetricGroupMap) { this.dynamodbStreams = dynamodbStreamsProxy; this.getRecordsIdlePollingTimeBetweenNonEmptyPolls = @@ -80,6 +91,7 @@ public PollingDynamoDbStreamsShardSplitReader( this.getRecordsIdlePollingTimeBetweenEmptyPolls = getRecordsIdlePollingTimeBetweenEmptyPolls; this.shardMetricGroupMap = shardMetricGroupMap; + this.childShardMap = childShardMap; this.assignedSplits = new 
ArrayDeque<>(); this.pausedSplitIds = new HashSet<>(); } @@ -106,6 +118,49 @@ public RecordsWithSplitIds fetch() throws IOException { } long currentTime = System.currentTimeMillis(); + + if (splitContext.splitState.isHasShardEndReached()) { + if (!splitContext.hasAttemptedChildShardDiscovery) { + splitContext.hasAttemptedChildShardDiscovery = true; + splitContext.childShardDiscoveryAttempts = 0; + } + + if (splitContext.childShardDiscoveryAttempts < MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS) { + long nextChildShardDiscoveryEligibleTime = + getNextEligibleTimeForChildDiscovery(splitContext); + if (currentTime >= nextChildShardDiscoveryEligibleTime) { + ListShardsResult listShardsResult = + dynamodbStreams.listShardsWithFilter( + splitContext.splitState.getStreamArn(), + ShardFilter.builder() + .shardId(splitContext.splitState.getShardId()) + .type(ShardFilterType.CHILD_SHARDS) + .build()); + if (!StreamStatus.ENABLED.equals(listShardsResult.getStreamStatus())) { + return new DynamoDbStreamRecordsWithSplitIds( + Collections.emptyIterator(), + splitContext.splitState.getSplitId(), + true); + } + List childShards = listShardsResult.getShards(); + if (!childShards.isEmpty()) { + this.childShardMap.put(splitContext.splitState.getSplitId(), childShards); + return new DynamoDbStreamRecordsWithSplitIds( + Collections.emptyIterator(), + splitContext.splitState.getSplitId(), + true); + } + splitContext.childShardDiscoveryAttempts++; + splitContext.lastChildShardDiscoveryAttemptTime = currentTime; + } + assignedSplits.add(splitContext); + return INCOMPLETE_SHARD_EMPTY_RECORDS; + } else { + return new DynamoDbStreamRecordsWithSplitIds( + Collections.emptyIterator(), splitContext.splitState.getSplitId(), true); + } + } + long nextEligibleTime = getNextEligibleTime(splitContext); LOG.debug( @@ -132,15 +187,11 @@ public RecordsWithSplitIds fetch() throws IOException { splitContext.lastPollTimeMillis = currentTime; splitContext.wasLastPollEmpty = isEmptyPoll; + splitContext.splitState.setHasShardEndReached(isComplete); + assignedSplits.add(splitContext); if (isEmptyPoll) { - if (isComplete) { - return new DynamoDbStreamRecordsWithSplitIds( - Collections.emptyIterator(), splitContext.splitState.getSplitId(), true); - } else { - assignedSplits.add(splitContext); - return INCOMPLETE_SHARD_EMPTY_RECORDS; - } + return INCOMPLETE_SHARD_EMPTY_RECORDS; } else { DynamoDbStreamsShardMetrics shardMetrics = shardMetricGroupMap.get(splitContext.splitState.getShardId()); @@ -164,13 +215,20 @@ public RecordsWithSplitIds fetch() throws IOException { .dynamodb() .sequenceNumber())); - if (!isComplete) { - assignedSplits.add(splitContext); - } return new DynamoDbStreamRecordsWithSplitIds( getRecordsResponse.records().iterator(), splitContext.splitState.getSplitId(), - isComplete); + false); + } + + private long getNextEligibleTimeForChildDiscovery( + DynamoDbStreamsShardSplitWithContext splitContext) { + long baseDelay = CHILD_SHARD_DISCOVERY_MIN_DELAY.toMillis(); + long maxDelay = CHILD_SHARD_DISCOVERY_MAX_DELAY.toMillis(); + + long exponentialDelay = + Math.min(baseDelay * (1L << splitContext.childShardDiscoveryAttempts), maxDelay); + return splitContext.lastChildShardDiscoveryAttemptTime + exponentialDelay; } private void sleep(long milliseconds) throws IOException { @@ -254,11 +312,15 @@ private static class DynamoDbStreamsShardSplitWithContext { final DynamoDbStreamsShardSplitState splitState; long lastPollTimeMillis; boolean wasLastPollEmpty; + boolean hasAttemptedChildShardDiscovery; + int childShardDiscoveryAttempts; + 
long lastChildShardDiscoveryAttemptTime; DynamoDbStreamsShardSplitWithContext(DynamoDbStreamsShardSplitState splitState) { this.splitState = splitState; this.lastPollTimeMillis = System.currentTimeMillis(); this.wasLastPollEmpty = false; + this.hasAttemptedChildShardDiscovery = false; } } }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java index b79f1e829..967c309d9 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java @@ -23,7 +23,12 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDbStreamsSourceEnumerator; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,6 +46,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit { private final StartingPosition startingPosition; private final String parentShardId; private final boolean isFinished; + private final List<Shard> childSplits; public DynamoDbStreamsShardSplit( String streamArn, @@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit( this(streamArn, shardId, startingPosition, parentShardId, false); } + public DynamoDbStreamsShardSplit( + String streamArn, + String shardId, + StartingPosition startingPosition, + String parentShardId, + List<Shard> childSplits) { + this(streamArn, shardId, startingPosition, parentShardId, false, childSplits); + } + public DynamoDbStreamsShardSplit( String streamArn, String shardId, StartingPosition startingPosition, String parentShardId, boolean isFinished) { + this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>()); + } + + public DynamoDbStreamsShardSplit( + String streamArn, + String shardId, + StartingPosition startingPosition, + String parentShardId, + boolean isFinished, + List<Shard> childSplits) { checkNotNull(streamArn, "streamArn cannot be null"); checkNotNull(shardId, "shardId cannot be null"); checkNotNull(startingPosition, "startingPosition cannot be null"); @@ -65,6 +90,7 @@ public DynamoDbStreamsShardSplit( this.startingPosition = startingPosition; this.parentShardId = parentShardId; this.isFinished = isFinished; + this.childSplits = childSplits; } @Override @@ -92,6 +118,10 @@ public boolean isFinished() { return isFinished; } + public List<Shard> getChildSplits() { + return childSplits; + } + @Override public String toString() { return "DynamoDbStreamsShardSplit{" @@ -108,7 +138,9 @@ public String toString() { + "]" + ", isFinished=" + isFinished - + "}"; + + ", childSplits=[" + + childSplits.stream().map(Shard::toString).collect(Collectors.joining(",")) + + "]}"; } @Override @@ -124,11 +156,13 @@ public boolean equals(Object o) { && Objects.equals(shardId, that.shardId) && Objects.equals(startingPosition, that.startingPosition) && Objects.equals(parentShardId, that.parentShardId) - && Objects.equals(isFinished, that.isFinished); + && Objects.equals(isFinished, that.isFinished) + &&
Objects.equals(childSplits, that.childSplits); } @Override public int hashCode() { - return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished); + return Objects.hash( + streamArn, shardId, startingPosition, parentShardId, isFinished, childSplits); } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java index b10bfce3a..3e7989046 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java @@ -22,6 +22,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.VersionMismatchException; +import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; +import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; import java.io.ByteArrayInputStream; @@ -29,8 +31,10 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -41,8 +45,8 @@ public class DynamoDbStreamsShardSplitSerializer implements SimpleVersionedSerializer { - private static final Set COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1)); - private static final int CURRENT_VERSION = 1; + private static final Set COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2)); + private static final int CURRENT_VERSION = 2; @Override public int getVersion() { @@ -74,6 +78,18 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException { out.writeUTF(split.getParentShardId()); } out.writeBoolean(split.isFinished()); + out.writeInt(split.getChildSplits().size()); + for (Shard childSplit : split.getChildSplits()) { + out.writeUTF(childSplit.shardId()); + out.writeUTF(childSplit.parentShardId()); + out.writeUTF(childSplit.sequenceNumberRange().startingSequenceNumber()); + if (childSplit.sequenceNumberRange().endingSequenceNumber() == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(childSplit.sequenceNumberRange().endingSequenceNumber()); + } + } out.flush(); return baos.toByteArray(); @@ -116,12 +132,41 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized) isFinished = in.readBoolean(); } + int childSplitSize = 0; + List childSplits = new ArrayList<>(); + if (version > 1) { + childSplitSize = in.readInt(); + if (childSplitSize > 0) { + for (int i = 0; i < childSplitSize; i++) { + String splitId = in.readUTF(); + String parentSplitId = in.readUTF(); + String startingSequenceNumber = in.readUTF(); + String endingSequenceNumber = null; + if (in.readBoolean()) { + endingSequenceNumber = in.readUTF(); + } + childSplits.add( + Shard.builder() + .shardId(splitId) + .parentShardId(parentSplitId) + .sequenceNumberRange( + SequenceNumberRange.builder() + .startingSequenceNumber( + startingSequenceNumber) + .endingSequenceNumber(endingSequenceNumber) + .build()) + .build()); + } + } + } + return new DynamoDbStreamsShardSplit( streamArn, 
shardId, new StartingPosition(shardIteratorType, startingMarker), parentShardId, - isFinished); + isFinished, + childSplits); } } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java index 47e20a132..08226106d 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java @@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState { private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit; private StartingPosition nextStartingPosition; private String nextShardIterator; + private boolean hasShardEndReached; public DynamoDbStreamsShardSplitState(DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit) { this.dynamoDbStreamsShardSplit = dynamoDbStreamsShardSplit; this.nextStartingPosition = dynamoDbStreamsShardSplit.getStartingPosition(); + this.hasShardEndReached = false; } public DynamoDbStreamsShardSplit getDynamoDbStreamsShardSplit() { @@ -70,4 +72,12 @@ public String getNextShardIterator() { public void setNextShardIterator(String nextShardIterator) { this.nextShardIterator = nextShardIterator; } + + public boolean isHasShardEndReached() { + return hasShardEndReached; + } + + public void setHasShardEndReached(boolean hasShardEndReached) { + this.hasShardEndReached = hasShardEndReached; + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java index 031d6de6d..daaca2237 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; import org.apache.flink.connector.dynamodb.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent; +import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext; import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy; import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; import org.apache.flink.connector.dynamodb.source.split.StartingPosition; @@ -304,21 +305,29 @@ void testLatestAssignsChildShardsWithTrimHorizonDuringPeriodicDiscovery() throws }; streamProxy.addShards(childShards); enumerator.handleSourceEvent( - subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId()))); - // Given no resharding occurs (list of shards remains the same) - // When first periodic discovery runs - context.runPeriodicCallable(0); - // Then no additional splits are assigned - SplitsAssignment periodicDiscoverySplitAssignment = - context.getSplitsAssignmentSequence().get(2); + 
subtaskId, + new SplitsFinishedEvent( + Collections.singleton( + new SplitsFinishedEventContext( + shards[2].shardId(), + Collections.singletonList(childShards[0]))))); + DynamoDbStreamsShardSplit childSplit = new DynamoDbStreamsShardSplit( STREAM_ARN, childShards[0].shardId(), StartingPosition.fromStart(), shards[2].shardId()); - assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId)) + assertThat(context.getSplitsAssignmentSequence().get(1).assignment().get(subtaskId)) .containsExactly(childSplit); + // Given no resharding occurs (list of shards remains the same) + // When first periodic discovery runs + context.runPeriodicCallable(0); + // Then no additional splits are assigned + SplitsAssignment periodicDiscoverySplitAssignment = + context.getSplitsAssignmentSequence().get(2); + assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId)) + .isNullOrEmpty(); } } @@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable { context.runNextOneTimeCallable(); enumerator.handleSourceEvent( - 1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId()))); + 1, + new SplitsFinishedEvent( + Collections.singleton( + new SplitsFinishedEventContext( + completedShard.shardId(), + Collections.singletonList(shards[1]))))); // When restored from state DynamoDbStreamsSourceEnumeratorState snapshotState = enumerator.snapshotState(1); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java index 8c5b20420..95e98db64 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java @@ -38,19 +38,22 @@ import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; import software.amazon.awssdk.services.dynamodb.model.StreamDescription; import software.amazon.awssdk.services.dynamodb.model.StreamStatus; import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; /** Tests to validate {@link DynamoDbStreamsProxy}. 
*/ @@ -88,6 +91,86 @@ void testListShards(String lastSeenShardId) { .isEqualTo(expectedListShardsResult); } + @Test + void testListShardsWithFilterForChildShards() { + final String streamArn = + "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826"; + final String parentShardId = "shardId-000000000001"; + + // Create child shards that we expect to be returned + final List childShards = + Arrays.asList( + Shard.builder() + .shardId("shardId-000000000002") + .parentShardId(parentShardId) + .build(), + Shard.builder() + .shardId("shardId-000000000003") + .parentShardId(parentShardId) + .build()); + + // Create some other shards that should not be returned + final List otherShards = + Arrays.asList( + Shard.builder() + .shardId("shardId-000000000004") + .parentShardId("different-parent") + .build(), + Shard.builder().shardId("shardId-000000000005").build()); + + // Set up the expected response + final ListShardsResult expectedResult = new ListShardsResult(); + expectedResult.addShards(childShards); + expectedResult.setStreamStatus(StreamStatus.ENABLED); + + // Create describe stream response with all shards + List allShards = new ArrayList<>(); + allShards.addAll(childShards); + allShards.addAll(otherShards); + + DescribeStreamResponse describeStreamResponse = + DescribeStreamResponse.builder() + .streamDescription( + StreamDescription.builder() + .shards(allShards) + .streamStatus(StreamStatus.ENABLED) + .lastEvaluatedShardId(null) + .build()) + .build(); + + TestingDynamoDbStreamsClient testingDynamoDbStreamsClient = + new TestingDynamoDbStreamsClient(); + + // Verify the correct request is made + testingDynamoDbStreamsClient.setDescribeStreamValidation( + request -> { + assertThat(request.streamArn()).isEqualTo(streamArn); + assertThat(request.shardFilter()).isNotNull(); + assertThat(request.shardFilter().type()) + .isEqualTo(ShardFilterType.CHILD_SHARDS); + assertThat(request.shardFilter().shardId()).isEqualTo(parentShardId); + }); + + testingDynamoDbStreamsClient.setDescribeStreamResponse(describeStreamResponse); + + DynamoDbStreamsProxy dynamoDbStreamsProxy = + new DynamoDbStreamsProxy(testingDynamoDbStreamsClient, HTTP_CLIENT); + + // Create the filter for child shards + ShardFilter childShardFilter = + ShardFilter.builder() + .type(ShardFilterType.CHILD_SHARDS) + .shardId(parentShardId) + .build(); + + // Execute the method and verify results + ListShardsResult result = + dynamoDbStreamsProxy.listShardsWithFilter(streamArn, childShardFilter); + + assertThat(result.getShards()).hasSize(2).containsExactlyInAnyOrderElementsOf(childShards); + assertThat(result.getStreamStatus()).isEqualTo(StreamStatus.ENABLED); + } + @Test void testGetRecordsInitialReadFromTrimHorizon() { final String streamArn = diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java index d66c0bbfd..12e8ac4e9 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent; +import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext; import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics; import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy; import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -73,6 +75,7 @@ public void init() { testStreamProxy, NON_EMPTY_POLLING_DELAY_MILLIS, EMPTY_POLLING_DELAY_MILLIS, + new ConcurrentHashMap<>(), shardMetricGroupMap); testingReaderContext = @@ -84,6 +87,7 @@ public void init() { new DynamoDbStreamsRecordEmitter<>(null), new Configuration(), testingReaderContext, + new ConcurrentHashMap<>(), shardMetricGroupMap); } @@ -122,12 +126,14 @@ void testOnSplitFinishedEventSent() { List events = testingReaderContext.getSentEvents(); - Set expectedSplitIds = Collections.singleton(split.splitId()); + Set expectedFinishedSplits = + Collections.singleton( + new SplitsFinishedEventContext(split.splitId(), new ArrayList<>())); assertThat(events) .singleElement() .isInstanceOf(SplitsFinishedEvent.class) .usingRecursiveComparison() - .isEqualTo(new SplitsFinishedEvent(expectedSplitIds)); + .isEqualTo(new SplitsFinishedEvent(expectedFinishedSplits)); } @Test @@ -225,8 +231,10 @@ void testAddSplitsWithStateRestoration() throws Exception { .allSatisfy( e -> { SplitsFinishedEvent event = (SplitsFinishedEvent) e; - assertThat(event.getFinishedSplitIds()).hasSize(1); - assertThat(event.getFinishedSplitIds()) + assertThat(event.getFinishedSplits()).hasSize(1); + assertThat( + event.getFinishedSplits().stream() + .map(SplitsFinishedEventContext::getSplitId)) .containsAnyOf("finished-split-1", "finished-split-2"); }); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java index f3cc1692a..bcd57cf85 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java @@ -73,6 +73,7 @@ public void init() { testStreamProxy, NON_EMPTY_POLLING_DELAY_MILLIS, EMPTY_POLLING_DELAY_MILLIS, + new ConcurrentHashMap<>(), shardMetricGroupMap); } @@ -235,12 +236,20 @@ void testFinishedSplitsReturned() throws Exception { assertThat(retrievedRecords.finishedSplits()).isEmpty(); fetchedRecords.add(retrievedRecords.nextRecordFromSplit()); } - - assertThat(retrievedRecords.nextSplit()).isNull(); - assertThat(retrievedRecords.finishedSplits()).contains(split.splitId()); assertThat(fetchedRecords) .containsExactlyInAnyOrderElementsOf(expectedRecords); }); + + // Now wait for the split to be marked as finished after child shard discovery attempts + await().pollDelay(NON_EMPTY_POLLING_DELAY_MILLIS) + .atMost(Duration.ofSeconds(30)) // Allow enough time for all retry attempts + 
.untilAsserted( + () -> { + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + // No more records should be returned + assertThat(readAllRecords(retrievedRecords)).isEmpty(); + assertThat(retrievedRecords.finishedSplits()).contains(split.splitId()); + }); } @Test @@ -400,6 +409,7 @@ void testPollingDelayForEmptyRecords() throws Exception { testStreamProxy, NON_EMPTY_POLLING_DELAY_MILLIS, testEmptyPollDelay, + new ConcurrentHashMap<>(), shardMetricGroupMap); // Immediate second poll - should return empty due to polling delay diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java index ebf117829..410f93bcc 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java @@ -29,6 +29,7 @@ import static org.apache.flink.connector.dynamodb.source.util.TestUtil.SHARD_ID; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.STREAM_ARN; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplit; +import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplitWithChildShards; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; @@ -47,6 +48,19 @@ void testSerializeAndDeserializeEverythingSpecified() throws Exception { assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit); } + @Test + void testSerializeAndDeserializeWithChildSplits() throws Exception { + final DynamoDbStreamsShardSplit initialSplit = getTestSplitWithChildShards(); + + DynamoDbStreamsShardSplitSerializer serializer = new DynamoDbStreamsShardSplitSerializer(); + + byte[] serialized = serializer.serialize(initialSplit); + DynamoDbStreamsShardSplit deserializedSplit = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit); + } + @ParameterizedTest @MethodSource("provideStartingPositions") void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPosition) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java index c4c3285ad..8e6c0a5cb 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java @@ -26,12 +26,18 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import 
software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; +import software.amazon.awssdk.services.dynamodb.model.StreamDescription; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsServiceClientConfiguration; import java.util.ArrayDeque; import java.util.Deque; +import java.util.List; import java.util.function.Consumer; +import java.util.stream.Collectors; /** Provides {@link DynamoDbStreamsClient} with mocked DynamoDbStreams behavior. */ public class DynamoDbStreamsClientProvider { @@ -84,6 +90,29 @@ public DescribeStreamResponse describeStream(DescribeStreamRequest describeStrea throws AwsServiceException, SdkClientException { describeStreamValidation.accept(describeStreamRequest); + ShardFilter shardFilter = describeStreamRequest.shardFilter(); + if (shardFilter != null && ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) { + List shards = describeStreamResponse.streamDescription().shards(); + List childShards = + shards.stream() + .filter( + shard -> + shard.parentShardId() != null + && shard.parentShardId() + .equals(shardFilter.shardId())) + .collect(Collectors.toList()); + return DescribeStreamResponse.builder() + .streamDescription( + StreamDescription.builder() + .shards(childShards) + .streamArn(describeStreamRequest.streamArn()) + .streamStatus( + describeStreamResponse + .streamDescription() + .streamStatus()) + .build()) + .build(); + } return describeStreamResponse; } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java index c9f45c196..c4a81c771 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java @@ -26,6 +26,8 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; import javax.annotation.Nullable; @@ -84,6 +86,25 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh return listShardsResult; } + @Override + public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) { + if (ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) { + ListShardsResult listShardsResult = new ListShardsResult(); + List childShards = + shards.stream() + .filter( + shard -> + shard.parentShardId() != null + && shard.parentShardId() + .equals(shardFilter.shardId())) + .collect(Collectors.toList()); + listShardsResult.addShards(childShards); + return listShardsResult; + } + throw new UnsupportedOperationException( + String.format("ShardFilterType %s not supported", shardFilter.type().name())); + } + @Override public GetRecordsResponse getRecords( String streamArn, String shardId, StartingPosition startingPosition) { diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java index 9c3fcb9f5..4c159593f 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java @@ -35,7 +35,10 @@ import java.time.Duration; import java.time.Instant; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -45,6 +48,8 @@ public class TestUtil { public static final String STREAM_ARN = "arn:aws:dynamodb:us-east-1:123456789012:stream/2024-01-01T00:00:00Z"; public static final String SHARD_ID = "shardId-000000000002"; + public static final String CHILD_SHARD_ID_1 = "shardId-000000000003"; + public static final String CHILD_SHARD_ID_2 = "shardId-000000000004"; public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema(); public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = -1L; @@ -95,6 +100,15 @@ public static DynamoDbStreamsShardSplit getTestSplit() { return getTestSplit(SHARD_ID); } + public static DynamoDbStreamsShardSplit getTestSplitWithChildShards() { + return getTestSplitWithChildShards(SHARD_ID); + } + + public static DynamoDbStreamsShardSplit getTestSplitWithChildShards(String shardId) { + return getTestSplit( + STREAM_ARN, shardId, Arrays.asList(CHILD_SHARD_ID_1, CHILD_SHARD_ID_2)); + } + public static DynamoDbStreamsShardSplit getTestSplit(String shardId) { return getTestSplit(STREAM_ARN, shardId); } @@ -104,6 +118,27 @@ public static DynamoDbStreamsShardSplit getTestSplit(String streamArn, String sh streamArn, shardId, StartingPosition.fromStart(), null); } + public static DynamoDbStreamsShardSplit getTestSplit( + String streamArn, String shardId, List<String> childShardIds) { + return new DynamoDbStreamsShardSplit( + streamArn, + shardId, + StartingPosition.fromStart(), + null, + childShardIds.stream() + .map( + childShardId -> + Shard.builder() + .parentShardId(shardId) + .shardId(childShardId) + .sequenceNumberRange( + SequenceNumberRange.builder() + .startingSequenceNumber("1234") + .build()) + .build()) + .collect(Collectors.toList())); + } + public static DynamoDbStreamsShardSplit getTestSplit(StartingPosition startingPosition) { return new DynamoDbStreamsShardSplit(STREAM_ARN, SHARD_ID, startingPosition, null); }
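
For reference, the child shard discovery retry logic added to PollingDynamoDbStreamsShardSplitReader backs off exponentially between DescribeStream attempts. Below is a minimal standalone sketch of that schedule; the class name and main driver are illustrative only, while the constants and the min(base * 2^attempts, max) formula mirror CHILD_SHARD_DISCOVERY_MIN_DELAY, CHILD_SHARD_DISCOVERY_MAX_DELAY, MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS, and getNextEligibleTimeForChildDiscovery above.

```java
import java.time.Duration;

/** Illustrative sketch of the capped exponential backoff used for child shard discovery. */
class ChildShardDiscoveryBackoffSketch {

    static final Duration MIN_DELAY = Duration.ofMillis(100); // CHILD_SHARD_DISCOVERY_MIN_DELAY
    static final Duration MAX_DELAY = Duration.ofMillis(1000); // CHILD_SHARD_DISCOVERY_MAX_DELAY
    static final int MAX_ATTEMPTS = 5; // MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS

    /** Delay applied after the given number of failed attempts: min(base * 2^attempts, max). */
    static long delayMillis(int attempts) {
        return Math.min(MIN_DELAY.toMillis() * (1L << attempts), MAX_DELAY.toMillis());
    }

    public static void main(String[] args) {
        // Prints 100, 200, 400, 800, 1000: a finished shard is retried for roughly
        // 2.5 seconds in total before the reader gives up and marks the split finished.
        for (int attempts = 0; attempts < MAX_ATTEMPTS; attempts++) {
            System.out.println(delayMillis(attempts));
        }
    }
}
```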