Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class PageResultMultiplexer<V> {

private final int batchSize;
private final int capacity;
private int skippedRowsCounter;
private final BlockingQueue<PageResults<V>> pages;
private final AtomicInteger pageCount = new AtomicInteger();
private final Object removeItemLock = new Object();
Expand All @@ -47,6 +48,7 @@ public PageResultMultiplexer(int batchSize, int capacity) {
this.capacity = capacity;
this.pages = new LinkedBlockingQueue<>(capacity);
this.pageIterator = pages.iterator();
this.skippedRowsCounter = 0;
}

public boolean addPageResults(PageResults<V> page) {
Expand All @@ -64,6 +66,7 @@ public boolean addPageResults(PageResults<V> page) {
}

pageCount.incrementAndGet();
skippedRowsCounter += page.skippedRowsCount;
log.info("Added a page. Page count: " + pageCount.get());

return true;
Expand All @@ -72,7 +75,7 @@ public boolean addPageResults(PageResults<V> page) {
public V next() throws IOException {
if (itemsReturned % 10000 == 0) {
log.info("Pagemux stats: items=" + itemsReturned + ", pages=" + pageCount.get() + ", cap="
+ capacity);
+ capacity + ", skippedRows=" + this.skippedRowsCounter);
}

synchronized (removeItemLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class PageResults<V> {
public final V lastEvaluatedKey;
public final double consumedRcu;
public final int retries;
public final int skippedRowsCount;
public final Exception exception;

private volatile int pos;

public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu, int retries) {
public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu,
int retries, int skippedRowsCount) {
if (items == null) {
throw new IllegalArgumentException("Items must not be null");
}
Expand All @@ -37,10 +39,11 @@ public PageResults(List<V> items, V lastEvaluatedKey, double consumedRcu, int re
this.consumedRcu = consumedRcu;
this.retries = retries;
this.exception = null;
this.skippedRowsCount = skippedRowsCount;
}

public PageResults(List<V> items, V lastEvaluatedKey) {
this(items, lastEvaluatedKey, 0.0, 0);
this(items, lastEvaluatedKey, 0.0, 0, 0);
}

public PageResults(Exception exception) {
Expand All @@ -52,6 +55,7 @@ public PageResults(Exception exception) {
this.consumedRcu = 0;
this.retries = 0;
this.exception = exception;
this.skippedRowsCount = 0;
}

public V next() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
// Translate the default value to NULL here, to keep this assumption in other classes.
response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null,
response.consumedCapacity().capacityUnits(),
retries);
retries,
0 // only for scan can have skip raws, due to TTL filtering
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
// Translate the default value to NULL here, to keep this assumption in other classes.
response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null,
consumedCapacityUnits,
retries);
retries,
response.scannedCount() - response.count()
);
}

}