Skip to content

Commit d443164

Browse files
committed
WIP #2990: make LucenePartitioner thread safe
This changes the concurrentUpdate to run with a single partition, and adds usages of AsyncLock to ensure that updates to the partition metadata work correctly. Before this issue can be considered complete, an additional test should be added that performs concurrent inserts, and another that performs concurrent deletes.
1 parent e1c387d commit d443164

File tree

5 files changed

+155
-86
lines changed

5 files changed

+155
-86
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* RecordCoreInternalException.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2024 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record;
22+
23+
import com.apple.foundationdb.annotation.API;
24+
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
28+
/**
29+
* Exception thrown when an inconsistency in core record layer behavior is detected.
30+
*/
31+
@API(API.Status.STABLE)
32+
@SuppressWarnings("serial")
33+
public class RecordCoreInternalException extends RecordCoreException {
34+
35+
public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) {
36+
super(msg, keyValues);
37+
}
38+
}

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc
246246
}
247247
}
248248

249+
<M extends Message> void writeDocument(final FDBIndexableRecord<M> newRecord, final Map.Entry<Tuple, List<LuceneDocumentFromRecord.DocumentField>> entry, final Integer partitionId) {
250+
try {
251+
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
252+
} catch (IOException e) {
253+
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
254+
}
255+
}
256+
249257
@SuppressWarnings("PMD.CloseResource")
250258
private void writeDocument(@Nonnull List<LuceneDocumentFromRecord.DocumentField> fields,
251259
Tuple groupingKey,
@@ -484,14 +492,11 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<
484492
AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> {
485493
try {
486494
return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted ->
487-
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint).thenApply(partitionId -> {
488-
try {
489-
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
490-
} catch (IOException e) {
491-
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
492-
}
493-
return null;
494-
}));
495+
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint)
496+
.thenApply(partitionId -> {
497+
writeDocument(newRecord, entry, partitionId);
498+
return null;
499+
}));
495500
} catch (IOException e) {
496501
throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey());
497502
}
@@ -538,19 +543,22 @@ private <M extends Message> CompletableFuture<Integer> tryDelete(@Nonnull FDBInd
538543
}
539544

540545
// partitioned
541-
return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> {
546+
return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> {
547+
// this might be 0 when in writeOnly mode, but otherwise should not happen.
542548
if (partitionInfo != null) {
543549
try {
544550
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
545551
if (countDeleted > 0) {
546-
partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted);
552+
return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId())
553+
.thenApply(vignore -> countDeleted);
554+
} else {
555+
return CompletableFuture.completedFuture(countDeleted);
547556
}
548-
return countDeleted;
549557
} catch (IOException e) {
550558
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey());
551559
}
552560
}
553-
return 0;
561+
return CompletableFuture.completedFuture(0);
554562
});
555563
}
556564

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java

Lines changed: 90 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
import com.apple.foundationdb.record.PipelineOperation;
3434
import com.apple.foundationdb.record.RecordCoreArgumentException;
3535
import com.apple.foundationdb.record.RecordCoreException;
36+
import com.apple.foundationdb.record.RecordCoreInternalException;
3637
import com.apple.foundationdb.record.RecordCursor;
3738
import com.apple.foundationdb.record.RecordCursorContinuation;
3839
import com.apple.foundationdb.record.RecordCursorEndContinuation;
3940
import com.apple.foundationdb.record.RecordCursorStartContinuation;
4041
import com.apple.foundationdb.record.ScanProperties;
4142
import com.apple.foundationdb.record.TupleRange;
4243
import com.apple.foundationdb.record.cursors.ChainedCursor;
44+
import com.apple.foundationdb.record.locking.LockIdentifier;
4345
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
4446
import com.apple.foundationdb.record.logging.LogMessageKeys;
4547
import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager;
@@ -81,7 +83,6 @@
8183
import java.util.Arrays;
8284
import java.util.Collection;
8385
import java.util.Comparator;
84-
import java.util.Iterator;
8586
import java.util.List;
8687
import java.util.Objects;
8788
import java.util.concurrent.CompletableFuture;
@@ -489,28 +490,30 @@ public <M extends Message> CompletableFuture<Integer> addToAndSavePartitionMetad
489490
private CompletableFuture<Integer> addToAndSavePartitionMetadata(@Nonnull final Tuple groupingKey,
490491
@Nonnull final Tuple partitioningKey,
491492
@Nullable final Integer assignedPartitionIdOverride) {
492-
493-
final CompletableFuture<LucenePartitionInfoProto.LucenePartitionInfo> assignmentFuture;
494-
if (assignedPartitionIdOverride != null) {
495-
assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey);
496-
} else {
497-
assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey);
498-
}
499-
return assignmentFuture.thenApply(assignedPartition -> {
500-
// assignedPartition is not null, since a new one is created by the previous call if none exist
501-
LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder();
502-
builder.setCount(assignedPartition.getCount() + 1);
503-
if (isOlderThan(partitioningKey, assignedPartition)) {
504-
// clear the previous key
505-
state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition)));
506-
builder.setFrom(ByteString.copyFrom(partitioningKey.pack()));
507-
}
508-
if (isNewerThan(partitioningKey, assignedPartition)) {
509-
builder.setTo(ByteString.copyFrom(partitioningKey.pack()));
510-
}
511-
savePartitionMetadata(groupingKey, builder);
512-
return assignedPartition.getId();
513-
});
493+
return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)),
494+
() -> {
495+
final CompletableFuture<LucenePartitionInfoProto.LucenePartitionInfo> assignmentFuture;
496+
if (assignedPartitionIdOverride != null) {
497+
assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey);
498+
} else {
499+
assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey);
500+
}
501+
return assignmentFuture.thenApply(assignedPartition -> {
502+
// assignedPartition is not null, since a new one is created by the previous call if none exist
503+
LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder();
504+
builder.setCount(assignedPartition.getCount() + 1);
505+
if (isOlderThan(partitioningKey, assignedPartition)) {
506+
// clear the previous key
507+
state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition)));
508+
builder.setFrom(ByteString.copyFrom(partitioningKey.pack()));
509+
}
510+
if (isNewerThan(partitioningKey, assignedPartition)) {
511+
builder.setTo(ByteString.copyFrom(partitioningKey.pack()));
512+
}
513+
savePartitionMetadata(groupingKey, builder);
514+
return assignedPartition.getId();
515+
});
516+
});
514517
}
515518

516519
/**
@@ -525,6 +528,10 @@ byte[] partitionMetadataKeyFromPartitioningValue(@Nonnull Tuple groupKey, @Nonnu
525528
return state.indexSubspace.pack(partitionMetadataKeyTuple(groupKey, partitionKey));
526529
}
527530

531+
Subspace partitionMetadataSubspace(@Nonnull Tuple groupKey) {
532+
return state.indexSubspace.subspace(groupKey.add(PARTITION_META_SUBSPACE));
533+
}
534+
528535
private static Tuple partitionMetadataKeyTuple(final @Nonnull Tuple groupKey, @Nonnull Tuple partitionKey) {
529536
return groupKey.add(PARTITION_META_SUBSPACE).addAll(partitionKey);
530537
}
@@ -600,22 +607,30 @@ <M extends Message> CompletableFuture<LucenePartitionInfoProto.LucenePartitionIn
600607
* decrement the doc count of a partition, and save its partition metadata.
601608
*
602609
* @param groupingKey grouping key
603-
* @param partitionInfo partition metadata
604610
* @param amount amount to subtract from the doc count
611+
* @param partitionId the id of the partition to decrement
605612
*/
606-
void decrementCountAndSave(@Nonnull Tuple groupingKey,
607-
@Nonnull LucenePartitionInfoProto.LucenePartitionInfo partitionInfo,
608-
int amount) {
609-
LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(partitionInfo).toBuilder();
610-
// note that the to/from of the partition do not get updated, since that would require us to know what the next potential boundary
611-
// value(s) are. The values, nonetheless, remain valid.
612-
builder.setCount(partitionInfo.getCount() - amount);
613-
614-
if (builder.getCount() < 0) {
615-
// should never happen
616-
throw new RecordCoreException("Issue updating Lucene partition metadata (resulting count < 0)", LogMessageKeys.PARTITION_ID, partitionInfo.getId());
617-
}
618-
savePartitionMetadata(groupingKey, builder);
613+
CompletableFuture<Void> decrementCountAndSave(@Nonnull Tuple groupingKey,
614+
int amount, final int partitionId) {
615+
return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)),
616+
() -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> {
617+
if (serialized == null) {
618+
throw new RecordCoreInternalException("Lucene partition metadata changed during delete")
619+
.addLogInfo(LogMessageKeys.INDEX_NAME, state.index.getName())
620+
.addLogInfo(LogMessageKeys.INDEX_SUBSPACE, state.indexSubspace);
621+
}
622+
LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(serialized).toBuilder();
623+
// note that the to/from of the partition do not get updated, since that would require us to know
624+
// what the next potential boundary value(s) are. The values, nonetheless, remain valid.
625+
builder.setCount(serialized.getCount() - amount);
626+
627+
if (builder.getCount() < 0) {
628+
// should never happen
629+
throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)",
630+
LogMessageKeys.PARTITION_ID, partitionId);
631+
}
632+
savePartitionMetadata(groupingKey, builder);
633+
}));
619634
}
620635

621636
/**
@@ -1024,7 +1039,7 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10241039

10251040
timings.initializationNanos = System.nanoTime();
10261041
fetchedRecordsFuture = fetchedRecordsFuture.whenComplete((ignored, throwable) -> cursor.close());
1027-
return fetchedRecordsFuture.thenCompose(records -> {
1042+
return fetchedRecordsFuture.thenApply(records -> {
10281043
timings.searchNanos = System.nanoTime();
10291044
if (records.size() == 0) {
10301045
throw new RecordCoreException("Unexpected error: 0 records fetched. repartitionContext {}", repartitioningContext);
@@ -1041,7 +1056,7 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10411056
if (LOGGER.isDebugEnabled()) {
10421057
LOGGER.debug("no records to move, partition {}", partitionInfo);
10431058
}
1044-
return CompletableFuture.completedFuture(0);
1059+
return 0;
10451060
}
10461061

10471062
// reset partition info
@@ -1092,40 +1107,43 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10921107
}
10931108
long updateStart = System.nanoTime();
10941109

1095-
Iterator<? extends FDBIndexableRecord<Message>> recordIterator = records.iterator();
10961110
final int destinationPartitionId = destinationPartition.getId();
1097-
return AsyncUtil.whileTrue(() -> indexMaintainer.update(null, recordIterator.next(), destinationPartitionId)
1098-
.thenApply(ignored -> recordIterator.hasNext()), state.context.getExecutor())
1099-
.thenApply(ignored -> {
1100-
if (LOGGER.isDebugEnabled()) {
1101-
long updateNanos = System.nanoTime();
1102-
final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo);
1103-
logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos));
1104-
logMessage.addKeyAndValue("initializationMicros", TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos));
1105-
logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos));
1106-
logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.searchNanos));
1107-
if (timings.emptyingNanos > 0) {
1108-
logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos));
1109-
}
1110-
if (timings.deleteNanos > 0) {
1111-
logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos));
1112-
}
1113-
if (timings.metadataUpdateNanos > 0) {
1114-
logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos));
1115-
}
1116-
if (timings.createPartitionNanos > 0) {
1117-
logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos));
1118-
}
1119-
logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart));
1120-
if (timerSnapshot != null && state.context.getTimer() != null) {
1121-
logMessage.addKeysAndValues(
1122-
StoreTimer.getDifference(state.context.getTimer(), timerSnapshot)
1123-
.getKeysAndValues());
1124-
}
1125-
LOGGER.debug(logMessage.toString());
1126-
}
1127-
return records.size();
1128-
});
1111+
for (FDBIndexableRecord<Message> record : records) {
1112+
LuceneDocumentFromRecord.getRecordFields(state.index.getRootExpression(), record)
1113+
.entrySet().forEach(entry -> {
1114+
indexMaintainer.writeDocument(record, entry, destinationPartitionId);
1115+
// TODO could update the partition once
1116+
addToAndSavePartitionMetadata(record, groupingKey, destinationPartitionId);
1117+
});
1118+
}
1119+
if (LOGGER.isDebugEnabled()) {
1120+
long updateNanos = System.nanoTime();
1121+
final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo);
1122+
logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos));
1123+
logMessage.addKeyAndValue("initializationMicros", TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos));
1124+
logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos));
1125+
logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.searchNanos));
1126+
if (timings.emptyingNanos > 0) {
1127+
logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos));
1128+
}
1129+
if (timings.deleteNanos > 0) {
1130+
logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos));
1131+
}
1132+
if (timings.metadataUpdateNanos > 0) {
1133+
logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos));
1134+
}
1135+
if (timings.createPartitionNanos > 0) {
1136+
logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos));
1137+
}
1138+
logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart));
1139+
if (timerSnapshot != null && state.context.getTimer() != null) {
1140+
logMessage.addKeysAndValues(
1141+
StoreTimer.getDifference(state.context.getTimer(), timerSnapshot)
1142+
.getKeysAndValues());
1143+
}
1144+
LOGGER.debug(logMessage.toString());
1145+
}
1146+
return records.size();
11291147
});
11301148
}
11311149

0 commit comments

Comments
 (0)