From e02a8c6eb3aca3245453ff83df6150367c64cfd0 Mon Sep 17 00:00:00 2001 From: Artem Date: Sun, 31 Aug 2025 14:22:19 +0200 Subject: [PATCH] Extend ttl filter to include rows without ttl --- .../hadoop/dynamodb/DynamoDBClient.java | 16 ++++++ .../dynamodb/preader/ScanReadManager.java | 52 +++++-------------- .../preader/ScanRecordReadRequest.java | 26 ++++++++-- .../dynamodb/preader/ScanReadManagerTest.java | 29 +++++++++-- .../preader/ScanRecordReadRequestTest.java | 17 ++++-- .../read/DynamoDBRecordReaderTest.java | 45 ++++++++++++++++ 6 files changed, 134 insertions(+), 51 deletions(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index 089c5b13..10bc0ba0 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -158,6 +158,14 @@ public TableDescription describeTable(String tableName) { public RetryResult scanTable( String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, exclusiveStartKey, + limit, reporter, null, null); + } + + public RetryResult scanTable( + String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer + totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(tableName) .exclusiveStartKey(exclusiveStartKey) .limit(Ints.checkedCast(limit)) @@ -172,6 +180,14 @@ public RetryResult scanTable( } } + if (filterExpression != null) { + scanRequestBuilder.filterExpression(filterExpression); + } + + if (expressionAttributeValues != null) { + scanRequestBuilder.expressionAttributeValues(expressionAttributeValues); + } + final ScanRequest scanRequest = scanRequestBuilder.build(); RetryResult retryResult = getRetryDriver().runWithRetry(() -> { diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanReadManager.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanReadManager.java index c8dcc28b..913d4376 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanReadManager.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanReadManager.java @@ -17,16 +17,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Random; import org.apache.hadoop.dynamodb.DynamoDBConstants; -import org.apache.hadoop.dynamodb.filter.DynamoDBFilter; -import org.apache.hadoop.dynamodb.filter.DynamoDBFilterOperator; -import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter; import org.apache.hadoop.dynamodb.util.AbstractTimeSource; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator; -import software.amazon.awssdk.services.dynamodb.model.Condition; public class ScanReadManager extends AbstractReadManager { @@ -55,43 +51,21 @@ protected void initializeReadRequests() { // Set a Scan query filter to skip expired records if the configuration // provides the TTL attribute name - Optional maybeScanFilter = - Optional.ofNullable(context.getConf().get(DynamoDBConstants.TTL_ATTRIBUTE_NAME)) - .map(attributeName -> { - long now = Instant.now().getEpochSecond(); - DynamoDBQueryFilter filter = new DynamoDBQueryFilter(); - filter.addScanFilter(new DynamoDBFilter() { - @Override - public String getColumnName() { - return attributeName; - } - - @Override - public String getColumnType() { - throw new Error(); - } - - @Override - public DynamoDBFilterOperator getOperator() { - throw new Error(); - } - - @Override - public Condition getDynamoDBCondition() { - return Condition - .builder() - .comparisonOperator(ComparisonOperator.GT) - .attributeValueList(AttributeValue.fromN(String.valueOf(now))) - .build(); - } - }); - return filter; - }); + String ttlAttributeName = context.getConf().get(DynamoDBConstants.TTL_ATTRIBUTE_NAME); + String filterExpression = null; + Map expressionAttributeValues = null; + if (ttlAttributeName != null) { + long now = Instant.now().getEpochSecond(); + filterExpression = + "(attribute_not_exists(" + ttlAttributeName + ")) OR (" + ttlAttributeName + " > :now)"; + expressionAttributeValues = + Collections.singletonMap(":now", AttributeValue.fromN(String.valueOf(now))); + } // Queue up segment scan requests for (Integer segment : shuffleSgments) { - enqueueReadRequestToTail(new ScanRecordReadRequest(this, context, segment, maybeScanFilter, - null /* lastEvaluatedKey */)); + enqueueReadRequestToTail(new ScanRecordReadRequest(this, context, segment, filterExpression, + expressionAttributeValues, null /* lastEvaluatedKey */)); } } } diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java index 34064e11..da4aa5c3 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java @@ -26,11 +26,16 @@ public class ScanRecordReadRequest extends AbstractRecordReadRequest { /** Optional ScanFilter to add to Scan requests */ final Optional maybeScanFilter; + final String filterExpression; + final Map expressionAttributeValues; + @Deprecated public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context, int segment, Map lastEvaluatedKey) { super(readMgr, context, segment, lastEvaluatedKey); this.maybeScanFilter = Optional.empty(); + this.filterExpression = null; + this.expressionAttributeValues = null; } public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context, @@ -38,22 +43,33 @@ public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderCo Map lastEvaluatedKey) { super(readMgr, context, segment, lastEvaluatedKey); this.maybeScanFilter = maybeScanFilter; + this.filterExpression = null; + this.expressionAttributeValues = null; + } + + public ScanRecordReadRequest(AbstractReadManager readMgr, DynamoDBRecordReaderContext context, + int segment, String filterExpression, Map expressionAttributeValues, + Map lastEvaluatedKey) { + super(readMgr, context, segment, lastEvaluatedKey); + this.maybeScanFilter = Optional.empty(); + this.filterExpression = filterExpression; + this.expressionAttributeValues = expressionAttributeValues; } @Override protected AbstractRecordReadRequest buildNextReadRequest(PageResults> pageResults) { - return new ScanRecordReadRequest(readMgr, context, segment, maybeScanFilter, - pageResults.lastEvaluatedKey); + return new ScanRecordReadRequest(readMgr, context, segment, filterExpression, + expressionAttributeValues, pageResults.lastEvaluatedKey); } @Override protected PageResults> fetchPage(RequestLimit lim) { // Read from DynamoDB RetryResult retryResult = context.getClient() - .scanTable(tableName, maybeScanFilter.orElse(null), segment, - context.getSplit().getTotalSegments(), lastEvaluatedKey, lim.items, - context.getReporter()); + .scanTable(tableName, maybeScanFilter.orElse(null), segment, + context.getSplit().getTotalSegments(), lastEvaluatedKey, lim.items, + context.getReporter(), filterExpression, expressionAttributeValues); ScanResponse response = retryResult.result; int retries = retryResult.retries; diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanReadManagerTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanReadManagerTest.java index 416d034e..8806f443 100644 --- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanReadManagerTest.java +++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanReadManagerTest.java @@ -30,6 +30,8 @@ public void queryFilterIsEmptyWhenTtlAttributeNameIsEmpty() { ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(), context); ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest(); assertFalse(readRequest.maybeScanFilter.isPresent()); + assertNull(readRequest.filterExpression); + assertNull(readRequest.expressionAttributeValues); } @Test @@ -41,9 +43,30 @@ public void queryFilterIsDefinedWhenTtlAttributeNameIsDefined() { when(context.getSplit()).thenReturn(new DynamoDBSegmentsSplit(null, 1, 1, Collections.singletonList(1), 1, 0, null)); ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(), context); ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest(); - assertTrue(readRequest.maybeScanFilter.isPresent()); - Map scanFilter = readRequest.maybeScanFilter.get().getScanFilter(); - assertNotNull(scanFilter.get(ttlAttributeName)); + assertFalse(readRequest.maybeScanFilter.isPresent()); + assertNotNull(readRequest.filterExpression); + assertTrue(readRequest.filterExpression.contains("attribute_not_exists(" + ttlAttributeName + ")")); + assertTrue(readRequest.filterExpression.contains(ttlAttributeName + " > :now")); + assertNotNull(readRequest.expressionAttributeValues); + assertTrue(readRequest.expressionAttributeValues.containsKey(":now")); + } + + @Test + public void filterExpressionHasCorrectTtlLogic() { + final String ttlAttributeName = "expires_at"; + JobConf conf = new JobConf(); + conf.set(DynamoDBConstants.TTL_ATTRIBUTE_NAME, ttlAttributeName); + when(context.getConf()).thenReturn(conf); + when(context.getSplit()) + .thenReturn(new DynamoDBSegmentsSplit(null, 1, 1, Collections.singletonList(1), 1, 0, null)); + + ScanReadManager readManager = new ScanReadManager(Mockito.mock(RateController.class), new MockTimeSource(), + context); + ScanRecordReadRequest readRequest = (ScanRecordReadRequest) readManager.dequeueReadRequest(); + + String expectedExpression = "(attribute_not_exists(expires_at)) OR (expires_at > :now)"; + assertEquals("FilterExpression should have correct TTL logic", + expectedExpression, readRequest.filterExpression); } } diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequestTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequestTest.java index 30a64a8f..3b84e649 100644 --- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequestTest.java +++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequestTest.java @@ -49,8 +49,7 @@ public void fetchPageReturnsZeroConsumedCapacityWhenResultsConsumedCapacityIsNul when(context.getSplit()).thenReturn(new DynamoDBSegmentsSplit()); ScanReadManager readManager = Mockito.mock(ScanReadManager.class); ScanRecordReadRequest readRequest = new ScanRecordReadRequest(readManager, context, 0, Optional.empty(), null); - PageResults> pageResults = - readRequest.fetchPage(new RequestLimit(0, 0)); + PageResults> pageResults = readRequest.fetchPage(new RequestLimit(0, 0)); assertEquals(0.0, pageResults.consumedRcu, 0.0); } @@ -62,8 +61,18 @@ private void stubScanTableWith(RetryResult scanResultRetryResult) anyInt(), any(Map.class), anyLong(), - any(Reporter.class)) - ).thenReturn(scanResultRetryResult); + any(Reporter.class))).thenReturn(scanResultRetryResult); + + when(client.scanTable( + anyString(), + any(DynamoDBQueryFilter.class), + anyInt(), + anyInt(), + any(Map.class), + anyLong(), + any(Reporter.class), + anyString(), + any(Map.class))).thenReturn(scanResultRetryResult); } } diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/DynamoDBRecordReaderTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/DynamoDBRecordReaderTest.java index a5c306fd..715280fd 100644 --- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/DynamoDBRecordReaderTest.java +++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/DynamoDBRecordReaderTest.java @@ -98,6 +98,15 @@ public RetryResult scanTable(String tableName, DynamoDBQueryFilter } } + @Override + public RetryResult scanTable(String tableName, DynamoDBQueryFilter + dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, + exclusiveStartKey, limit, reporter); + } + private List> getItems() { List> items = new ArrayList<>(); for (String key : HASH_KEYS) { @@ -146,6 +155,15 @@ public RetryResult scanTable(String tableName, DynamoDBQueryFilter AttributeValue> exclusiveStartKey, long limit, Reporter reporter) { return new RetryResult<>(getHashNumberRangeKeyItems(HASH_KEYS, "S"), 0); } + + @Override + public RetryResult scanTable(String tableName, DynamoDBQueryFilter + dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, + exclusiveStartKey, limit, reporter); + } }); // Setup mock client @@ -189,6 +207,15 @@ public RetryResult scanTable(String tableName, DynamoDBQueryFilter assertNull(exclusiveStartKey); return new RetryResult<>(getHashNumberRangeKeyItems(HASH_KEYS, "S"), 0); } + + @Override + public RetryResult scanTable(String tableName, DynamoDBQueryFilter + dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, + exclusiveStartKey, limit, reporter); + } }); // Setup mock client @@ -232,6 +259,15 @@ public RetryResult scanTable(String tableName, DynamoDBQueryFilter assertEquals(4, (int) totalSegments); return new RetryResult<>(getHashKeyItems(HASH_KEYS), 0); } + + @Override + public RetryResult scanTable(String tableName, DynamoDBQueryFilter + dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, + exclusiveStartKey, limit, reporter); + } }); // Setup mock client @@ -269,6 +305,15 @@ public RetryResult scanTable(String tableName, DynamoDBQueryFilter AttributeValue> exclusiveStartKey, long limit, Reporter reporter) { throw new RuntimeException("Unrecoverable Exception"); } + + @Override + public RetryResult scanTable(String tableName, DynamoDBQueryFilter + dynamoDBQueryFilter, Integer segment, Integer totalSegments, Map exclusiveStartKey, long limit, Reporter reporter, + String filterExpression, Map expressionAttributeValues) { + return scanTable(tableName, dynamoDBQueryFilter, segment, totalSegments, + exclusiveStartKey, limit, reporter); + } }); DefaultDynamoDBRecordReader reader = new DefaultDynamoDBRecordReader(context);