@@ -210,20 +210,26 @@
* key/value entry into this bucket. If the existing value check is requested, but there
* is no existing value for the key, the value is not added.
*
* <p>This method returns a boolean value indicating whether the bucket was changed, that is,
* whether the new value differs from the existing value. If the existing value check is
* requested but fails, this method returns {@code false}, since no update is performed.
*
*
* @param key the entry key
* @param keyHashCode the key hash code
* @param oldValue the value to check the existing value against. If it is
* HalfDiskHashMap.INVALID_VALUE, the check is skipped
* @param value the entry value; it can also be the special value
* HalfDiskHashMap.INVALID_VALUE, which indicates the entry should be deleted
* @return {@code true} if the bucket was changed, {@code false} otherwise
*/
public void putValue(final Bytes key, final int keyHashCode, final long oldValue, final long value) {
public boolean putValue(final Bytes key, final int keyHashCode, final long oldValue, final long value) {
final boolean needCheckOldValue = oldValue != INVALID_VALUE;
final FindResult result = findEntry(keyHashCode, key);
if (value == INVALID_VALUE) {
if (result.found()) {
if (needCheckOldValue && (oldValue != result.entryValue)) {
return;
return false;
}
final long nextEntryOffset = result.entryOffset() + result.entrySize();
final long remainderSize = bucketData.length() - nextEntryOffset;
@@ -241,25 +247,31 @@
bucketData.position(0); // limit() doesn't work if the new limit is less than the current pos
bucketData.limit(result.entryOffset() + remainderSize);
entryCount--;
// entry removed -> bucket is updated
return true;
} else {
// entry not found, nothing to delete
// entry not found, nothing to delete -> bucket is not updated
return false;

Codecov warning: added line #L254 was not covered by tests (Bucket.java)
}
return;
}
if (result.found()) {
// yay! we found it, so update value
if (needCheckOldValue && (oldValue != result.entryValue)) {
return;
return false;

Codecov warning: added line #L260 was not covered by tests (Bucket.java)
}
bucketData.position(result.entryValueOffset());
bucketData.writeLong(value);
return value != result.entryValue;
} else {
if (needCheckOldValue) {
return;
// no existing value, but a check is requested
return false;

Codecov warning: added line #L268 was not covered by tests (Bucket.java)
}
// add a new entry
writeNewEntry(keyHashCode, value, key);
checkLargestBucket(++entryCount);
// entry added -> bucket updated
return true;
}
}
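
The new boolean return value lets callers skip flushing buckets that were not actually modified. A minimal sketch of such a caller (hypothetical helper, not part of this PR; the real call site is the ReadUpdateBucketTask change further down in this diff):

private static boolean applyUpdates(final Bucket bucket, final BucketMutation updates) {
    boolean bucketChanged = false;
    for (BucketMutation m = updates; m != null; m = m.getNext()) {
        // putValue now reports whether the entry was actually added, updated, or removed
        if (bucket.putValue(m.getKeyBytes(), m.getKeyHashCode(), m.getOldValue(), m.getValue())) {
            bucketChanged = true;
        }
    }
    // When this returns false, the caller can close the bucket without scheduling a disk write
    return bucketChanged;
}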

@@ -358,12 +370,14 @@
*
* @param expectedIndex Bucket index to set to this bucket
* @param expectedMaskBits Bucket mask bits to validate all bucket entries against
* @return {@code true} if the bucket was changed by this method, {@code false} otherwise
*/
public void sanitize(final int expectedIndex, final int expectedMaskBits) {
public boolean sanitize(final int expectedIndex, final int expectedMaskBits) {
final int expectedMask = (1 << expectedMaskBits) - 1;
bucketData.resetPosition();
long srcIndex = 0;
long dstIndex = 0;
boolean updated = false;
while (bucketData.hasRemaining()) {
final long fieldOffset = bucketData.position();
final int tag = bucketData.readVarInt(false);
Expand All @@ -384,13 +398,16 @@
if ((entryHashCode & expectedMask) == expectedIndex) {
copyBucketDataBytes(srcIndex, dstIndex, entryLenWithTag);
dstIndex += entryLenWithTag;
} else {
updated = true;
}
srcIndex += entryLenWithTag;
bucketData.position(nextEntryOffset);
}
}
bucketData.position(0);
bucketData.limit(dstIndex);
return updated;
}
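
A worked example of the mask check above, with illustrative values only: with expectedMaskBits = 4 the mask is 0b1111, so an entry is kept only if the low four bits of its hash code equal the expected bucket index.

final int expectedMaskBits = 4;
final int expectedMask = (1 << expectedMaskBits) - 1; // 0b1111 = 15
final int entryHashCode = 0x5A;                       // 0b0101_1010
// Kept only in the bucket with index 10 (0b1010); in any other bucket the entry is
// dropped and sanitize() reports the bucket as updated
final boolean keep = (entryHashCode & expectedMask) == 10;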

/**
Expand Up @@ -10,7 +10,7 @@
* A simple linked-list of "mutations" for a bucket.
* The key.
*/
class BucketMutation {
final class BucketMutation {

private final Bytes keyBytes;
private final int keyHashCode;
@@ -93,38 +93,23 @@ int size() {
return size;
}

// For testing purposes
Bytes getKeyBytes() {
return keyBytes;
}

int getKeyHashCode() {
return keyHashCode;
}

long getValue() {
return value;
}

// For testing purposes
long getOldValue() {
return oldValue;
}

// For testing purposes
BucketMutation getNext() {
return next;
}

/**
* Visit each mutation in the list, starting from this mutation.
* @param consumer
* The callback. Cannot be null.
*/
void forEachKeyValue(MutationCallback consumer) {
BucketMutation mutation = this;
while (mutation != null) {
consumer.accept(mutation.keyBytes, mutation.keyHashCode, mutation.oldValue, mutation.value);
mutation = mutation.next;
}
}

/**
* A simple callback for {@link BucketMutation#forEachKeyValue(MutationCallback)}.
*/
interface MutationCallback {
void accept(final Bytes keyBytes, final int keyHashCode, final long oldValue, final long value);
}
}
@@ -588,12 +588,14 @@ public DataFileReader endWriting() throws IOException {
throw new IllegalStateException("Tried calling endWriting with different thread to startWriting()");
}
final int size = oneTransactionsData.size();
logger.info(
MERKLE_DB.getMarker(),
"Finishing writing to {}, num of changed bins = {}, num of changed keys = {}",
storeName,
size,
oneTransactionsData.stream().mapToLong(BucketMutation::size).sum());
if (logger.isDebugEnabled(MERKLE_DB.getMarker())) {
logger.debug(
MERKLE_DB.getMarker(),
"Finishing writing to {}, num of changed bins = {}, num of changed keys = {}",
storeName,
size,
oneTransactionsData.stream().mapToLong(BucketMutation::size).sum());
}
final DataFileReader dataFileReader;
try {
if (size > 0) {
@@ -687,6 +689,20 @@ protected boolean onExecute() {
}
}

// This is a helper method used in ReadUpdateBucketTask and StoreBucketTask, when a bucket
// is processed (either no updates to the bucket, or it has been written to disk).
private void bucketProcessed() {
// Let the current submit task know that a bucket is fully processed, and
// the task can be run
if (bucketPermits.getAndIncrement() == 0) {
// If a submit task is currently running in parallel, it must have already created
// a new "current" submit task and permits have been set to 0, otherwise the
// getAndIncrement() above couldn't return 0. It means, notifyBucketProcessed()
// will be called on a different submit task than the one currently running
currentSubmitTask.get().notifyBucketProcessed();
}
}
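
The permit handshake above can be reduced to the following self-contained sketch (hypothetical class and names; only the AtomicInteger idiom is taken from the method above): each fully processed bucket returns one permit, and only the thread that moves the counter off zero performs the notification, matching the comment about permits having been set to 0 by the submit task.

import java.util.concurrent.atomic.AtomicInteger;

final class PermitHandshakeSketch {
    private final AtomicInteger permits = new AtomicInteger();

    void bucketProcessed(final Runnable notifyCurrentSubmitTask) {
        if (permits.getAndIncrement() == 0) {
            // The counter was zero: the submit task had consumed every permit and is
            // waiting for at least one bucket to be fully processed, so wake it up
            notifyCurrentSubmitTask.run();
        }
    }
}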

/**
* A task to read a bucket identified by the given index from disk and apply a list of
* key updates to it. The task has no dependencies; it's executed right after
@@ -706,25 +722,36 @@ private class ReadUpdateBucketTask extends AbstractTask {
this.keyUpdates = keyUpdates;
}

private void createAndScheduleStoreTask(final Bucket bucket) {
// Create a subsequent "store bucket" task for the bucket
final StoreBucketTask storeTask = new StoreBucketTask(getPool(), bucket);
// The last created "store bucket" task. storeTask above will be set as an
// output dependency for that task to make sure tasks are running only one at
// a time. See StoreBucketTask for details
final StoreBucketTask prevTask = lastStoreTask.getAndSet(storeTask);
if (prevTask != null) {
// This will trigger prevTask execution as soon as its prev task is complete
prevTask.setNext(storeTask);
private void createAndScheduleStoreTask(final Bucket bucket, final boolean bucketChanged) throws IOException {
if (bucketChanged) {
// Create a subsequent "store bucket" task for the bucket
final StoreBucketTask storeTask = new StoreBucketTask(getPool(), bucket);
// The last created "store bucket" task. storeTask above will be set as an
// output dependency for that task to make sure tasks are running only one at
// a time. See StoreBucketTask for details
final StoreBucketTask prevTask = lastStoreTask.getAndSet(storeTask);
if (prevTask != null) {
// This will trigger prevTask execution as soon as its prev task is complete
prevTask.setNext(storeTask);
} else {
// The first task: no dependency on the prev task, can be executed right away
storeTask.send();
}
} else {
// The first task: no dependency on the prev task, can be executed right away
storeTask.send();
// Just close the bucket and mark it as processed
bucket.close();
bucketProcessed();
}
if (storeBucketTasksCreated.incrementAndGet() == updatedBucketsCount.get()) {
// The last task: no dependency on the next task, can be executed as soon as
// its prev task is complete, no need to wait until the next task dependency
// is set
lastStoreTask.get().setNext(notifyTaskRef.get());
final StoreBucketTask lastTask = lastStoreTask.get();
if (lastTask != null) {
lastTask.setNext(notifyTaskRef.get());
} else {
notifyTaskRef.get().send();
}
}
}
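
The chaining around lastStoreTask.getAndSet() can be illustrated with this standalone sketch (CompletableFuture is used here purely for illustration; the store itself uses its AbstractTask machinery): each new store step is appended after the previously created one, so store steps run one at a time even though they are scheduled from parallel read/update tasks.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class StoreChainSketch {
    // Completed up front so the very first store step can run immediately
    private final AtomicReference<CompletableFuture<Void>> lastStore =
            new AtomicReference<>(CompletableFuture.completedFuture(null));

    void scheduleStore(final Runnable storeBucket) {
        final CompletableFuture<Void> myGate = new CompletableFuture<>();
        // Atomically make this step the new tail of the chain
        final CompletableFuture<Void> prevGate = lastStore.getAndSet(myGate);
        // Run only after the previous store step has finished, then release the next step
        prevGate.thenRun(() -> {
            storeBucket.run();
            myGate.complete(null);
        });
    }
}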

@@ -733,6 +760,7 @@ protected boolean onExecute() throws IOException {
BufferedData bucketData = fileCollection.readDataItemUsingIndex(bucketIndexToBucketLocation, bucketIndex);
// The bucket will be closed by StoreBucketTask
final Bucket bucket = bucketPool.getBucket();
boolean bucketChanged = false;
if (bucketData == null) {
// An empty bucket
bucket.setBucketIndex(bucketIndex);
@@ -753,12 +781,18 @@ may be different from the expected one. In this case, we clear the bucket (as it
bucket.clear();
}
// Clear old bucket entries with wrong hash codes
bucket.sanitize(bucketIndex, bucketMaskBits.get());
if (bucket.sanitize(bucketIndex, bucketMaskBits.get())) {
bucketChanged = true;
}
}
// Apply all updates
keyUpdates.forEachKeyValue(bucket::putValue);
for (BucketMutation m = keyUpdates; m != null; m = m.getNext()) {
if (bucket.putValue(m.getKeyBytes(), m.getKeyHashCode(), m.getOldValue(), m.getValue())) {
bucketChanged = true;
}
}
// Schedule a "store bucket" task for this bucket
createAndScheduleStoreTask(bucket);
createAndScheduleStoreTask(bucket, bucketChanged);
return true;
}

@@ -817,15 +851,7 @@ protected boolean onExecute() throws IOException {
}
return true;
} finally {
// Let the current submit task know that a bucket is fully processed, and
// the task can be run
if (bucketPermits.getAndIncrement() == 0) {
// If a submit task is currently running in parallel, it must have already created
// a new "current" submit task and permits have been set to 0, otherwise the
// getAndIncrement() above couldn't return 0. It means, notifyBucketProcessed()
// will be called on a different submit task than the one currently running
currentSubmitTask.get().notifyBucketProcessed();
}
bucketProcessed();
next.send();
}
}
@@ -109,36 +109,40 @@
* {@inheritDoc}
*/
@Override
public void putValue(final Bytes keyBytes, final int keyHashCode, final long oldValue, final long value) {
public boolean putValue(final Bytes keyBytes, final int keyHashCode, final long oldValue, final long value) {
final boolean needCheckOldValue = oldValue != INVALID_VALUE;
try {
final int entryIndex = findEntryIndex(keyHashCode, keyBytes);
if (value == INVALID_VALUE) {
if (entryIndex >= 0) { // if found
final BucketEntry entry = entries.get(entryIndex);
if (needCheckOldValue && (oldValue != entry.getValue())) {
return;
return false;

Codecov warning: added line #L120 was not covered by tests (ParsedBucket.java)
}
entries.remove(entryIndex);
return true;

Codecov warning: added line #L123 was not covered by tests (ParsedBucket.java)
} else {
// entry not found, nothing to delete
return false;

Codecov warning: added line #L126 was not covered by tests (ParsedBucket.java)
}
return;
}
if (entryIndex >= 0) {
// yay! we found it, so update value
final BucketEntry entry = entries.get(entryIndex);
if (needCheckOldValue && (oldValue != entry.getValue())) {
return;
return false;

Codecov warning: added line #L133 was not covered by tests (ParsedBucket.java)
}
final long entryOldValue = entry.getValue();

Codecov warning: added line #L135 was not covered by tests (ParsedBucket.java)
entry.setValue(value);
return value != entryOldValue;
} else {
if (needCheckOldValue) {
return;
return false;

Codecov warning: added line #L140 was not covered by tests (ParsedBucket.java)
}
final BucketEntry newEntry = new BucketEntry(keyHashCode, value, keyBytes);
entries.add(newEntry);
checkLargestBucket(entries.size());
return true;
}
} catch (IOException e) {
logger.error(EXCEPTION.getMarker(), "Failed putting key={} value={} in a bucket", keyBytes, value, e);
@@ -33,24 +33,26 @@ void nullKeyThrows() {
@Test
void createList() {
final Bytes rootKey = ExampleLongKey.longToKey(1);
final var root = new BucketMutation(rootKey, rootKey.hashCode(), 10);
var root = new BucketMutation(rootKey, rootKey.hashCode(), 10);
for (int i = 2; i < 100; i++) {
put(root, ExampleLongKey.longToKey(i), 10 * i);
}

final AtomicLong index = new AtomicLong(1);
root.forEachKeyValue((k, khc, ov, v) -> {
for (; root != null; root = root.getNext()) {
final var k = root.getKeyBytes();
final var v = root.getValue();
final long i = index.getAndIncrement();
assertEquals(ExampleLongKey.longToKey(i), k, "Unexpected key " + k + " for iteration " + i);
assertEquals(i * 10, v, "Unexpected value " + v + " for iteration " + i);
});
}
}

@Test
void updateList() {
// Test adding the keys out of order, and also updating only the first, middle and last.
final Bytes rootKey = ExampleLongKey.longToKey(1);
final var root = new BucketMutation(rootKey, rootKey.hashCode(), 10);
var root = new BucketMutation(rootKey, rootKey.hashCode(), 10);
put(root, ExampleLongKey.longToKey(3), 30);
put(root, ExampleLongKey.longToKey(2), 20);
put(root, ExampleLongKey.longToKey(5), 50);
@@ -69,10 +71,12 @@ void updateList() {

final var expectedValues = new LinkedList<>(List.of(100, 30, 200, 50, 400));

root.forEachKeyValue((k, khc, ov, v) -> {
for (; root != null; root = root.getNext()) {
final var k = root.getKeyBytes();
final var v = root.getValue();
assertEquals(expectedKeys.removeFirst(), k, "Unexpected key");
assertEquals((long) expectedValues.removeFirst(), v, "Unexpected value");
});
}

assertTrue(expectedKeys.isEmpty(), "Shouldn't have any expected keys left");
assertTrue(expectedValues.isEmpty(), "Shouldn't have any expected values left");