diff --git a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java index be20f1a3aec84..4c101a02d01d6 100644 --- a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java +++ b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java @@ -498,7 +498,7 @@ protected void parseCreateField(ParseContext context) throws IOException { } long scaledValue = Math.round(doubleValue * scalingFactor); - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), scaledValue); } else { List fields = NumberFieldMapper.NumberType.LONG.createFields( diff --git a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/TokenCountFieldMapper.java b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/TokenCountFieldMapper.java index ab19445e865fe..ab5aefb21ff4a 100644 --- a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/TokenCountFieldMapper.java +++ b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/TokenCountFieldMapper.java @@ -185,7 +185,7 @@ protected void parseCreateField(ParseContext context) throws IOException { tokenCount = countPositions(analyzer, name(), value, enablePositionIncrements); } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), tokenCount); } else { context.doc() diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index 11ab030bdc9ab..6f0fb74a481e2 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -126,8 +126,8 @@ protected void doClose() { } }; } catch (Exception ex) { - logger.error("Failed to acquire searcher {}", ex.toString(), ex); - // TODO + logger.error("Failed to acquire searcher", ex); + throw new RuntimeException(ex); } return searcher; } diff --git a/plugins/mapper-size/src/main/java/org/opensearch/index/mapper/size/SizeFieldMapper.java b/plugins/mapper-size/src/main/java/org/opensearch/index/mapper/size/SizeFieldMapper.java index ab355883c841e..e8622fdf1a271 100644 --- a/plugins/mapper-size/src/main/java/org/opensearch/index/mapper/size/SizeFieldMapper.java +++ b/plugins/mapper-size/src/main/java/org/opensearch/index/mapper/size/SizeFieldMapper.java @@ -100,7 +100,7 @@ public void postParse(ParseContext context) throws IOException { } final int value = context.sourceToParse().source().length(); - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { context.doc().addAll(NumberType.INTEGER.createFields(name(), value, true, true, false, true)); diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 8a5eafef4a10a..7d2f7c537ac62 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -289,6 +289,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_DERIVED_SOURCE_SETTING, IndexSettings.INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING, + IndexSettings.OPTIMIZED_INDEX_ENABLED_SETTING, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 2abdf79584d82..6effbcfaaa319 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -810,6 +810,13 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic ); + public static final Setting OPTIMIZED_INDEX_ENABLED_SETTING = Setting.boolSetting( + "index.optimized.enabled", + false, + Property.IndexScope, + Property.Final + ); + private final Index index; private final Version version; private final Logger logger; @@ -955,6 +962,8 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile boolean isStarTreeIndexEnabled; + private final boolean isOptimizedIndex; + /** * Returns the default search fields for this index. */ @@ -1119,6 +1128,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); isCompositeIndex = scopedSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING); isStarTreeIndexEnabled = scopedSettings.get(StarTreeIndexSettings.STAR_TREE_SEARCH_ENABLED_SETTING); + isOptimizedIndex = scopedSettings.get(OPTIMIZED_INDEX_ENABLED_SETTING); + scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, tieredMergePolicyProvider::setNoCFSRatio @@ -2166,4 +2177,6 @@ public boolean isDerivedSourceEnabledForTranslog() { public boolean isDerivedSourceEnabled() { return derivedSourceEnabled; } + + public boolean isOptimizedIndex() { return isOptimizedIndex; } } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java index 6a958d9606748..932c1c9151d83 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java @@ -18,6 +18,7 @@ import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.engine.Segment; +import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogManager; @@ -233,6 +234,14 @@ default void maybeDie(final Logger logger, final String maybeMessage, final Thro }); } + default CompositeDataFormatWriter.CompositeDocumentInput documentInput() { + return null; + } + + default long getNativeBytesUsed() { + return 0; + } + /** * Event listener for the engine * diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java index b47f82d815e52..e8c77a0d11984 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java @@ -409,7 +409,8 @@ LocalCheckpointTracker getLocalCheckpointTracker() { return readEngines.values().stream().filter(list -> !list.isEmpty()).findFirst().map(List::getFirst).orElse(null); } - public CompositeDataFormatWriter.CompositeDocumentInput documentInput() throws IOException { + @Override + public CompositeDataFormatWriter.CompositeDocumentInput documentInput() { return engine.createCompositeWriter().newDocumentInput(); } diff --git a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java index 1239cf57fe447..a53e9b1000fa7 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java @@ -206,7 +206,7 @@ protected void parseCreateField(ParseContext context) throws IOException { return; } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { if (stored) { diff --git a/server/src/main/java/org/opensearch/index/mapper/BooleanFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/BooleanFieldMapper.java index ad702460f50af..6ee3de6187557 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BooleanFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/BooleanFieldMapper.java @@ -390,7 +390,7 @@ protected void parseCreateField(ParseContext context) throws IOException { return; } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { if (indexed) { diff --git a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java index b997e5d289cae..112fd32d5814e 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java @@ -849,7 +849,7 @@ protected void parseCreateField(ParseContext context) throws IOException { } } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), timestamp); } else { if (indexed) { diff --git a/server/src/main/java/org/opensearch/index/mapper/DocCountFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DocCountFieldMapper.java index cbf5cd22df2c2..3327022efb2d1 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DocCountFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/DocCountFieldMapper.java @@ -150,7 +150,7 @@ protected void parseCreateField(ParseContext context) throws IOException { throw new IllegalArgumentException("Field [" + fieldType().name() + "] must be a positive integer."); } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { final Field docCount = new NumericDocValuesField(NAME, value); diff --git a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java index 3dd2d614b3795..b0e3173d41607 100644 --- a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java @@ -337,7 +337,7 @@ protected final void createFieldNamesField(ParseContext context) { FieldNamesFieldType fieldNamesFieldType = context.docMapper().metadataMapper(FieldNamesFieldMapper.class).fieldType(); if (fieldNamesFieldType != null && fieldNamesFieldType.isEnabled()) { for (String fieldName : FieldNamesFieldMapper.extractFieldNames(fieldType().name())) { - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldNamesFieldType, fieldName); } else { context.doc().add(new Field(FieldNamesFieldMapper.NAME, fieldName, FieldNamesFieldMapper.Defaults.FIELD_TYPE)); @@ -346,8 +346,8 @@ protected final void createFieldNamesField(ParseContext context) { } } - protected final boolean isPluggableDataFormatFeatureEnabled() { - return FeatureFlags.isEnabled(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG); + protected final boolean isPluggableDataFormatFeatureEnabled(ParseContext parseContext) { + return FeatureFlags.isEnabled(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG) && parseContext.indexSettings().isOptimizedIndex(); } @Override diff --git a/server/src/main/java/org/opensearch/index/mapper/IdFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/IdFieldMapper.java index 9e36ea6e6b9b2..cd59cda9b0133 100644 --- a/server/src/main/java/org/opensearch/index/mapper/IdFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/IdFieldMapper.java @@ -298,7 +298,7 @@ private IdFieldMapper(Supplier fieldDataEnabled) { @Override public void preParse(ParseContext context) { BytesRef id = Uid.encodeId(context.sourceToParse().id()); - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), id); } else { context.doc().add(new Field(NAME, id, Defaults.FIELD_TYPE)); diff --git a/server/src/main/java/org/opensearch/index/mapper/IgnoredFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/IgnoredFieldMapper.java index 8c95702b281a0..41f779f800a2c 100644 --- a/server/src/main/java/org/opensearch/index/mapper/IgnoredFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/IgnoredFieldMapper.java @@ -114,7 +114,7 @@ private IgnoredFieldMapper() { @Override public void postParse(ParseContext context) { for (String field : context.getIgnoredFields()) { - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), field); } else { context.doc().add(new Field(NAME, field, Defaults.FIELD_TYPE)); diff --git a/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java index b58f20b3642a7..f44757487e88c 100644 --- a/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java @@ -665,7 +665,7 @@ protected void parseCreateField(ParseContext context) throws IOException { } } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), address); } else { if (indexed && hasDocValues) { diff --git a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java index 7514345e3ecf4..b81d6e83c84e2 100644 --- a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java @@ -862,7 +862,7 @@ protected void parseCreateField(ParseContext context) throws IOException { value = normalizeValue(normalizer, name(), value); } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { // convert to utf8 only once before feeding postings/dv/stored fields diff --git a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java index a03e69a05acce..f6c306c6daa92 100644 --- a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java @@ -2183,7 +2183,7 @@ protected void parseCreateField(ParseContext context) throws IOException { numericValue = fieldType().type.parse(value, coerce.value()); } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), numericValue); } else { context.doc().addAll(fieldType().type.createFields(fieldType().name(), numericValue, indexed, hasDocValues, skiplist, stored)); diff --git a/server/src/main/java/org/opensearch/index/mapper/RoutingFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/RoutingFieldMapper.java index 46a1b6a76f1ec..97982655eea98 100644 --- a/server/src/main/java/org/opensearch/index/mapper/RoutingFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/RoutingFieldMapper.java @@ -148,7 +148,7 @@ public boolean required() { public void preParse(ParseContext context) { String routing = context.sourceToParse().routing(); if (routing != null) { - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), routing); } else { context.doc().add(new Field(fieldType().name(), routing, Defaults.FIELD_TYPE)); diff --git a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java index edab9b64488d2..26d091bf9cb48 100644 --- a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java @@ -1046,7 +1046,7 @@ protected void parseCreateField(ParseContext context) throws IOException { return; } - if (isPluggableDataFormatFeatureEnabled()) { + if (isPluggableDataFormatFeatureEnabled(context)) { context.compositeDocumentInput().addField(fieldType(), value); } else { if (fieldType.indexOptions() != IndexOptions.NONE || fieldType.stored()) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 29953f38c7c42..813e2253c5041 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1134,8 +1134,9 @@ public Engine.IndexResult applyIndexOperationOnPrimary( boolean isRetry ) throws IOException { assert versionType.validateVersionForWrites(version); + Indexer indexer = getIndexer(); return applyIndexOperation( - getIndexingExecutionCoordinator(), + indexer, UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, @@ -1147,7 +1148,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary( Engine.Operation.Origin.PRIMARY, sourceToParse, null, - currentCompositeEngineReference.get()::documentInput + indexer::documentInput ); } @@ -1264,7 +1265,7 @@ public static Engine.Index prepareIndex( CompositeDataFormatWriter.CompositeDocumentInput documentInput ) { long startTime = System.nanoTime(); - ParsedDocument doc = docMapper.getDocumentMapper().parse(source, documentInput);; + ParsedDocument doc = docMapper.getDocumentMapper().parse(source, documentInput); if (docMapper.getMapping() != null) { doc.addDynamicMappingsUpdate(docMapper.getMapping()); } @@ -1493,8 +1494,7 @@ public void refresh(String source) { if (logger.isTraceEnabled()) { logger.trace("refresh with source [{}]", source); } - getIndexingExecutionCoordinator().refresh(source); -// getIndexer().refresh(source); + getIndexer().refresh(source); } /** @@ -1647,7 +1647,7 @@ public void flush(FlushRequest request) { */ verifyNotClosed(); final long time = System.nanoTime(); - getIndexingExecutionCoordinator().flush(force, waitIfOngoing); + getIndexer().flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); } @@ -1660,7 +1660,7 @@ public void trimTranslog() { return; } verifyNotClosed(); - currentCompositeEngineReference.get().translogManager().trimUnreferencedTranslogFiles(); + getIndexer().translogManager().trimUnreferencedTranslogFiles(); } /** @@ -1676,7 +1676,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException { if (logger.isTraceEnabled()) { logger.trace("force merge with {}", forceMerge); } - Indexer engine = currentCompositeEngineReference.get(); + Indexer engine = getIndexer(); engine.forceMerge( forceMerge.flush(), forceMerge.maxNumSegments(), @@ -2191,7 +2191,7 @@ public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scop readAllowed(); markSearcherAccessed(); final Engine engine = getEngine(); - currentCompositeEngineReference.get().getPrimaryReadEngine().acquireSearcherSupplier(null, scope); + //currentCompositeEngineReference.get().getPrimaryReadEngine().acquireSearcherSupplier(null, scope); return engine.acquireSearcherSupplier(this::wrapSearcher, scope); } @@ -2335,7 +2335,6 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO } finally { final CompositeEngine compositeEngine = this.currentCompositeEngineReference.getAndSet(null); final Engine engine = this.currentEngineReference.getAndSet(null); - getIndexingExecutionCoordinator().close(); try { if (engine != null && flushEngine) { engine.flushAndClose(); @@ -2690,7 +2689,7 @@ private Engine.Result applyTranslogOperation(Indexer engine, Translog.Operation index.routing() ), index.id(), - currentCompositeEngineReference.get()::documentInput + getIndexer()::documentInput ); break; case DELETE: @@ -2898,18 +2897,20 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); - CompositeEngine compositeEngine = new CompositeEngine( - config, - mapperService, - pluginsService, - indexSettings, - path, - LocalCheckpointTracker::new, - TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER - ); + if (indexSettings.isOptimizedIndex()) { + CompositeEngine compositeEngine = new CompositeEngine( + config, + mapperService, + pluginsService, + indexSettings, + path, + LocalCheckpointTracker::new, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER + ); + currentCompositeEngineReference.set(compositeEngine); + } onNewEngine(newEngine); currentEngineReference.set(newEngine); - currentCompositeEngineReference.set(compositeEngine); if (indexSettings.isSegRepEnabledOrRemoteNode()) { // set initial replication checkpoints into tracker. @@ -3123,7 +3124,7 @@ public long getIndexBufferRAMBytesUsed() { } public long getNativeBytesUsed() { - return getIndexingExecutionCoordinator().getNativeBytesUsed(); + return getIndexer().getNativeBytesUsed(); } public void addShardFailureCallback(Consumer onShardFailure) { @@ -3461,7 +3462,7 @@ private void handleRefreshException(Exception e) { */ public void writeIndexingBuffer() { try { - getIndexingExecutionCoordinator().writeIndexingBuffer(); + getIndexer().writeIndexingBuffer(); } catch (Exception e) { handleRefreshException(e); } @@ -4025,11 +4026,11 @@ private void doCheckIndex() throws IOException { public Indexer getIndexer() { - return getIndexingExecutionCoordinator(); + return indexSettings.isOptimizedIndex() ? getIndexingExecutionCoordinator() : currentEngineReference.get(); } public CheckpointState getCheckpointState() { - return getIndexingExecutionCoordinator(); + return (CheckpointState) getIndexer(); } public StatsHolder getStatsHolder() { @@ -4050,7 +4051,7 @@ public Engine getEngine() { protected Indexer getIndexerOrNull() { - return getIndexingExecutionCoordinator(); + return getIndexer(); } public CheckpointState getCheckpointStateOrNull() { @@ -5000,19 +5001,26 @@ && isSearchIdle() // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will // cause the next schedule to refresh. -// final Engine engine = getEngine(); -// engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some -// setRefreshPending(engine); -// return false; - getIndexingExecutionCoordinator().refresh("schedule"); - return true; + // TODO : Merge into a common refresh method via Indexer + if (indexSettings.isOptimizedIndex()) { + getIndexingExecutionCoordinator().refresh("schedule"); + return true; + } else { + final Engine engine = getEngine(); + engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some + setRefreshPending(engine); + return false; + } } else { if (logger.isTraceEnabled()) { logger.trace("refresh with source [schedule]"); } - getIndexingExecutionCoordinator().refresh("schedule"); - return true; -// return getEngine().maybeRefresh("schedule"); + if (indexSettings.isOptimizedIndex()) { + getIndexingExecutionCoordinator().refresh("schedule"); + return true; + } else { + return getEngine().maybeRefresh("schedule"); + } } } final Engine engine = getEngine(); diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 52863c08f0788..4f5dfd0fe22ce 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -225,7 +225,6 @@ final class DefaultSearchContext extends SearchContext { private boolean isStreamSearch; private StreamSearchChannelListener listener; - private Map dfResults; private final SetOnce cachedFlushMode = new SetOnce<>(); DefaultSearchContext( @@ -1313,11 +1312,4 @@ public long getStreamingMinEstimatedBucketCount() { return clusterService.getClusterSettings().get(STREAMING_MIN_ESTIMATED_BUCKET_COUNT); } - public void setDFResults(Map dfResults) { - this.dfResults = dfResults; - } - - public Map getDFResults() { - return dfResults; - } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index ee5061dd09e51..f7e59b0651c86 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -85,6 +85,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineSearcherSupplier; import org.opensearch.index.engine.SearchExecEngine; +import org.opensearch.index.engine.exec.coord.CompositeEngine; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; import org.opensearch.index.query.InnerHitContextBuilder; @@ -833,10 +834,9 @@ private SearchPhaseResult executeQueryPhase( // Till here things are generic but for datafusion , we need to abstract out and get the read engine specific implementation // it could be reusing existing final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext); + CompositeEngine compositeEngine = readerContext.indexShard().getIndexingExecutionCoordinator(); @SuppressWarnings("unchecked") - SearchExecEngine searchExecEngine = readerContext.indexShard() - .getIndexingExecutionCoordinator() - .getPrimaryReadEngine(); + SearchExecEngine searchExecEngine = compositeEngine != null ? compositeEngine.getPrimaryReadEngine() : null; SearchShardTarget shardTarget = new SearchShardTarget( clusterService.localNode().getId(), readerContext.indexShard().shardId(), @@ -856,9 +856,9 @@ private SearchPhaseResult executeQueryPhase( //context.aggregations(context1.aggregations()); // TODO Execute plan here // TODO : figure out how to tie this - byte[] substraitQuery = request.source().queryPlanIR(); context.queryResult().from(context.from()); context.queryResult().size(context.size()); + byte[] substraitQuery = request.source().queryPlanIR(); if (substraitQuery != null) { // setDFResults in context Map result = searchExecEngine.executeQueryPhase(context); @@ -1460,14 +1460,16 @@ private SearchContext createContext( request.getClusterAlias(), OriginalIndices.NONE ); - SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext); + SearchContext context = searchExecEngine == null ? originalContext : searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext); try { if (request.scroll() != null) { context.scrollContext().scroll = request.scroll(); } // FIXME : We don't need to do both, but commenting the one on Datafusion Context hangs up the JVM need to debug. parseSource(context, request.source(), includeAggregations); - parseSource(context.getOriginalContext(), request.source(), includeAggregations); + if (searchExecEngine != null) { + parseSource(context.getOriginalContext(), request.source(), includeAggregations); + } // if the from and size are still not set, default them if (context.from() == -1) { diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index c86f540dc759d..4bdba1b0ff43d 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -157,7 +157,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep // here to make sure it happens during the QUERY phase aggregationProcessor.preProcess(searchContext.getOriginalContext()); - if(Optional.ofNullable(searchContext.queryResult().topDocs().topDocs.totalHits).isEmpty() || searchContext.queryResult().topDocs().topDocs.totalHits.value() == 0) { + if(searchContext.queryResult().hasConsumedTopDocs() || Optional.ofNullable(searchContext.queryResult().topDocs().topDocs.totalHits).isEmpty() || searchContext.queryResult().topDocs().topDocs.totalHits.value() == 0) { searchContext.queryResult() .topDocs( new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN), @@ -165,16 +165,17 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep ); } - // boolean rescore = executeInternal(searchContext, queryPhaseSearcher); - - // Post process - SearchEngineResultConversionUtils.convertDFResultGeneric(searchContext); + if (searchContext.getDFResults() != null && searchContext.getDFResults().isEmpty() == false) { + SearchEngineResultConversionUtils.convertDFResultGeneric(searchContext); + } else { + boolean rescore = executeInternal(searchContext, queryPhaseSearcher); + if (rescore) { // only if we do a regular search + rescoreProcessor.process(searchContext); + } + suggestProcessor.process(searchContext); + } - // if (rescore) { // only if we do a regular search - // rescoreProcessor.process(searchContext); - // } - // suggestProcessor.process(searchContext); - aggregationProcessor.postProcess(searchContext); + aggregationProcessor.postProcess(searchContext); if (searchContext.getProfilers() != null) { ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(