2 changes: 2 additions & 0 deletions flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -71,10 +71,12 @@ under the License.
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb</artifactId>
+           <version>2.32.0</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb-enhanced</artifactId>
+           <version>2.32.0</version>
        </dependency>

<!-- Table ecosystem -->
@@ -176,7 +176,7 @@ private <AttributeT> TableSchema<AttributeT> createTableSchemaFromPojo(
                    tableSchemaBuilder,
                    propertyDescriptor.getName(),
                    BeanAttributeGetter.create(
-                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod(), null),
                    fieldInfo);
        }

@@ -55,10 +55,13 @@
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
+import software.amazon.awssdk.retries.api.RetryStrategy;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -145,14 +148,18 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
        final Duration getRecordsIdlePollingTimeBetweenEmptyPolls =
                sourceConfig.get(DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS);

+        Map<String, List<Shard>> childShardMap = new ConcurrentHashMap<>();
        // We create a new stream proxy for each split reader since they have their own independent
        // lifecycle.
        Supplier<PollingDynamoDbStreamsShardSplitReader> splitReaderSupplier =
                () ->
                        new PollingDynamoDbStreamsShardSplitReader(
-                                createDynamoDbStreamsProxy(sourceConfig),
+                                createDynamoDbStreamsProxy(
+                                        sourceConfig,
+                                        SdkDefaultRetryStrategy.defaultRetryStrategy()),
                                getRecordsIdlePollingTimeBetweenNonEmptyPolls,
                                getRecordsIdlePollingTimeBetweenEmptyPolls,
+                                childShardMap,
                                shardMetricGroupMap);
        DynamoDbStreamsRecordEmitter<T> recordEmitter =
                new DynamoDbStreamsRecordEmitter<>(deserializationSchema);

@@ -162,6 +169,7 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
                recordEmitter,
                sourceConfig,
                readerContext,
+                childShardMap,
                shardMetricGroupMap);
    }

@@ -178,11 +186,25 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
            SplitEnumeratorContext<DynamoDbStreamsShardSplit> enumContext,
            DynamoDbStreamsSourceEnumeratorState checkpoint)
            throws Exception {
+        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
+        Duration minDescribeStreamDelay =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
+        Duration maxDescribeStreamDelay =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
+        BackoffStrategy backoffStrategy =
+                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
+        AdaptiveRetryStrategy adaptiveRetryStrategy =
+                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
+                        .toBuilder()
+                        .maxAttempts(maxApiCallAttempts)
+                        .backoffStrategy(backoffStrategy)
+                        .throttlingBackoffStrategy(backoffStrategy)
+                        .build();
        return new DynamoDbStreamsSourceEnumerator(
                enumContext,
                streamArn,
                sourceConfig,
-                createDynamoDbStreamsProxy(sourceConfig),
+                createDynamoDbStreamsProxy(sourceConfig, adaptiveRetryStrategy),
                dynamoDbStreamsShardAssigner,
                checkpoint);
    }
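As a review note, the enumerator's retry wiring above can be exercised in isolation. Below is a minimal, self-contained sketch using only the SDK calls that appear in this hunk; the literal 5 / 100 ms / 1 s values stand in for the connector's config lookups, and the import locations assume AWS SDK for Java 2.x with the newer retries API (verify against your SDK version):

```java
import java.time.Duration;

import software.amazon.awssdk.core.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;

public class AdaptiveRetrySketch {
    public static void main(String[] args) {
        // Exponential backoff bounded by a min and max delay, as the enumerator does
        // with DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY / _MAX_DELAY.
        BackoffStrategy backoff =
                BackoffStrategy.exponentialDelay(Duration.ofMillis(100), Duration.ofSeconds(1));

        // Start from the SDK's adaptive defaults, cap the attempts, and apply the
        // same backoff to both standard and throttling retries.
        AdaptiveRetryStrategy adaptiveRetryStrategy =
                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
                        .toBuilder()
                        .maxAttempts(5)
                        .backoffStrategy(backoff)
                        .throttlingBackoffStrategy(backoff)
                        .build();

        System.out.println(adaptiveRetryStrategy);
    }
}
```

Note the asymmetry in the PR: the split readers get SdkDefaultRetryStrategy.defaultRetryStrategy(), while only the enumerator gets the adaptive strategy, presumably because its DescribeStream polling is the throttling-prone path.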
@@ -199,7 +221,8 @@ public SimpleVersionedSerializer<DynamoDbStreamsShardSplit> getSplitSerializer()
                new DynamoDbStreamsShardSplitSerializer());
    }

-    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(
+            Configuration consumerConfig, RetryStrategy retryStrategy) {
        SdkHttpClient httpClient =
                AWSGeneralUtil.createSyncHttpClient(
                        AttributeMap.builder().build(), ApacheHttpClient.builder());
@@ -215,26 +238,12 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
        consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);

        AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
-        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
-        Duration minDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
-        Duration maxDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
-        BackoffStrategy backoffStrategy =
-                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
-        AdaptiveRetryStrategy adaptiveRetryStrategy =
-                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
-                        .toBuilder()
-                        .maxAttempts(maxApiCallAttempts)
-                        .backoffStrategy(backoffStrategy)
-                        .throttlingBackoffStrategy(backoffStrategy)
-                        .build();
        DynamoDbStreamsClient dynamoDbStreamsClient =
                AWSClientUtil.createAwsSyncClient(
                        dynamoDbStreamsClientProperties,
                        httpClient,
                        DynamoDbStreamsClient.builder(),
-                        ClientOverrideConfiguration.builder().retryStrategy(adaptiveRetryStrategy),
+                        ClientOverrideConfiguration.builder().retryStrategy(retryStrategy),
                        DynamodbStreamsSourceConfigConstants
                                .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
                        DynamodbStreamsSourceConfigConstants.DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
@@ -76,6 +76,9 @@ public enum InitialPosition {
    public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
            "Apache Flink %s (%s) DynamoDb Streams Connector";

+    public static final String DYNAMODB_STREAMS_THROTTLING_EXCEPTION_ERROR_CODE =
+            "ThrottlingException";
+
    public static final ConfigOption<Duration>
            DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS =
                    ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay")
@@ -91,6 +94,10 @@ public enum InitialPosition {
                            .withDescription(
                                    "The default idle time between non-empty polls for DynamoDB Streams GetRecords API");

+    public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+    public static final Duration CHILD_SHARD_DISCOVERY_MIN_DELAY = Duration.ofMillis(100);
+    public static final Duration CHILD_SHARD_DISCOVERY_MAX_DELAY = Duration.ofMillis(1000);
Comment on lines +98 to +99

Reviewer: Should these be configurable?

Contributor Author: This is intentional since we don't want to let customers override these values.

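For comparison, exposing them would follow the ConfigOptions pattern this file already uses. A purely hypothetical sketch (the key name and wiring are invented for illustration; they are not part of this PR):

```java
// Hypothetical alternative -- the PR deliberately keeps these as fixed constants.
public static final ConfigOption<Integer> DYNAMODB_STREAMS_CHILD_SHARD_RETRY_COUNT =
        ConfigOptions.key("flink.dynamodbstreams.childshards.retry.count")
                .intType()
                .defaultValue(5)
                .withDescription(
                        "Maximum number of retry attempts for child shard discovery");
```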

    /** DynamoDb Streams identifier for user agent prefix. */
    public static final String DDB_STREAMS_CLIENT_USER_AGENT_PREFIX =
            "aws.dynamodbstreams.client.user-agent-prefix";
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitGraphInconsistencyTracker;
import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitTracker;
import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
@@ -138,7 +139,20 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {

    /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
-        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        Set<String> finishedSplitIds =
+                splitsFinishedEvent.getFinishedSplits().stream()
+                        .map(SplitsFinishedEventContext::getSplitId)
+                        .collect(Collectors.toSet());
+        splitTracker.markAsFinished(finishedSplitIds);
+        List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+        splitsFinishedEvent
+                .getFinishedSplits()
+                .forEach(
+                        finishedSplitEvent ->
+                                childrenOfFinishedSplits.addAll(
+                                        finishedSplitEvent.getChildSplits()));
+        LOG.info("Adding children of finished splits to splitTracker: {}", childrenOfFinishedSplits);
+        splitTracker.addChildSplits(childrenOfFinishedSplits);

        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
        // during recovery, splitAssignment may return null since there might be no split assigned
@@ -152,13 +166,12 @@ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+ "Child shard discovery might be delayed until we have enough readers."
+ "Finished split ids: {}",
subtaskId,
splitsFinishedEvent.getFinishedSplitIds());
finishedSplitIds);
return;
}

splitsAssignment.removeIf(
split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
splitsAssignment.removeIf(split -> finishedSplitIds.contains(split.splitId()));
assignChildSplits(finishedSplitIds);
}

    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
@@ -230,6 +243,7 @@ private void assignAllAvailableSplits() {
    private void assignChildSplits(Set<String> finishedSplitIds) {
        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
                splitTracker.getUnassignedChildSplits(finishedSplitIds);
+        LOG.info("Unassigned child splits: {}", splitsAvailableForAssignment);
        assignSplits(splitsAvailableForAssignment);
    }

@@ -22,26 +22,27 @@
import org.apache.flink.api.connector.source.SourceEvent;

import java.util.Set;
+import java.util.stream.Collectors;

/** Source event used by source reader to communicate that splits are finished to enumerator. */
@Internal
public class SplitsFinishedEvent implements SourceEvent {
    private static final long serialVersionUID = 1;
-    private final Set<String> finishedSplitIds;
+    private final Set<SplitsFinishedEventContext> finishedSplits;

-    public SplitsFinishedEvent(Set<String> finishedSplitIds) {
-        this.finishedSplitIds = finishedSplitIds;
+    public SplitsFinishedEvent(Set<SplitsFinishedEventContext> finishedSplits) {
+        this.finishedSplits = finishedSplits;
    }

-    public Set<String> getFinishedSplitIds() {
-        return finishedSplitIds;
+    public Set<SplitsFinishedEventContext> getFinishedSplits() {
+        return finishedSplits;
    }

    @Override
    public String toString() {
        return "SplitsFinishedEvent{"
-                + "finishedSplitIds=["
-                + String.join(",", finishedSplitIds)
+                + "finishedSplits=["
+                + finishedSplits.stream().map(Object::toString).collect(Collectors.joining(","))
                + "]}";
    }
}
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context which contains the split id and the child splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+    String splitId;
+    List<Shard> childSplits;
+
+    public SplitsFinishedEventContext(String splitId, List<Shard> childSplits) {
+        this.splitId = splitId;
+        this.childSplits = childSplits;
+    }
+
+    public String getSplitId() {
+        return splitId;
+    }
+
+    public List<Shard> getChildSplits() {
+        return childSplits;
+    }
+}
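To make the new event shape concrete, here is a small sketch of how a reader-side caller could assemble the event from a finished split and its discovered children; the shard ids are placeholders, and the two event classes above are assumed to be on the classpath:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import software.amazon.awssdk.services.dynamodb.model.Shard;

public class SplitsFinishedEventSketch {
    public static void main(String[] args) {
        // A child shard as DescribeStream would report it for a finished parent.
        Shard childShard =
                Shard.builder()
                        .shardId("shardId-child-0001") // placeholder id
                        .parentShardId("shardId-parent-0001") // placeholder id
                        .build();

        // Pair the finished split id with the children discovered for it.
        SplitsFinishedEventContext context =
                new SplitsFinishedEventContext(
                        "shardId-parent-0001", Collections.singletonList(childShard));

        // The enumerator marks the parent as finished and re-adds the children to the
        // split tracker at TRIM_HORIZON (see SplitTracker.addChildSplits below).
        Set<SplitsFinishedEventContext> finishedSplits = new HashSet<>();
        finishedSplits.add(context);
        System.out.println(new SplitsFinishedEvent(finishedSplits));
    }
}
```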
@@ -102,6 +102,10 @@ public void addSplits(Collection<Shard> shardsToAdd) {
        addSplitsForLatest(shardsToAdd);
    }

+    public void addChildSplits(Collection<Shard> childShardsToAdd) {
+        addSplitsForTrimHorizon(childShardsToAdd);
+    }
+
    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
        List<Shard> openShards =
                shardsToAdd.stream()
@@ -32,6 +32,7 @@
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId) {
        return listShardsResult;
    }

+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());
+        ListShardsResult listShardsResult = new ListShardsResult();
+
+        try {
+            DescribeStreamResponse describeStreamResponse =
+                    this.describeStream(streamArn, shardFilter);
+            listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+            listShardsResult.setStreamStatus(
+                    describeStreamResponse.streamDescription().streamStatus());
+        } catch (Exception e) {
+            LOG.error("DescribeStream with Filter API threw an exception", e);
+        }
+        LOG.info("Child shards returned: {}", listShardsResult);
+        return listShardsResult;
+    }
+
    @Override
    public GetRecordsResponse getRecords(
            String streamArn, String shardId, StartingPosition startingPosition) {
@@ -170,6 +189,30 @@ private DescribeStreamResponse describeStream(String streamArn, @Nullable String
        return describeStreamResponse;
    }

+    private DescribeStreamResponse describeStream(String streamArn, ShardFilter shardFilter) {
+        final DescribeStreamRequest describeStreamRequest =
+                DescribeStreamRequest.builder()
+                        .streamArn(streamArn)
+                        .shardFilter(shardFilter)
+                        .build();
+
+        DescribeStreamResponse describeStreamResponse =
+                dynamoDbStreamsClient.describeStream(describeStreamRequest);
+
+        StreamStatus streamStatus = describeStreamResponse.streamDescription().streamStatus();
+        if (streamStatus.equals(StreamStatus.ENABLING)) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn(
+                        String.format(
+                                "The status of stream %s is %s; result of the current "
+                                        + "describeStream operation will not contain any shard information.",
+                                streamArn, streamStatus));
+            }
+        }
+
+        return describeStreamResponse;
+    }
+
    private String getShardIterator(
            String streamArn, String shardId, StartingPosition startingPosition) {
        GetShardIteratorRequest.Builder requestBuilder =
Expand Down
@@ -23,6 +23,7 @@
import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;

import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;

import javax.annotation.Nullable;

@@ -41,6 +42,8 @@ public interface StreamProxy extends Closeable {
     */
    ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);

+    ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
+
    /**
     * Retrieves records from the stream.
     *