Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class PageResultMultiplexer<V> {

private final int batchSize;
private final int capacity;
// number of skipped rows during a scan, because of reached TTL
private volatile long skippedRowsCounter;
private final BlockingQueue<PageResults<V>> pages;
private final AtomicInteger pageCount = new AtomicInteger();
private final Object removeItemLock = new Object();
Expand All @@ -47,6 +49,7 @@ public PageResultMultiplexer(int batchSize, int capacity) {
this.capacity = capacity;
this.pages = new LinkedBlockingQueue<>(capacity);
this.pageIterator = pages.iterator();
this.skippedRowsCounter = 0L; //skipped rows are counted on addPageResults during a scan
}

public boolean addPageResults(PageResults<V> page) {
Expand All @@ -64,6 +67,7 @@ public boolean addPageResults(PageResults<V> page) {
}

pageCount.incrementAndGet();
skippedRowsCounter += page.skippedRowsCount;
log.info("Added a page. Page count: " + pageCount.get());

return true;
Expand All @@ -72,7 +76,7 @@ public boolean addPageResults(PageResults<V> page) {
public V next() throws IOException {
if (itemsReturned % 10000 == 0) {
log.info("Pagemux stats: items=" + itemsReturned + ", pages=" + pageCount.get() + ", cap="
+ capacity);
+ capacity + ", skippedRows=" + this.skippedRowsCounter);
}

synchronized (removeItemLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class PageResults<V> {
public final V lastEvaluatedKey;
public final double consumedRcu;
public final int retries;
public final int skippedRowsCount;
public final Exception exception;

private volatile int pos;

public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu, int retries) {
public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu,
int retries, int skippedRowsCount) {
if (items == null) {
throw new IllegalArgumentException("Items must not be null");
}
Expand All @@ -37,10 +39,11 @@ public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu, int re
this.consumedRcu = consumedRcu;
this.retries = retries;
this.exception = null;
this.skippedRowsCount = skippedRowsCount;
}

public PageResults(List<V> items, V lastEvaluatedKey) {
this(items, lastEvaluatedKey, 0.0, 0);
this(items, lastEvaluatedKey, 0.0, 0, 0);
}

public PageResults(Exception exception) {
Expand All @@ -52,6 +55,7 @@ public PageResults(Exception exception) {
this.consumedRcu = 0;
this.retries = 0;
this.exception = exception;
this.skippedRowsCount = 0;
}

public V next() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
// Translate the default value to NULL here, to keep this assumption in other classes.
response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null,
response.consumedCapacity().capacityUnits(),
retries);
retries,
0 // only for scan can have skip raws, due to TTL filtering
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
// Translate the default value to NULL here, to keep this assumption in other classes.
response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null,
consumedCapacityUnits,
retries);
retries,
response.scannedCount() - response.count()
);
}

}