diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java index cf20e7ade8..95a7f8ecc4 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java @@ -22,20 +22,20 @@ import com.apple.foundationdb.FDBException; import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordMetaDataProvider; import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository; import com.apple.foundationdb.record.lucene.directory.MockedLuceneIndexMaintainerFactory; import com.apple.foundationdb.record.lucene.directory.TestingIndexMaintainerRegistry; -import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.provider.common.StoreTimer; import com.apple.foundationdb.record.provider.foundationdb.FDBExceptions; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer; import com.apple.foundationdb.record.provider.foundationdb.OnlineIndexer; +import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath; import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage; -import com.apple.foundationdb.record.query.plan.QueryPlanner; -import com.apple.foundationdb.record.util.pair.Pair; import com.apple.foundationdb.tuple.Tuple; import com.apple.foundationdb.util.LoggableException; import com.apple.test.BooleanSource; @@ -50,26 +50,25 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Comparator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import static com.apple.foundationdb.record.lucene.LuceneIndexTestUtils.createComplexDocument; import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_GET_ALL_FIELDS_INFO_STREAM; import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_GET_FDB_LUCENE_FILE_REFERENCE_ASYNC; +import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_GET_FILE_REFERENCE_CACHE_ASYNC; import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_GET_PRIMARY_KEY_SEGMENT_INDEX; import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_LIST_ALL; import static com.apple.foundationdb.record.lucene.directory.InjectedFailureRepository.Methods.LUCENE_READ_BLOCK; -import static com.apple.foundationdb.record.provider.foundationdb.indexes.TextIndexTestUtils.COMPLEX_DOC; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -79,7 +78,7 @@ * A test that uses a few of the tests from {@link LuceneIndexTest} under a fault-injection scenario. */ @Tag(Tags.RequiresFDB) -public class FDBLuceneIndexFailureTest extends FDBLuceneTestBase { +public class FDBLuceneIndexFailureTest extends FDBRecordStoreTestBase { private TestingIndexMaintainerRegistry registry; private InjectedFailureRepository injectedFailures; @@ -91,56 +90,42 @@ public void setup() { registry.overrideFactory(new MockedLuceneIndexMaintainerFactory(injectedFailures)); } - @ParameterizedTest - @BooleanSource - void basicGroupedPartitionedTest(boolean useLegacyAsyncToSync) { - final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() - .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) - .build(); - try (FDBRecordContext context = openContext(contextProps)) { - rebuildIndexMetaData(context, COMPLEX_DOC, COMPLEX_PARTITIONED); - final LuceneScanBounds scanBounds = groupedTextSearch(COMPLEX_PARTITIONED, "text:propose", 2); - injectedFailures.addFailure(LUCENE_GET_FDB_LUCENE_FILE_REFERENCE_ASYNC, - new FDBExceptions.FDBStoreTransactionIsTooOldException("Blah", new FDBException("Blah", 7)), - 0); - - recordStore.saveRecord(createComplexDocument(6666L, ENGINEER_JOKE, 1, Instant.now().toEpochMilli())); - recordStore.saveRecord(createComplexDocument(7777L, ENGINEER_JOKE, 2, Instant.now().toEpochMilli())); - recordStore.saveRecord(createComplexDocument(8888L, WAYLON, 2, Instant.now().plus(1, ChronoUnit.DAYS).toEpochMilli())); - recordStore.saveRecord(createComplexDocument(9999L, "hello world!", 1, Instant.now().plus(2, ChronoUnit.DAYS).toEpochMilli())); - - // This fails with the mock exception - assertThrows(FDBExceptions.FDBStoreTransactionIsTooOldException.class, - () -> LuceneConcurrency.asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_FILE_REFERENCE, - recordStore.scanIndex(COMPLEX_PARTITIONED, scanBounds, null, ScanProperties.FORWARD_SCAN).asList(), - context)); - assertNull(getCounter(context, FDBStoreTimer.Counts.LOAD_SCAN_ENTRY)); - } + public static Stream legacySyncGrouping() { + return Stream.of(true, false) + .flatMap(useLegacyAsyncToSync -> Stream.of(true, false) + .map(isGrouped -> Arguments.of(useLegacyAsyncToSync, isGrouped))); } @ParameterizedTest - @BooleanSource - void basicNonGroupedPartitionedTest(boolean useLegacyAsyncToSync) { + @MethodSource("legacySyncGrouping") + void basicPartitionedTest(boolean useLegacyAsyncToSync, boolean isGrouped) { final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); + final long seed = 6474737L; + final long start = Instant.now().toEpochMilli(); + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(isGrouped) + .setPartitionHighWatermark(10) + .build(); + try (FDBRecordContext context = openContext(contextProps)) { - rebuildIndexMetaData(context, COMPLEX_DOC, COMPLEX_PARTITIONED_NOGROUP); - final LuceneScanBounds scanBounds = fullTextSearch(COMPLEX_PARTITIONED_NOGROUP, "text:propose"); - injectedFailures.addFailure(LUCENE_GET_FDB_LUCENE_FILE_REFERENCE_ASYNC, - new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), - 0); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.saveRecords(5, start, context, 1); - recordStore.saveRecord(createComplexDocument(6666L, ENGINEER_JOKE, 1, Instant.now().toEpochMilli())); - recordStore.saveRecord(createComplexDocument(7777L, ENGINEER_JOKE, 2, Instant.now().toEpochMilli())); - recordStore.saveRecord(createComplexDocument(8888L, WAYLON, 2, Instant.now().plus(1, ChronoUnit.DAYS).toEpochMilli())); - recordStore.saveRecord(createComplexDocument(9999L, "hello world!", 1, Instant.now().plus(2, ChronoUnit.DAYS).toEpochMilli())); + injectedFailures.addFailure(LUCENE_GET_FILE_REFERENCE_CACHE_ASYNC, + new FDBExceptions.FDBStoreTransactionIsTooOldException("Blah", new FDBException("Blah", 7)), + 0); - // This should fail because the mock exception is thrown - assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, + // This fails with the mock exception + final LuceneScanBounds scanBounds = isGrouped + ? LuceneIndexTestValidator.groupedSortedTextSearch(store, dataModel.index, "text:blah", null, 1) + : LuceneIndexTestUtils.fullTextSearch(store, dataModel.index, "text:blah", false); + assertThrows(FDBExceptions.FDBStoreTransactionIsTooOldException.class, () -> LuceneConcurrency.asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_FILE_REFERENCE, - recordStore.scanIndex(COMPLEX_PARTITIONED_NOGROUP, scanBounds, null, ScanProperties.FORWARD_SCAN).asList(), + store.scanIndex(dataModel.index, scanBounds, null, ScanProperties.FORWARD_SCAN).asList(), context)); + assertNull(LuceneIndexTestUtils.getCounter(context, FDBStoreTimer.Counts.LOAD_SCAN_ENTRY)); } } @@ -153,51 +138,55 @@ public static Stream legacySyncAndMapping() { @ParameterizedTest @MethodSource("legacySyncAndMapping") void repartitionGroupedTestWithExceptionMapping(boolean useLegacyAsyncToSync, boolean useExceptionMapping) throws IOException { - Index index = COMPLEX_PARTITIONED; - Tuple groupingKey = Tuple.from(1L); final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 6) .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); - setupExceptionMapping(useExceptionMapping); + final long seed = 6474737L; + Tuple groupingKey = Tuple.from(1L); final int totalDocCount = 20; - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); - long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); // create/save documents + long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + final long start = Instant.now().toEpochMilli(); try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - long start = Instant.now().toEpochMilli(); - for (int i = 0; i < totalDocCount; i++) { - recordStore.saveRecord(createComplexDocument(1000L + i, ENGINEER_JOKE, docGroupFieldValue, start + i * 100)); - } + setupExceptionMapping(context, useExceptionMapping); + dataModel.saveRecords(totalDocCount, start, context, (int)docGroupFieldValue); commit(context); } + List partitionInfos; // initially, all documents are saved into one partition - List partitionInfos = getPartitionMeta(index, groupingKey, contextProps, schemaSetup); - assertEquals(1, partitionInfos.size()); - assertEquals(totalDocCount, partitionInfos.get(0).getCount()); + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + partitionInfos = LuceneIndexTestUtils.getPartitionMeta(dataModel.index, groupingKey, store); + assertEquals(1, partitionInfos.size()); + assertEquals(totalDocCount, partitionInfos.get(0).getCount()); + } // run re-partitioning with failure // When exception mapping is in place, the thrown exception is UnknownLoggableException // When no exception mapping is taking place, the thrown exception is the injected one (AsyncToSyncTimeout) if (useExceptionMapping) { assertThrows(UnknownLoggableException.class, - () -> explicitMergeIndex(index, contextProps, schemaSetup, true, 0)); + () -> explicitMergeIndex(true, 0, contextProps, dataModel)); } else { assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> explicitMergeIndex(index, contextProps, schemaSetup, true, 0)); + () -> explicitMergeIndex(true, 0, contextProps, dataModel)); } // run partitioning without failure - make sure the index is still in good shape - explicitMergeIndex(index, contextProps, schemaSetup, false, 0); + explicitMergeIndex(false, 0, contextProps, dataModel); try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - assertEquals(2, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + assertEquals(2, LuceneIndexTestUtils.getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); + partitionInfos = LuceneIndexTestUtils.getPartitionMeta(dataModel.index, groupingKey, store); } - partitionInfos = getPartitionMeta(index, groupingKey, contextProps, schemaSetup); // It should first move 6 from the most-recent to a new, older partition, then move 6 again into a partition // in between the two assertEquals(List.of(6, 6, 8), @@ -216,7 +205,7 @@ void repartitionGroupedTestWithExceptionMapping(boolean useLegacyAsyncToSync, bo @ParameterizedTest @BooleanSource void repartitionAndMerge(boolean useLegacyAsyncToSync) throws IOException { - Index index = COMPLEX_PARTITIONED; + final long seed = 67826354623L; Tuple groupingKey = Tuple.from(1); int mergeSegmentsPerTier = 2; @@ -226,44 +215,54 @@ void repartitionAndMerge(boolean useLegacyAsyncToSync) throws IOException { .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); + long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); int transactionCount = 100; int docsPerTransaction = 2; // create/save documents long id = 0; - List allIds = new ArrayList<>(); + List allIds = new ArrayList<>(); for (int i = 0; i < transactionCount; i++) { try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); long start = Instant.now().toEpochMilli(); for (int j = 0; j < docsPerTransaction; j++) { id++; - recordStore.saveRecord(createComplexDocument(id, ENGINEER_JOKE, docGroupFieldValue, start + id)); - allIds.add(id); + Tuple pk = dataModel.saveRecord(start + id, store, (int)docGroupFieldValue); + allIds.add(pk); } commit(context); } } // we haven't done any merges yet, or repartitioning, so each transaction should be one new segment - assertEquals(Map.of(0, transactionCount), - getSegmentCounts(index, groupingKey, contextProps, schemaSetup)); + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + assertEquals(Map.of(0, transactionCount), + LuceneIndexTestUtils.getSegmentCounts(dataModel.index, groupingKey, store)); + } // Run a few merges with different failure points assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> explicitMergeIndex(index, contextProps, schemaSetup, true, 0)); + () -> explicitMergeIndex(true, 0, contextProps, dataModel)); assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> explicitMergeIndex(index, contextProps, schemaSetup, true, 5)); + () -> explicitMergeIndex(true, 5, contextProps, dataModel)); assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> explicitMergeIndex(index, contextProps, schemaSetup, true, 10)); + () -> explicitMergeIndex(true, 10, contextProps, dataModel)); // Continue with the regular merge and verify that index is still valid timer.reset(); - explicitMergeIndex(index, contextProps, schemaSetup, false, 0); - final Map segmentCounts = getSegmentCounts(index, groupingKey, contextProps, schemaSetup); + explicitMergeIndex(false, 0, contextProps, dataModel); + final Map segmentCounts; + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + segmentCounts = LuceneIndexTestUtils.getSegmentCounts(dataModel.index, groupingKey, store); + } final int partitionSize = 10; final int partitionCount; partitionCount = allIds.size() / partitionSize; @@ -273,25 +272,27 @@ void repartitionAndMerge(boolean useLegacyAsyncToSync) throws IOException { segmentCounts); try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - validateDocsInPartition(index, 0, groupingKey, + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.validateDocsInPartition(dataModel.index, 0, groupingKey, allIds.stream() .skip(190) - .map(idLong -> Tuple.from(docGroupFieldValue, idLong)) .collect(Collectors.toSet()), - "text:propose"); + LuceneIndexTestDataModel.PARENT_SEARCH_TERM, + store); for (int i = 1; i < 20; i++) { // 0 should have the newest // everyone else should increase - validateDocsInPartition(index, i, groupingKey, + dataModel.validateDocsInPartition(dataModel.index, i, groupingKey, allIds.stream().skip((i - 1) * partitionSize) .limit(partitionSize) - .map(idLong -> Tuple.from(docGroupFieldValue, idLong)) .collect(Collectors.toSet()), - "text:propose"); + LuceneIndexTestDataModel.PARENT_SEARCH_TERM, + store); } - List partitionInfos = getPartitionMeta(index, - groupingKey, contextProps, schemaSetup); + } + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + List partitionInfos = LuceneIndexTestUtils.getPartitionMeta(dataModel.index, groupingKey, store); assertEquals(partitionCount, partitionInfos.size()); } } @@ -299,80 +300,94 @@ void repartitionAndMerge(boolean useLegacyAsyncToSync) throws IOException { @ParameterizedTest @BooleanSource void optimizedPartitionInsertionTest(boolean useLegacyAsyncToSync) throws IOException { - Index index = COMPLEX_PARTITIONED; + long seed = 657368233L; Tuple groupingKey = Tuple.from(1L); final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() - .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 6) + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 10) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)2) .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); final int totalDocCount = 10; // configured index's highwater mark - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); - long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); + + int docGroupFieldValue = groupingKey.isEmpty() ? 0 : (int)groupingKey.getLong(0); // create/save documents long start = Instant.now().toEpochMilli(); + Set allIds = new LinkedHashSet<>(); try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); for (int i = 0; i < totalDocCount; i++) { - recordStore.saveRecord(createComplexDocument(1000L + i, ENGINEER_JOKE, docGroupFieldValue, start + i * 100)); + allIds.add(dataModel.saveRecord(start + i * 100, store, docGroupFieldValue)); } commit(context); } // partition 0 should be at capacity now try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - validateDocsInPartition(index, 0, groupingKey, makeKeyTuples(docGroupFieldValue, 1000, 1009), "text:propose"); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.validateDocsInPartition(dataModel.index, 0, groupingKey, allIds, LuceneIndexTestDataModel.PARENT_SEARCH_TERM, store); } // Add docs and fail the insertion try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 1) injectedFailures.addFailure(LUCENE_GET_PRIMARY_KEY_SEGMENT_INDEX, new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), 0); // Save more docs - this should fail with injected exception assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L + totalDocCount, ENGINEER_JOKE, docGroupFieldValue, start - 1))); + () -> dataModel.saveRecord(start, store, docGroupFieldValue)); } - // now add 20 documents older than the oldest document in partition 0 - // they should go into partitions 1 and 2 + // now add 20 documents, repartition and make sure the index is valid try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); injectedFailures.clear(); - for (int i = 0; i < 20; i++) { - recordStore.saveRecord(createComplexDocument(1000L + totalDocCount + i, ENGINEER_JOKE, docGroupFieldValue, start - i - 1)); + for (int i = 0; i < 10; i++) { + dataModel.saveRecord(start - i - 1, store, docGroupFieldValue); + } + for (int i = 10; i < 20; i++) { + dataModel.saveRecord(start - i - 1, store, docGroupFieldValue); } - validateDocsInPartition(index, 1, groupingKey, makeKeyTuples(docGroupFieldValue, 1010, 1019), "text:propose"); - validateDocsInPartition(index, 2, groupingKey, makeKeyTuples(docGroupFieldValue, 1020, 1029), "text:propose"); + commit(context); } + explicitMergeIndex(false, 0, contextProps, dataModel); + dataModel.validate(() -> openContext(contextProps)); } @ParameterizedTest @BooleanSource void addDocumentTest(boolean useLegacyAsyncToSync) throws IOException { - Index index = COMPLEX_PARTITIONED; + long seed = 4652783648726L; Tuple groupingKey = Tuple.from(1L); final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); - long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + int docGroupFieldValue = groupingKey.isEmpty() ? 0 : (int)groupingKey.getLong(0); + + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); // create/save documents try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_GET_PRIMARY_KEY_SEGMENT_INDEX, new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), 0); // this should fail with injected exception assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 1))); + () -> dataModel.saveRecord(1, store, docGroupFieldValue)); } } @@ -381,31 +396,34 @@ void addDocumentTest(boolean useLegacyAsyncToSync) throws IOException { void updateDocumentFailedTest(boolean useLegacyAsyncToSync) throws IOException { // This test injects a failure late in the update process, into the commit part of the update, where the updated // index is written to DB - Index index = COMPLEX_PARTITIONED; + long seed = 828737L; Tuple groupingKey = Tuple.from(1L); final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); - long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + int docGroupFieldValue = groupingKey.isEmpty() ? 0 : (int)groupingKey.getLong(0); + + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); // create documents try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - // this should fail with injected exception - recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 1)); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.saveRecord(1, store, docGroupFieldValue); context.commit(); } // Update documents with failure try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_READ_BLOCK, new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), 5); // this should fail with injected exception - recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 2)); + dataModel.saveRecord(2, store, docGroupFieldValue); assertThrows(LuceneConcurrency.AsyncToSyncTimeoutException.class, () -> context.commit()); } @@ -414,49 +432,50 @@ void updateDocumentFailedTest(boolean useLegacyAsyncToSync) throws IOException { @ParameterizedTest @BooleanSource void addDocumentWithUnknownExceptionTest(boolean useLegacyAsyncToSync) throws IOException { - Index index = COMPLEX_PARTITIONED; - Tuple groupingKey = Tuple.from(1L); + long seed = 66477848; final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); - long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); // Unknown RecordCoreException (rethrown as-is) try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_GET_PRIMARY_KEY_SEGMENT_INDEX, new UnknownRecordCoreException("Blah"), 0); // this should fail with injected exception assertThrows(UnknownRecordCoreException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 1))); + () -> dataModel.saveRecord(1, store, 1)); } // Unknown IOException with RecordCoreException (cause rethrown) try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_LIST_ALL, new IOException((new UnknownRecordCoreException("Blah"))), 0); // this should fail with injected exception assertThrows(UnknownRecordCoreException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 1))); + () -> dataModel.saveRecord(1, store, 1)); } // Unknown IOException with RuntimeException (cause rethrown) try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_LIST_ALL, new IOException((new UnknownRuntimeException("Blah"))), 0); // this should fail with injected exception assertThrows(UnknownRuntimeException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L , ENGINEER_JOKE, docGroupFieldValue, 1))); + () -> dataModel.saveRecord(1, store, 1)); } } @@ -471,26 +490,30 @@ void addDocumentWithUnknownExceptionTest(boolean useLegacyAsyncToSync) throws IO @ParameterizedTest @MethodSource("legacySyncAndMapping") void saveDocumentWithMappedInjectedException(boolean useLegacyAsyncToSync, boolean useExceptionMapping) { - Index index = COMPLEX_PARTITIONED; + long seed = 5524442; Tuple groupingKey = Tuple.from(1L); final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() .addProp(LuceneRecordContextProperties.LUCENE_USE_LEGACY_ASYNC_TO_SYNC, useLegacyAsyncToSync) .build(); setupExceptionMapping(useExceptionMapping); + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilderWithRegistry, pathManager) + .setIsGrouped(true) + .setPartitionHighWatermark(10) + .build(); + - Consumer schemaSetup = context -> rebuildIndexMetaData(context, COMPLEX_DOC, index); long docGroupFieldValue = groupingKey.isEmpty() ? 0L : groupingKey.getLong(0); // Save a document try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); - recordStore.saveRecord(createComplexDocument(1000L, ENGINEER_JOKE, docGroupFieldValue, 100)); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.saveRecord(1, store, 1); commit(context); } // Save another, with injected failure try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_GET_ALL_FIELDS_INFO_STREAM, new FDBExceptions.FDBStoreTransactionIsTooOldException("Blah", new FDBException("Blah", 7)), @@ -501,18 +524,18 @@ void saveDocumentWithMappedInjectedException(boolean useLegacyAsyncToSync, boole // calls the non-lucene asyncToSync and that gets wrapped by the mapper. if (useExceptionMapping) { assertThrows(UnknownLoggableException.class, - () -> recordStore.saveRecord(createComplexDocument(1000L, ENGINEER_JOKE, docGroupFieldValue, 1))); + () -> dataModel.saveRecord(2, store, 1)); } else { assertThrows(FDBExceptions.FDBStoreTransactionIsTooOldException.class, () -> LuceneConcurrency.asyncToSync(FDBStoreTimer.Waits.WAIT_SAVE_RECORD, - recordStore.saveRecordAsync(createComplexDocument(1000L, ENGINEER_JOKE, docGroupFieldValue, 1)), + dataModel.saveRecordAsync(true, 3, store, 1), context)); } } // Save using saveAsync with Lucene asyncToSync, with injected failure try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); // Inject failures (using partition 0) injectedFailures.addFailure(LUCENE_GET_ALL_FIELDS_INFO_STREAM, new FDBExceptions.FDBStoreTransactionIsTooOldException("Blah", new FDBException("Blah", 7)), @@ -523,12 +546,12 @@ void saveDocumentWithMappedInjectedException(boolean useLegacyAsyncToSync, boole if (useLegacyAsyncToSync && useExceptionMapping) { assertThrows(UnknownLoggableException.class, () -> LuceneConcurrency.asyncToSync(FDBStoreTimer.Waits.WAIT_SAVE_RECORD, - recordStore.saveRecordAsync(createComplexDocument(1000L, ENGINEER_JOKE, docGroupFieldValue, 1)), + dataModel.saveRecordAsync(true, 5, store, 1), context)); } else { assertThrows(FDBExceptions.FDBStoreTransactionIsTooOldException.class, () -> LuceneConcurrency.asyncToSync(FDBStoreTimer.Waits.WAIT_SAVE_RECORD, - recordStore.saveRecordAsync(createComplexDocument(1000L, ENGINEER_JOKE, docGroupFieldValue, 1)), + dataModel.saveRecordAsync(true, 6, store, 1), context)); } } @@ -541,20 +564,12 @@ private RuntimeException mapExceptions(Throwable throwable, StoreTimer.Event eve return new UnknownLoggableException(throwable); } - /** - * Private utility to set up the test environment. - * This method creates the {@link TestingIndexMaintainerRegistry} and the {@link MockedLuceneIndexMaintainerFactory} - * that create the MockedDirectory* classes that allow failure injection to take place. - * - * @param context the context for the store - * @param document the record type - * @param index the index to use - */ - private void rebuildIndexMetaData(final FDBRecordContext context, final String document, final Index index) { - Pair pair = LuceneIndexTestUtils.rebuildIndexMetaData(context, path, document, index, isUseCascadesPlanner(), registry); - this.recordStore = pair.getLeft(); - this.planner = pair.getRight(); - this.recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(true); + private void setupExceptionMapping(FDBRecordContext context, boolean useExceptionMapping) { + if (useExceptionMapping) { + context.getDatabase().setAsyncToSyncExceptionMapper(this::mapExceptions); + } else { + context.getDatabase().setAsyncToSyncExceptionMapper((ex, ev) -> FDBExceptions.wrapException(ex)); + } } private void setupExceptionMapping(boolean useExceptionMapping) { @@ -567,20 +582,20 @@ private void setupExceptionMapping(boolean useExceptionMapping) { } } - private void explicitMergeIndex(Index index, RecordLayerPropertyStorage contextProps, Consumer schemaSetup, boolean injectFailure, final int failureAtCount) { + private void explicitMergeIndex(boolean injectFailure, final int failureAtCount, RecordLayerPropertyStorage contextProps, LuceneIndexTestDataModel dataModel) { try (FDBRecordContext context = openContext(contextProps)) { - schemaSetup.accept(context); + FDBRecordStore store = dataModel.createOrOpenRecordStore(context); if (injectFailure) { injectedFailures.addFailure(LUCENE_GET_FDB_LUCENE_FILE_REFERENCE_ASYNC, - new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), + new LuceneConcurrency.AsyncToSyncTimeoutException("Blah", new TimeoutException("Blah")), failureAtCount); // Since the merge creates a new directory, we need global scope for the failure } else { injectedFailures.removeFailure(LUCENE_GET_FDB_LUCENE_FILE_REFERENCE_ASYNC); } try (OnlineIndexer indexBuilder = OnlineIndexer.newBuilder() - .setRecordStore(recordStore) - .setIndex(index) + .setRecordStore(store) + .setIndex(dataModel.index) .setTimer(timer) .build()) { indexBuilder.mergeIndex(); @@ -588,6 +603,13 @@ private void explicitMergeIndex(Index index, RecordLayerPropertyStorage contextP } } + @Nonnull + protected FDBRecordStore.Builder getStoreBuilderWithRegistry(@Nonnull FDBRecordContext context, + @Nonnull RecordMetaDataProvider metaData, + @Nonnull final KeySpacePath path) { + return super.getStoreBuilder(context, metaData, path).setIndexMaintainerRegistry(registry); + } + private class UnknownRecordCoreException extends RecordCoreException { private static final long serialVersionUID = 0L; diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java index 9c5bcb988d..b398ecbe93 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java @@ -257,6 +257,11 @@ public void validate(final Supplier openContext) throws IOExce validator.validate(index, groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? CHILD_SEARCH_TERM : PARENT_SEARCH_TERM); } + public void validateDocsInPartition(Index index, int partitionId, Tuple groupingKey, + Set expectedPrimaryKeys, final String universalSearch, FDBRecordStore store) throws IOException { + LuceneIndexTestValidator.validateDocsInPartition(store, index, partitionId, groupingKey, expectedPrimaryKeys, universalSearch); + } + @Nonnull static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression, final Map options, final RecordMetaDataBuilder metaDataBuilder) { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestUtils.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestUtils.java index f158e7ae9b..52b604e6b2 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestUtils.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestUtils.java @@ -24,6 +24,7 @@ import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataBuilder; import com.apple.foundationdb.record.TestRecordsTextProto; +import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager; import com.apple.foundationdb.record.lucene.ngram.NgramAnalyzer; import com.apple.foundationdb.record.lucene.synonym.EnglishSynonymMapConfig; import com.apple.foundationdb.record.lucene.synonym.SynonymAnalyzer; @@ -32,6 +33,7 @@ import com.apple.foundationdb.record.metadata.IndexTypes; import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression; import com.apple.foundationdb.record.metadata.expressions.KeyExpression; +import com.apple.foundationdb.record.provider.common.StoreTimer; import com.apple.foundationdb.record.provider.common.text.AllSuffixesTextTokenizer; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; @@ -47,16 +49,21 @@ import com.apple.foundationdb.record.query.plan.cascades.debug.Debugger; import com.apple.foundationdb.record.query.plan.debug.DebuggerWithSymbolTables; import com.apple.foundationdb.record.util.pair.Pair; +import com.apple.foundationdb.tuple.Tuple; import com.google.auto.service.AutoService; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.lucene.analysis.en.EnglishAnalyzer; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Sort; +import org.junit.jupiter.api.Assertions; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -64,6 +71,7 @@ import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.apple.foundationdb.record.metadata.Key.Expressions.concat; @@ -848,6 +856,46 @@ public static void rebalancePartitions(final FDBRecordStore recordStore, final I } } + public static List getPartitionMeta(Index index, + Tuple groupingKey, + FDBRecordStore store) { + LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)store.getIndexMaintainer(index); + return indexMaintainer.getPartitioner().getAllPartitionMetaInfo(groupingKey).join(); + } + + public static StoreTimer.Counter getCounter(@Nonnull final FDBRecordContext recordContext, @Nonnull final StoreTimer.Event event) { + return Verify.verifyNotNull(recordContext.getTimer()).getCounter(event); + } + + public static Map getSegmentCounts(Index index, + Tuple groupingKey, + FDBRecordStore store) { + final List partitionMeta = getPartitionMeta(index, groupingKey, store); + return partitionMeta.stream() + .collect(Collectors.toMap( + LucenePartitionInfoProto.LucenePartitionInfo::getId, + partitionInfo -> Assertions.assertDoesNotThrow(() -> + getIndexReader(index, groupingKey, partitionInfo.getId(), store).getContext().leaves().size()) + )); + } + + public static IndexReader getIndexReader(final Index index, + final Tuple groupingKey, + final int partitionId, + FDBRecordStore store) throws IOException { + final FDBDirectoryManager manager = getDirectoryManager(index, store); + return manager.getIndexReader(groupingKey, partitionId); + } + + public static FDBDirectoryManager getDirectoryManager(final Index index, final FDBRecordStore store) { + return getIndexMaintainer(index, store).getDirectoryManager(); + } + + @Nonnull + public static LuceneIndexMaintainer getIndexMaintainer(final Index index, final FDBRecordStore store) { + return (LuceneIndexMaintainer)store.getIndexMaintainer(index); + } + /** * A testing analyzer factory to verify the logic for {@link AnalyzerChooser}. */