diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java index 7595d62..30de968 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java @@ -34,6 +34,8 @@ public class PageResultMultiplexer { private final int batchSize; private final int capacity; + // number of skipped rows during a scan, because of reached TTL + private volatile long skippedRowsCounter; private final BlockingQueue> pages; private final AtomicInteger pageCount = new AtomicInteger(); private final Object removeItemLock = new Object(); @@ -47,6 +49,7 @@ public PageResultMultiplexer(int batchSize, int capacity) { this.capacity = capacity; this.pages = new LinkedBlockingQueue<>(capacity); this.pageIterator = pages.iterator(); + this.skippedRowsCounter = 0L; //skipped rows are counted on addPageResults during a scan } public boolean addPageResults(PageResults page) { @@ -64,6 +67,7 @@ public boolean addPageResults(PageResults page) { } pageCount.incrementAndGet(); + skippedRowsCounter += page.skippedRowsCount; log.info("Added a page. 
Page count: " + pageCount.get()); return true; @@ -72,7 +76,7 @@ public boolean addPageResults(PageResults page) { public V next() throws IOException { if (itemsReturned % 10000 == 0) { log.info("Pagemux stats: items=" + itemsReturned + ", pages=" + pageCount.get() + ", cap=" - + capacity); + + capacity + ", skippedRows=" + this.skippedRowsCounter); } synchronized (removeItemLock) { diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java index 287f0b2..1f5918b 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResults.java @@ -24,11 +24,13 @@ public class PageResults { public final V lastEvaluatedKey; public final double consumedRcu; public final int retries; + public final int skippedRowsCount; public final Exception exception; private volatile int pos; - public PageResults(List items, V lastEvaluatedKey, double consumedRcu, int retries) { + public PageResults(List items, V lastEvaluatedKey, double consumedRcu, + int retries, int skippedRowsCount) { if (items == null) { throw new IllegalArgumentException("Items must not be null"); } @@ -37,10 +39,11 @@ public PageResults(List items, V lastEvaluatedKey, double consumedRcu, int re this.consumedRcu = consumedRcu; this.retries = retries; this.exception = null; + this.skippedRowsCount = skippedRowsCount; } public PageResults(List items, V lastEvaluatedKey) { - this(items, lastEvaluatedKey, 0.0, 0); + this(items, lastEvaluatedKey, 0.0, 0, 0); } public PageResults(Exception exception) { @@ -52,6 +55,7 @@ public PageResults(Exception exception) { this.consumedRcu = 0; this.retries = 0; this.exception = exception; + this.skippedRowsCount = 0; } public V next() { diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java 
b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java index 5bcb4f5..f2b7e6f 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/QueryRecordReadRequest.java @@ -47,6 +47,8 @@ protected PageResults> fetchPage(RequestLimit lim) { // Translate the default value to NULL here, to keep this assumption in other classes. response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null, response.consumedCapacity().capacityUnits(), - retries); + retries, + 0 // only a scan can have skipped rows, due to TTL filtering + ); } } diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java index 34064e1..aa7137c 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/ScanRecordReadRequest.java @@ -68,7 +68,9 @@ protected PageResults> fetchPage(RequestLimit lim) { // Translate the default value to NULL here, to keep this assumption in other classes. response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null, consumedCapacityUnits, - retries); + retries, + response.scannedCount() - response.count() + ); } }