Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ public TableDescription describeTable(String tableName) {
public RetryResult<ScanResponse> scanTable(
String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, exclusiveStartKey,
limit, reporter, null, null);
}

public RetryResult<ScanResponse> scanTable(
String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(tableName)
.exclusiveStartKey(exclusiveStartKey)
.limit(Ints.checkedCast(limit))
Expand All @@ -172,6 +180,14 @@ public RetryResult<ScanResponse> scanTable(
}
}

if (filterExpression != null) {
scanRequestBuilder.filterExpression(filterExpression);
}

if (expressionAttributeValues != null) {
scanRequestBuilder.expressionAttributeValues(expressionAttributeValues);
}

final ScanRequest scanRequest = scanRequestBuilder.build();

RetryResult<ScanResponse> retryResult = getRetryDriver().runWithRetry(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.dynamodb.filter.DynamoDBFilter;
import org.apache.hadoop.dynamodb.filter.DynamoDBFilterOperator;
import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter;
import org.apache.hadoop.dynamodb.util.AbstractTimeSource;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
import software.amazon.awssdk.services.dynamodb.model.Condition;

public class ScanReadManager extends AbstractReadManager {

Expand Down Expand Up @@ -55,43 +51,21 @@ protected void initializeReadRequests() {

// Set a Scan query filter to skip expired records if the configuration
// provides the TTL attribute name
Optional<DynamoDBQueryFilter> maybeScanFilter =
Optional.ofNullable(context.getConf().get(DynamoDBConstants.TTL_ATTRIBUTE_NAME))
.map(attributeName -> {
long now = Instant.now().getEpochSecond();
DynamoDBQueryFilter filter = new DynamoDBQueryFilter();
filter.addScanFilter(new DynamoDBFilter() {
@Override
public String getColumnName() {
return attributeName;
}

@Override
public String getColumnType() {
throw new Error();
}

@Override
public DynamoDBFilterOperator getOperator() {
throw new Error();
}

@Override
public Condition getDynamoDBCondition() {
return Condition
.builder()
.comparisonOperator(ComparisonOperator.GT)
.attributeValueList(AttributeValue.fromN(String.valueOf(now)))
.build();
}
});
return filter;
});
String ttlAttributeName = context.getConf().get(DynamoDBConstants.TTL_ATTRIBUTE_NAME);
String filterExpression = null;
Map<String, AttributeValue> expressionAttributeValues = null;
if (ttlAttributeName != null) {
long now = Instant.now().getEpochSecond();
filterExpression =
"(attribute_not_exists(" + ttlAttributeName + ")) OR (" + ttlAttributeName + " > :now)";
expressionAttributeValues =
Collections.singletonMap(":now", AttributeValue.fromN(String.valueOf(now)));
}

// Queue up segment scan requests
for (Integer segment : shuffleSgments) {
enqueueReadRequestToTail(new ScanRecordReadRequest(this, context, segment, maybeScanFilter,
null /* lastEvaluatedKey */));
enqueueReadRequestToTail(new ScanRecordReadRequest(this, context, segment, filterExpression,
expressionAttributeValues, null /* lastEvaluatedKey */));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,50 @@ public class ScanRecordReadRequest extends AbstractRecordReadRequest {
/** Optional ScanFilter to add to Scan requests */
final Optional<DynamoDBQueryFilter> maybeScanFilter;

final String filterExpression;
final Map<String, AttributeValue> expressionAttributeValues;

@Deprecated
public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context,
int segment, Map<String, AttributeValue> lastEvaluatedKey) {
super(readMgr, context, segment, lastEvaluatedKey);
this.maybeScanFilter = Optional.empty();
this.filterExpression = null;
this.expressionAttributeValues = null;
}

public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context,
int segment, Optional<DynamoDBQueryFilter> maybeScanFilter,
Map<String, AttributeValue> lastEvaluatedKey) {
super(readMgr, context, segment, lastEvaluatedKey);
this.maybeScanFilter = maybeScanFilter;
this.filterExpression = null;
this.expressionAttributeValues = null;
}

public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context,
int segment, String filterExpression, Map<String, AttributeValue> expressionAttributeValues,
Map<String, AttributeValue> lastEvaluatedKey) {
super(readMgr, context, segment, lastEvaluatedKey);
this.maybeScanFilter = Optional.empty();
this.filterExpression = filterExpression;
this.expressionAttributeValues = expressionAttributeValues;
}

@Override
protected AbstractRecordReadRequest buildNextReadRequest(PageResults<Map<String,
AttributeValue>> pageResults) {
return new ScanRecordReadRequest(readMgr, context, segment, maybeScanFilter,
pageResults.lastEvaluatedKey);
return new ScanRecordReadRequest(readMgr, context, segment, filterExpression,
expressionAttributeValues, pageResults.lastEvaluatedKey);
}

@Override
protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
// Read from DynamoDB
RetryResult<ScanResponse> retryResult = context.getClient()
.scanTable(tableName, maybeScanFilter.orElse(null), segment,
context.getSplit().getTotalSegments(), lastEvaluatedKey, lim.items,
context.getReporter());
.scanTable(tableName, maybeScanFilter.orElse(null), segment,
context.getSplit().getTotalSegments(), lastEvaluatedKey, lim.items,
context.getReporter(), filterExpression, expressionAttributeValues);

ScanResponse response = retryResult.result;
int retries = retryResult.retries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public void queryFilterIsEmptyWhenTtlAttributeNameIsEmpty() {
ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(), context);
ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest();
assertFalse(readRequest.maybeScanFilter.isPresent());
assertNull(readRequest.filterExpression);
assertNull(readRequest.expressionAttributeValues);
}

@Test
Expand All @@ -41,9 +43,30 @@ public void queryFilterIsDefinedWhenTtlAttributeNameIsDefined() {
when(context.getSplit()).thenReturn(new DynamoDBSegmentsSplit(null, 1, 1, Collections.singletonList(1), 1, 0, null));
ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(), context);
ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest();
assertTrue(readRequest.maybeScanFilter.isPresent());
Map<String, Condition> scanFilter = readRequest.maybeScanFilter.get().getScanFilter();
assertNotNull(scanFilter.get(ttlAttributeName));
assertFalse(readRequest.maybeScanFilter.isPresent());
assertNotNull(readRequest.filterExpression);
assertTrue(readRequest.filterExpression.contains("attribute_not_exists(" + ttlAttributeName + ")"));
assertTrue(readRequest.filterExpression.contains(ttlAttributeName + " > :now"));
assertNotNull(readRequest.expressionAttributeValues);
assertTrue(readRequest.expressionAttributeValues.containsKey(":now"));
}

@Test
public void filterExpressionHasCorrectTtlLogic() {
final String ttlAttributeName = "expires_at";
JobConf conf = new JobConf();
conf.set(DynamoDBConstants.TTL_ATTRIBUTE_NAME, ttlAttributeName);
when(context.getConf()).thenReturn(conf);
when(context.getSplit())
.thenReturn(new DynamoDBSegmentsSplit(null, 1, 1, Collections.singletonList(1), 1, 0, null));

ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(),
context);
ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest();

String expectedExpression = "(attribute_not_exists(expires_at)) OR (expires_at > :now)";
assertEquals("FilterExpression should have correct TTL logic",
expectedExpression, readRequest.filterExpression);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public void fetchPageReturnsZeroConsumedCapacityWhenResultsConsumedCapacityIsNul
when(context.getSplit()).thenReturn(new DynamoDBSegmentsSplit());
ScanReadManager readManager = Mockito.mock(ScanReadManager.class);
ScanRecordReadRequest readRequest = new ScanRecordReadRequest(readManager, context, 0, Optional.empty(), null);
PageResults<Map<String, AttributeValue>> pageResults =
readRequest.fetchPage(new RequestLimit(0, 0));
PageResults<Map<String, AttributeValue>> pageResults = readRequest.fetchPage(new RequestLimit(0, 0));
assertEquals(0.0, pageResults.consumedRcu, 0.0);
}

Expand All @@ -62,8 +61,18 @@ private void stubScanTableWith(RetryResult<ScanResponse> scanResultRetryResult)
anyInt(),
any(Map.class),
anyLong(),
any(Reporter.class))
).thenReturn(scanResultRetryResult);
any(Reporter.class))).thenReturn(scanResultRetryResult);

when(client.scanTable(
anyString(),
any(DynamoDBQueryFilter.class),
anyInt(),
anyInt(),
any(Map.class),
anyLong(),
any(Reporter.class),
anyString(),
any(Map.class))).thenReturn(scanResultRetryResult);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
}
}

@Override
public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map<String,
AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments,
exclusiveStartKey, limit, reporter);
}

private List<Map<String, AttributeValue>> getItems() {
List<Map<String, AttributeValue>> items = new ArrayList<>();
for (String key : HASH_KEYS) {
Expand Down Expand Up @@ -146,6 +155,15 @@ public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
return new RetryResult<>(getHashNumberRangeKeyItems(HASH_KEYS, "S"), 0);
}

@Override
public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map<String,
AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments,
exclusiveStartKey, limit, reporter);
}
});

// Setup mock client
Expand Down Expand Up @@ -189,6 +207,15 @@ public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
assertNull(exclusiveStartKey);
return new RetryResult<>(getHashNumberRangeKeyItems(HASH_KEYS, "S"), 0);
}

@Override
public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map<String,
AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments,
exclusiveStartKey, limit, reporter);
}
});

// Setup mock client
Expand Down Expand Up @@ -232,6 +259,15 @@ public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
assertEquals(4, (int) totalSegments);
return new RetryResult<>(getHashKeyItems(HASH_KEYS), 0);
}

@Override
public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map<String,
AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments,
exclusiveStartKey, limit, reporter);
}
});

// Setup mock client
Expand Down Expand Up @@ -269,6 +305,15 @@ public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
throw new RuntimeException("Unrecoverable Exception");
}

@Override
public RetryResult<ScanResponse> scanTable(String tableName, DynamoDBQueryFilter
dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map<String,
AttributeValue> exclusiveStartKey, long limit, Reporter reporter,
String filterExpression, Map<String, AttributeValue> expressionAttributeValues) {
return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments,
exclusiveStartKey, limit, reporter);
}
});

DefaultDynamoDBRecordReader reader = new DefaultDynamoDBRecordReader(context);
Expand Down