3333import com .apple .foundationdb .record .PipelineOperation ;
3434import com .apple .foundationdb .record .RecordCoreArgumentException ;
3535import com .apple .foundationdb .record .RecordCoreException ;
36+ import com .apple .foundationdb .record .RecordCoreInternalException ;
3637import com .apple .foundationdb .record .RecordCursor ;
3738import com .apple .foundationdb .record .RecordCursorContinuation ;
3839import com .apple .foundationdb .record .RecordCursorEndContinuation ;
3940import com .apple .foundationdb .record .RecordCursorStartContinuation ;
4041import com .apple .foundationdb .record .ScanProperties ;
4142import com .apple .foundationdb .record .TupleRange ;
4243import com .apple .foundationdb .record .cursors .ChainedCursor ;
44+ import com .apple .foundationdb .record .locking .LockIdentifier ;
4345import com .apple .foundationdb .record .logging .KeyValueLogMessage ;
4446import com .apple .foundationdb .record .logging .LogMessageKeys ;
4547import com .apple .foundationdb .record .lucene .directory .FDBDirectoryManager ;
8183import java .util .Arrays ;
8284import java .util .Collection ;
8385import java .util .Comparator ;
84- import java .util .Iterator ;
8586import java .util .List ;
8687import java .util .Objects ;
8788import java .util .concurrent .CompletableFuture ;
@@ -489,28 +490,30 @@ public <M extends Message> CompletableFuture<Integer> addToAndSavePartitionMetad
    /**
     * Increment the document count of the partition that will receive a document with the given
     * partitioning key, and persist the updated partition metadata. The entire read-modify-write is
     * performed under a write lock on the group's partition-metadata subspace so that concurrent
     * writers cannot clobber each other's count/boundary updates.
     *
     * @param groupingKey grouping key of the document
     * @param partitioningKey partitioning key of the document being added
     * @param assignedPartitionIdOverride if non-null, the id of the partition to assign the document to;
     *        otherwise a partition is chosen (or created) from the partitioning key
     * @return future containing the id of the partition the document was assigned to
     */
    private CompletableFuture<Integer> addToAndSavePartitionMetadata(@Nonnull final Tuple groupingKey,
                                                                     @Nonnull final Tuple partitioningKey,
                                                                     @Nullable final Integer assignedPartitionIdOverride) {
        return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)),
                () -> {
                    final CompletableFuture<LucenePartitionInfoProto.LucenePartitionInfo> assignmentFuture;
                    if (assignedPartitionIdOverride != null) {
                        // caller pinned the destination partition; look it up directly by id
                        assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey);
                    } else {
                        assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey);
                    }
                    return assignmentFuture.thenApply(assignedPartition -> {
                        // assignedPartition is not null, since a new one is created by the previous call if none exist
                        LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder();
                        builder.setCount(assignedPartition.getCount() + 1);
                        if (isOlderThan(partitioningKey, assignedPartition)) {
                            // clear the previous key: the metadata record appears to be stored under the
                            // partition's current key, so remove it before re-keying with the new "from"
                            // boundary -- NOTE(review): assumes getPartitionKey() returns that current key; confirm
                            state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition)));
                            builder.setFrom(ByteString.copyFrom(partitioningKey.pack()));
                        }
                        if (isNewerThan(partitioningKey, assignedPartition)) {
                            // document extends the upper boundary of the partition
                            builder.setTo(ByteString.copyFrom(partitioningKey.pack()));
                        }
                        savePartitionMetadata(groupingKey, builder);
                        return assignedPartition.getId();
                    });
                });
    }
515518
516519 /**
@@ -525,6 +528,10 @@ byte[] partitionMetadataKeyFromPartitioningValue(@Nonnull Tuple groupKey, @Nonnu
525528 return state .indexSubspace .pack (partitionMetadataKeyTuple (groupKey , partitionKey ));
526529 }
527530
531+ Subspace partitionMetadataSubspace (@ Nonnull Tuple groupKey ) {
532+ return state .indexSubspace .subspace (groupKey .add (PARTITION_META_SUBSPACE ));
533+ }
534+
528535 private static Tuple partitionMetadataKeyTuple (final @ Nonnull Tuple groupKey , @ Nonnull Tuple partitionKey ) {
529536 return groupKey .add (PARTITION_META_SUBSPACE ).addAll (partitionKey );
530537 }
@@ -600,22 +607,30 @@ <M extends Message> CompletableFuture<LucenePartitionInfoProto.LucenePartitionIn
    /**
     * Decrement the doc count of a partition, and save its partition metadata. The read-modify-write
     * runs under a write lock on the group's partition-metadata subspace, so it cannot race with
     * concurrent count/boundary updates.
     *
     * @param groupingKey grouping key
     * @param amount amount to subtract from the doc count
     * @param partitionId the id of the partition to decrement
     * @return future that completes once the updated metadata has been saved
     */
    CompletableFuture<Void> decrementCountAndSave(@Nonnull Tuple groupingKey,
                                                  int amount, final int partitionId) {
        return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)),
                () -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> {
                    if (serialized == null) {
                        // the partition vanished between the caller's lookup and this locked re-read;
                        // surface as an internal error rather than silently skipping the decrement
                        throw new RecordCoreInternalException("Lucene partition metadata changed during delete")
                                .addLogInfo(LogMessageKeys.INDEX_NAME, state.index.getName())
                                .addLogInfo(LogMessageKeys.INDEX_SUBSPACE, state.indexSubspace);
                    }
                    LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(serialized).toBuilder();
                    // note that the to/from of the partition do not get updated, since that would require us to know
                    // what the next potential boundary value(s) are. The values, nonetheless, remain valid.
                    builder.setCount(serialized.getCount() - amount);

                    if (builder.getCount() < 0) {
                        // should never happen
                        throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)",
                                LogMessageKeys.PARTITION_ID, partitionId);
                    }
                    savePartitionMetadata(groupingKey, builder);
                }));
    }
620635
621636 /**
@@ -1024,7 +1039,7 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10241039
10251040 timings .initializationNanos = System .nanoTime ();
10261041 fetchedRecordsFuture = fetchedRecordsFuture .whenComplete ((ignored , throwable ) -> cursor .close ());
1027- return fetchedRecordsFuture .thenCompose (records -> {
1042+ return fetchedRecordsFuture .thenApply (records -> {
10281043 timings .searchNanos = System .nanoTime ();
10291044 if (records .size () == 0 ) {
10301045 throw new RecordCoreException ("Unexpected error: 0 records fetched. repartitionContext {}" , repartitioningContext );
@@ -1041,7 +1056,7 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10411056 if (LOGGER .isDebugEnabled ()) {
10421057 LOGGER .debug ("no records to move, partition {}" , partitionInfo );
10431058 }
1044- return CompletableFuture . completedFuture ( 0 ) ;
1059+ return 0 ;
10451060 }
10461061
10471062 // reset partition info
@@ -1092,40 +1107,43 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
10921107 }
10931108 long updateStart = System .nanoTime ();
10941109
1095- Iterator <? extends FDBIndexableRecord <Message >> recordIterator = records .iterator ();
10961110 final int destinationPartitionId = destinationPartition .getId ();
1097- return AsyncUtil .whileTrue (() -> indexMaintainer .update (null , recordIterator .next (), destinationPartitionId )
1098- .thenApply (ignored -> recordIterator .hasNext ()), state .context .getExecutor ())
1099- .thenApply (ignored -> {
1100- if (LOGGER .isDebugEnabled ()) {
1101- long updateNanos = System .nanoTime ();
1102- final KeyValueLogMessage logMessage = repartitionLogMessage ("Repartitioned records" , groupingKey , records .size (), partitionInfo );
1103- logMessage .addKeyAndValue ("totalMicros" , TimeUnit .NANOSECONDS .toMicros (updateNanos - timings .startNanos ));
1104- logMessage .addKeyAndValue ("initializationMicros" , TimeUnit .NANOSECONDS .toMicros (timings .initializationNanos - timings .startNanos ));
1105- logMessage .addKeyAndValue ("searchMicros" , TimeUnit .NANOSECONDS .toMicros (timings .searchNanos - timings .initializationNanos ));
1106- logMessage .addKeyAndValue ("clearInfoMicros" , TimeUnit .NANOSECONDS .toMicros (timings .clearInfoNanos - timings .searchNanos ));
1107- if (timings .emptyingNanos > 0 ) {
1108- logMessage .addKeyAndValue ("emptyingMicros" , TimeUnit .NANOSECONDS .toMicros (timings .emptyingNanos - timings .clearInfoNanos ));
1109- }
1110- if (timings .deleteNanos > 0 ) {
1111- logMessage .addKeyAndValue ("deleteMicros" , TimeUnit .NANOSECONDS .toMicros (timings .deleteNanos - timings .clearInfoNanos ));
1112- }
1113- if (timings .metadataUpdateNanos > 0 ) {
1114- logMessage .addKeyAndValue ("metadataUpdateMicros" , TimeUnit .NANOSECONDS .toMicros (timings .metadataUpdateNanos - timings .deleteNanos ));
1115- }
1116- if (timings .createPartitionNanos > 0 ) {
1117- logMessage .addKeyAndValue ("createPartitionMicros" , TimeUnit .NANOSECONDS .toMicros (timings .createPartitionNanos - endCleanupNanos ));
1118- }
1119- logMessage .addKeyAndValue ("updateMicros" , TimeUnit .NANOSECONDS .toMicros (updateNanos - updateStart ));
1120- if (timerSnapshot != null && state .context .getTimer () != null ) {
1121- logMessage .addKeysAndValues (
1122- StoreTimer .getDifference (state .context .getTimer (), timerSnapshot )
1123- .getKeysAndValues ());
1124- }
1125- LOGGER .debug (logMessage .toString ());
1126- }
1127- return records .size ();
1128- });
1111+ for (FDBIndexableRecord <Message > record : records ) {
1112+ LuceneDocumentFromRecord .getRecordFields (state .index .getRootExpression (), record )
1113+ .entrySet ().forEach (entry -> {
1114+ indexMaintainer .writeDocument (record , entry , destinationPartitionId );
1115+ // TODO could update the partition once
1116+ addToAndSavePartitionMetadata (record , groupingKey , destinationPartitionId );
1117+ });
1118+ }
1119+ if (LOGGER .isDebugEnabled ()) {
1120+ long updateNanos = System .nanoTime ();
1121+ final KeyValueLogMessage logMessage = repartitionLogMessage ("Repartitioned records" , groupingKey , records .size (), partitionInfo );
1122+ logMessage .addKeyAndValue ("totalMicros" , TimeUnit .NANOSECONDS .toMicros (updateNanos - timings .startNanos ));
1123+ logMessage .addKeyAndValue ("initializationMicros" , TimeUnit .NANOSECONDS .toMicros (timings .initializationNanos - timings .startNanos ));
1124+ logMessage .addKeyAndValue ("searchMicros" , TimeUnit .NANOSECONDS .toMicros (timings .searchNanos - timings .initializationNanos ));
1125+ logMessage .addKeyAndValue ("clearInfoMicros" , TimeUnit .NANOSECONDS .toMicros (timings .clearInfoNanos - timings .searchNanos ));
1126+ if (timings .emptyingNanos > 0 ) {
1127+ logMessage .addKeyAndValue ("emptyingMicros" , TimeUnit .NANOSECONDS .toMicros (timings .emptyingNanos - timings .clearInfoNanos ));
1128+ }
1129+ if (timings .deleteNanos > 0 ) {
1130+ logMessage .addKeyAndValue ("deleteMicros" , TimeUnit .NANOSECONDS .toMicros (timings .deleteNanos - timings .clearInfoNanos ));
1131+ }
1132+ if (timings .metadataUpdateNanos > 0 ) {
1133+ logMessage .addKeyAndValue ("metadataUpdateMicros" , TimeUnit .NANOSECONDS .toMicros (timings .metadataUpdateNanos - timings .deleteNanos ));
1134+ }
1135+ if (timings .createPartitionNanos > 0 ) {
1136+ logMessage .addKeyAndValue ("createPartitionMicros" , TimeUnit .NANOSECONDS .toMicros (timings .createPartitionNanos - endCleanupNanos ));
1137+ }
1138+ logMessage .addKeyAndValue ("updateMicros" , TimeUnit .NANOSECONDS .toMicros (updateNanos - updateStart ));
1139+ if (timerSnapshot != null && state .context .getTimer () != null ) {
1140+ logMessage .addKeysAndValues (
1141+ StoreTimer .getDifference (state .context .getTimer (), timerSnapshot )
1142+ .getKeysAndValues ());
1143+ }
1144+ LOGGER .debug (logMessage .toString ());
1145+ }
1146+ return records .size ();
11291147 });
11301148 }
11311149
0 commit comments