From e8f4c8346920ff499f39993d648818e75cf031e8 Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Thu, 1 May 2025 11:08:18 +0200 Subject: [PATCH 1/6] Log skipped line count due to TTL --- .../hadoop/dynamodb/DynamoDBClient.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index 089c5b1..a2d8dd0 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -178,6 +178,27 @@ public RetryResult scanTable( log.debug("Executing DynamoDB scan: " + scanRequest); return dynamoDB.scan(scanRequest); }, reporter, PrintCounter.DynamoDBReadThrottle); + String maybeTtlAttributeName = config.get(DynamoDBConstants.TTL_ATTRIBUTE_NAME); + // for information only + if (maybeTtlAttributeName != null && !maybeTtlAttributeName.isEmpty()) { + log.debug( + String.format( + "Reading table %s, taking %s into account for row TTL", + tableName, + maybeTtlAttributeName + ) + ); + if (!retryResult.result.scannedCount().equals(retryResult.result.count())) { + log.warn( + String.format("Reading table %s with TTL field %s, %s rows were scanned " + + "but only %s rows will be returned.", + maybeTtlAttributeName, + retryResult.result.scannedCount(), + retryResult.result.count() + ) + ); + } + } return retryResult; } From f1f7581f65813fe1782934378d12db31001b690e Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Thu, 8 May 2025 16:43:34 +0200 Subject: [PATCH 2/6] Update emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java Co-authored-by: Dmitry Kropachev --- .../main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index a2d8dd0..6ca745f 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -188,8 +188,8 @@ public RetryResult scanTable( maybeTtlAttributeName ) ); - if (!retryResult.result.scannedCount().equals(retryResult.result.count())) { - log.warn( + if (!retryResult.result.scannedCount().equals(retryResult.result.count()) && log.isDebugEnabled()) { + log.debug( String.format("Reading table %s with TTL field %s, %s rows were scanned " + "but only %s rows will be returned.", maybeTtlAttributeName, From 8c982efbba946fe0150449107ac04c364db1c133 Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Fri, 9 May 2025 08:59:10 +0200 Subject: [PATCH 3/6] Log skipped line count due to TTL on PageResultMultiplexer --- .../java/org/apache/hadoop/dynamodb/DynamoDBClient.java | 3 ++- .../hadoop/dynamodb/preader/PageResultMultiplexer.java | 5 ++++- .../org/apache/hadoop/dynamodb/preader/PageResults.java | 8 ++++++-- .../hadoop/dynamodb/preader/QueryRecordReadRequest.java | 4 +++- .../hadoop/dynamodb/preader/ScanRecordReadRequest.java | 4 +++- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index 6ca745f..d7a31cb 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -188,7 +188,8 @@ public RetryResult scanTable( maybeTtlAttributeName ) ); - if (!retryResult.result.scannedCount().equals(retryResult.result.count()) && log.isDebugEnabled()) { + if (!retryResult.result.scannedCount().equals(retryResult.result.count()) + && log.isDebugEnabled()) { log.debug( String.format("Reading table %s with TTL field %s, %s rows were scanned " + "but only %s rows will be returned.", diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java index 7595d62..3ebed01 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java @@ -34,6 +34,7 @@ public class PageResultMultiplexer { private final int batchSize; private final int capacity; + private int skippedRowsCounter; private final BlockingQueue> pages; private final AtomicInteger pageCount = new AtomicInteger(); private final Object removeItemLock = new Object(); @@ -47,6 +48,7 @@ public PageResultMultiplexer(int batchSize, int capacity) { this.capacity = capacity; this.pages = new LinkedBlockingQueue<>(capacity); this.pageIterator = pages.iterator(); + this.skippedRowsCounter = 0; } public boolean addPageResults(PageResults page) { @@ -64,6 +66,7 @@ public boolean addPageResults(PageResults page) { } pageCount.incrementAndGet(); + skippedRowsCounter += page.skippedRowsCount; log.info("Added a page. Page count: " + pageCount.get()); return true; @@ -72,7 +75,7 @@ public boolean addPageResults(PageResults page) { public V next() throws IOException { if (itemsReturned % 10000 == 0) { log.info("Pagemux stats: items=" + itemsReturned + ", pages=" + pageCount.get() + ", cap=" - + capacity); + + capacity + ", skippedRows=" + this.skippedRowsCounter); } synchronized (removeItemLock) { diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java index 287f0b2..1f5918b 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java @@ -24,11 +24,13 @@ public class PageResults { public final V lastEvaluatedKey; public final double consumedRcu; public final int retries; + public final int skippedRowsCount; public final Exception exception; private volatile int pos; - public PageResults(List items, V lastEvaluatedKey, double consumedRcu, int retries) { + public PageResults(List items, V lastEvaluatedKey, double consumedRcu, + int retries, int skippedRowsCount) { if (items == null) { throw new IllegalArgumentException("Items must not be null"); } @@ -37,10 +39,11 @@ public PageResults(List items, V lastEvaluatedKey, double consumedRcu, int re this.consumedRcu = consumedRcu; this.retries = retries; this.exception = null; + this.skippedRowsCount = skippedRowsCount; } public PageResults(List items, V lastEvaluatedKey) { - this(items, lastEvaluatedKey, 0.0, 0); + this(items, lastEvaluatedKey, 0.0, 0, 0); } public PageResults(Exception exception) { @@ -52,6 +55,7 @@ public PageResults(Exception exception) { this.consumedRcu = 0; this.retries = 0; this.exception = exception; + this.skippedRowsCount = 0; } public V next() { diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java index 5bcb4f5..3d3e0c7 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java @@ -47,6 +47,8 @@ protected PageResults> fetchPage(RequestLimit lim) { // Translate the default value to NULL here, to keep this assumption in other classes. response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null, response.consumedCapacity().capacityUnits(), - retries); + retries, + 0 // only for scan + ); } } diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java index 34064e1..aa7137c 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java @@ -68,7 +68,9 @@ protected PageResults> fetchPage(RequestLimit lim) { // Translate the default value to NULL here, to keep this assumption in other classes. response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null, consumedCapacityUnits, - retries); + retries, + response.scannedCount() - response.count() + ); } } From 236ad37052748b244834cfc9081149f91c7a5f94 Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Fri, 9 May 2025 15:59:19 +0200 Subject: [PATCH 4/6] Update emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java Co-authored-by: Dmitry Kropachev --- .../apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java index 3d3e0c7..f2b7e6f 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java @@ -48,7 +48,7 @@ protected PageResults> fetchPage(RequestLimit lim) { response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null, response.consumedCapacity().capacityUnits(), retries, - 0 // only for scan + 0 // only for scan can have skip raws, due to TTL filtering ); } } From 9a8d89a35b64c41405bdb007c16a90fe460a7823 Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Fri, 9 May 2025 16:00:55 +0200 Subject: [PATCH 5/6] Remove initial logging --- .../hadoop/dynamodb/DynamoDBClient.java | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index d7a31cb..089c5b1 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -178,28 +178,6 @@ public RetryResult scanTable( log.debug("Executing DynamoDB scan: " + scanRequest); return dynamoDB.scan(scanRequest); }, reporter, PrintCounter.DynamoDBReadThrottle); - String maybeTtlAttributeName = config.get(DynamoDBConstants.TTL_ATTRIBUTE_NAME); - // for information only - if (maybeTtlAttributeName != null && !maybeTtlAttributeName.isEmpty()) { - log.debug( - String.format( - "Reading table %s, taking %s into account for row TTL", - tableName, - maybeTtlAttributeName - ) - ); - if (!retryResult.result.scannedCount().equals(retryResult.result.count()) - && log.isDebugEnabled()) { - log.debug( - String.format("Reading table %s with TTL field %s, %s rows were scanned " - + "but only %s rows will be returned.", - maybeTtlAttributeName, - retryResult.result.scannedCount(), - retryResult.result.count() - ) - ); - } - } return retryResult; } From 26fae73f3e70b064d6369b1895a4a0a2f28a1996 Mon Sep 17 00:00:00 2001 From: Christophe Pache Date: Wed, 14 May 2025 15:02:22 +0200 Subject: [PATCH 6/6] Use long for skipped rows due to ttl --- .../hadoop/dynamodb/preader/PageResultMultiplexer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java index 3ebed01..30de968 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java @@ -34,7 +34,8 @@ public class PageResultMultiplexer { private final int batchSize; private final int capacity; - private int skippedRowsCounter; + // number of skipped rows during a scan, because of reached TTL + private volatile long skippedRowsCounter; private final BlockingQueue> pages; private final AtomicInteger pageCount = new AtomicInteger(); private final Object removeItemLock = new Object(); @@ -48,7 +49,7 @@ public PageResultMultiplexer(int batchSize, int capacity) { this.capacity = capacity; this.pages = new LinkedBlockingQueue<>(capacity); this.pageIterator = pages.iterator(); - this.skippedRowsCounter = 0; + this.skippedRowsCounter = 0L; //skipped rows are counted on addPageResults during a scan } public boolean addPageResults(PageResults page) {