From dc79a65cd0b23469b2a2b1651868c2a288cd7253 Mon Sep 17 00:00:00 2001 From: bharath-techie Date: Tue, 21 Oct 2025 22:57:32 +0530 Subject: [PATCH] Pluggable engine with interfaces for searcher and reader Signed-off-by: bharath-techie --- libs/engine-dataformat-commons/build.gradle | 30 + .../execution/search/DataFormat.java | 27 + .../execution/search/RecordBatchStream.java | 44 + .../execution/search/spi/DataFormatCodec.java | 29 + plugins/engine-datafusion/build.gradle | 105 +++ .../datafusion/DataFusionPlugin.java | 108 +++ .../datafusion/DataSourceRegistry.java | 73 ++ .../datafusion/DatafusionEngine.java | 176 ++++ .../datafusion/DatafusionService.java | 109 +++ .../datafusion/core/GlobalRuntimeEnv.java | 44 + .../datafusion/search/DatafusionContext.java | 811 ++++++++++++++++++ .../datafusion/search/DatafusionQuery.java | 34 + .../datafusion/search/DatafusionReader.java | 88 ++ .../search/DatafusionReaderManager.java | 67 ++ .../datafusion/search/DatafusionSearcher.java | 60 ++ .../search/DatafusionSearcherSupplier.java | 55 ++ .../search/DefaultRecordBatchStream.java | 118 +++ .../datafusion/search/SearchExecutor.java | 17 + .../search/SearchResultIterator.java | 21 + plugins/parquet-data-format/build.gradle | 163 ++++ .../parquet/ParquetDataFormatPlugin.java | 33 + .../parquet/engine/ParquetDataFormat.java | 15 + server/build.gradle | 1 + .../org/opensearch/index/engine/Engine.java | 17 +- .../index/engine/InternalEngine.java | 4 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../exec/bridge/SearcherOperations.java | 50 ++ .../engine/exec/engine/FileMetadata.java | 16 + .../engine/exec/engine/RefreshResult.java | 34 + .../index/engine/exec/format/DataFormat.java | 51 ++ .../engine/exec/manage/CatalogSnapshot.java | 134 +++ .../CatalogSnapshotAwareRefreshListener.java | 30 + .../engine/exec/read/EngineReaderManager.java | 27 + .../engine/exec/read/EngineSearcher.java | 34 + .../exec/read/EngineSearcherSupplier.java | 38 + .../engine/exec/read/SearchExecEngine.java | 48 ++ .../main/java/org/opensearch/node/Node.java | 34 + .../opensearch/plugins/DataFormatPlugin.java | 26 + .../plugins/SearchEnginePlugin.java | 64 ++ 40 files changed, 2830 insertions(+), 9 deletions(-) create mode 100644 libs/engine-dataformat-commons/build.gradle create mode 100644 libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/DataFormat.java create mode 100644 libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/RecordBatchStream.java create mode 100644 libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/spi/DataFormatCodec.java create mode 100644 plugins/engine-datafusion/build.gradle create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionService.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java create mode 100644 
plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcherSupplier.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DefaultRecordBatchStream.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchExecutor.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchResultIterator.java create mode 100644 plugins/parquet-data-format/build.gradle create mode 100644 plugins/parquet-data-format/src/main/java/com/format/parquet/ParquetDataFormatPlugin.java create mode 100644 plugins/parquet-data-format/src/main/java/com/format/parquet/engine/ParquetDataFormat.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/bridge/SearcherOperations.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/engine/FileMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/engine/RefreshResult.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/format/DataFormat.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/manage/CatalogSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/read/CatalogSnapshotAwareRefreshListener.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/read/EngineReaderManager.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcher.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcherSupplier.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/read/SearchExecEngine.java create mode 100644 server/src/main/java/org/opensearch/plugins/DataFormatPlugin.java create mode 100644 server/src/main/java/org/opensearch/plugins/SearchEnginePlugin.java diff --git a/libs/engine-dataformat-commons/build.gradle b/libs/engine-dataformat-commons/build.gradle new file mode 100644 index 0000000000000..d45e47381919b --- /dev/null +++ b/libs/engine-dataformat-commons/build.gradle @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +apply plugin: 'opensearch.build' + +description = 'Common interfaces and SPIs for pluggable engines and data formats' + +dependencies { + api project(':libs:opensearch-core') + api project(':libs:opensearch-common') + + testImplementation(project(":test:framework")) { + exclude group: 'org.opensearch', module: 'engine-dataformat-commons' + } +} + +tasks.named('forbiddenApisMain').configure { + replaceSignatureFiles 'jdk-signatures' +} + +jarHell.enabled = false + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/DataFormat.java b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/DataFormat.java new file mode 100644 index 0000000000000..fdb35fda59a92 --- /dev/null +++ b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/DataFormat.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.execution.search; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.util.List; +import java.util.Map; + +/** + * Represents a data format. + */ +@ExperimentalApi +public interface DataFormat { + + /** + * + * @return name identifier for the data format. + */ + String name(); +} diff --git a/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/RecordBatchStream.java b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/RecordBatchStream.java new file mode 100644 index 0000000000000..067a91a7a7664 --- /dev/null +++ b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/RecordBatchStream.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.execution.search; + +import java.util.concurrent.CompletableFuture; + +/** + * Represents a stream of record batches from a DataFusion query execution. + * This interface provides access to query results in a streaming fashion. + */ +public interface RecordBatchStream extends AutoCloseable { + + /** + * Check if there are more record batches available in the stream. + * + * @return true if more batches are available, false otherwise + */ + boolean hasNext(); + + /** + * Get the schema of the record batches in this stream. + * @return the schema object + */ + Object getSchema(); + + /** + * Get the next record batch from the stream. + * + * @return the next record batch or null if no more batches + */ + CompletableFuture next(); + + /** + * Close the stream and free the associated resources. 
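For illustration, a consumer of RecordBatchStream would typically drain it as in the sketch below (a hypothetical caller, not part of this patch; batches are typed as Object because the interface above is format-agnostic):

// Hypothetical consumer: drain every batch from the stream, then free the native resources.
static void drainStream(org.opensearch.execution.search.RecordBatchStream stream) {
    try {
        Object schema = stream.getSchema();          // the same schema applies to every batch
        while (stream.hasNext()) {
            Object batch = stream.next().join();     // next() is asynchronous; join() waits for the batch
            if (batch == null) {
                break;                               // defensive: null also signals exhaustion
            }
            // convert the batch (e.g. Arrow vectors) into hits/aggregations here
        }
    } finally {
        stream.close();                              // close() releases the native stream
    }
}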
+ */ + @Override + void close(); +} diff --git a/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/spi/DataFormatCodec.java b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/spi/DataFormatCodec.java new file mode 100644 index 0000000000000..2ff2c7d2d8001 --- /dev/null +++ b/libs/engine-dataformat-commons/src/main/java/org/opensearch/execution/search/spi/DataFormatCodec.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.execution.search.spi; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.execution.search.DataFormat; + +import java.util.concurrent.CompletableFuture; + +/** + * Service Provider Interface for data source codecs. + * Contains configurations , optimizers etc to support query of different data formats + * through the pluggable engines. + */ +@ExperimentalApi +public interface DataFormatCodec { + /** + * Returns the data format name + */ + DataFormat getDataFormat(); + + CompletableFuture closeSessionContext(long sessionContextId); +} diff --git a/plugins/engine-datafusion/build.gradle b/plugins/engine-datafusion/build.gradle new file mode 100644 index 0000000000000..beedc1e82dba1 --- /dev/null +++ b/plugins/engine-datafusion/build.gradle @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'opensearch.internal-cluster-test' +apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.pluginzip' + +def pluginName = 'engine-datafusion' +def pluginDescription = 'OpenSearch plugin providing access to DataFusion via JNI' +def projectPath = 'org.opensearch' +def pathToPlugin = 'datafusion.DataFusionPlugin' +def pluginClassName = 'DataFusionPlugin' + +opensearchplugin { + name = pluginName + description = pluginDescription + classname = "${projectPath}.${pathToPlugin}" + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') +} + +dependencies { + api project(':libs:opensearch-engine-dataformat-commons') + implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}" + implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" + + // Bundle Jackson in the plugin JAR using 'api' like other OpenSearch plugins + api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" + api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" + api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + + // Apache Arrow dependencies for memory management + implementation "org.apache.arrow:arrow-memory-core:17.0.0" + implementation "org.apache.arrow:arrow-memory-unsafe:17.0.0" + implementation "org.apache.arrow:arrow-vector:17.0.0" + implementation "org.apache.arrow:arrow-c-data:17.0.0" + implementation "org.apache.arrow:arrow-format:17.0.0" + // SLF4J API for Arrow logging compatibility + implementation "org.slf4j:slf4j-api:${versions.slf4j}" + // CheckerFramework annotations required by Arrow 17.0.0 + implementation "org.checkerframework:checker-qual:3.42.0" + // FlatBuffers dependency required by Arrow 17.0.0 + implementation 
"com.google.flatbuffers:flatbuffers-java:23.5.26" + + testImplementation "junit:junit:${versions.junit}" + testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}" + testImplementation "org.mockito:mockito-core:${versions.mockito}" + + // Add CSV plugin for testing + // testImplementation project(':plugins:dataformat-csv') +} + +yamlRestTest { + systemProperty 'tests.security.manager', 'false' + // Disable yamlRestTest since this plugin doesn't have REST API endpoints + enabled = false +} + +tasks.named("dependencyLicenses").configure { + mapping from: /jackson-.*/, to: 'jackson' + mapping from: /arrow-.*/, to: 'arrow' + mapping from: /slf4j-.*/, to: 'slf4j-api' + mapping from: /checker-qual.*/, to: 'checker-qual' + mapping from: /flatbuffers-.*/, to: 'flatbuffers-java' +} + +// Configure third party audit to handle Apache Arrow dependencies +tasks.named('thirdPartyAudit').configure { + ignoreMissingClasses( + // Apache Commons Codec (missing dependency) + 'org.apache.commons.codec.binary.Hex' + ) + ignoreViolations( + // Apache Arrow internal classes that use Unsafe operations + 'org.apache.arrow.memory.ArrowBuf', + 'org.apache.arrow.memory.unsafe.UnsafeAllocationManager', + 'org.apache.arrow.memory.util.ByteFunctionHelpers', + 'org.apache.arrow.memory.util.MemoryUtil', + 'org.apache.arrow.memory.util.MemoryUtil$1', + 'org.apache.arrow.memory.util.hash.MurmurHasher', + 'org.apache.arrow.memory.util.hash.SimpleHasher', + 'org.apache.arrow.vector.BaseFixedWidthVector', + 'org.apache.arrow.vector.BitVectorHelper', + 'org.apache.arrow.vector.Decimal256Vector', + 'org.apache.arrow.vector.DecimalVector', + 'org.apache.arrow.vector.util.DecimalUtility', + 'org.apache.arrow.vector.util.VectorAppender' + ) +} + +// Configure Javadoc to skip package documentation requirements ie package-info.java +missingJavadoc { + javadocMissingIgnore = [ + 'org.opensearch.datafusion', + 'org.opensearch.datafusion.action', + 'org.opensearch.datafusion.core' + ] +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java new file mode 100644 index 0000000000000..94cdb85efcc69 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.datafusion; + +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.engine.FileMetadata; +import org.opensearch.index.engine.exec.format.DataFormat; +import org.opensearch.index.engine.exec.read.SearchExecEngine; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchEnginePlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.Client; +import org.opensearch.watcher.ResourceWatcherService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Datafusion query engine plugin that enables datafusion to perform search + */ +public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin { + + private final boolean isDataFusionEnabled; + private DatafusionService datafusionService; + + /** + * Constructor for DataFusionPlugin. + * @param settings The settings for the DataFusionPlugin. + */ + public DataFusionPlugin(Settings settings) { + // For now, DataFusion is always enabled if the plugin is loaded + // In the future, this could be controlled by a feature flag + this.isDataFusionEnabled = true; + } + + /** + * Creates components for the DataFusion plugin. + * @param client The client instance. + * @param clusterService The cluster service instance. + * @param threadPool The thread pool instance. + * @param resourceWatcherService The resource watcher service instance. + * @param scriptService The script service instance. + * @param xContentRegistry The named XContent registry. + * @param environment The environment instance. + * @param nodeEnvironment The node environment instance. + * @param namedWriteableRegistry The named writeable registry. + * @param indexNameExpressionResolver The index name expression resolver instance. + * @param repositoriesServiceSupplier The supplier for the repositories service. 
+ * @param dataFormatCodecs dataformat implementations + * @return Collection of created components + */ + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + Map dataFormatCodecs + ) { + if (!isDataFusionEnabled) { + return Collections.emptyList(); + } + datafusionService = new DatafusionService(dataFormatCodecs); + + // return Collections.emptyList(); + return Collections.singletonList(datafusionService); + } + + /** + * Creates a shard specific read engine + */ + @Override + public SearchExecEngine createEngine( + DataFormat dataFormat, + Collection formatCatalogSnapshot, + ShardPath shardPath + ) throws IOException { + return null; + } + +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java new file mode 100644 index 0000000000000..36a9571a5434a --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.format.DataFormat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry for DataFusion data source codecs. + */ +public class DataSourceRegistry { + + private static final Logger logger = LogManager.getLogger(DataSourceRegistry.class); + + private final ConcurrentHashMap codecs = new ConcurrentHashMap<>(); + + public DataSourceRegistry(Map dataSourceCodecMap) { + codecs.putAll(dataSourceCodecMap); + } + + /** + * Check if any codecs are available. + * + * @return true if codecs are available, false otherwise + */ + public boolean hasCodecs() { + return !codecs.isEmpty(); + } + + /** + * Get the names of all registered codecs. + * + * @return list of codec names + */ + public List getCodecNames() { + return new ArrayList<>(codecs.keySet()); + } + + /** + * Get the default codec (first available codec). + * + * @return the default codec, or null if none available + */ + public DataFormatCodec getDefaultEngine() { + if (codecs.isEmpty()) { + return null; + } + return codecs.values().iterator().next(); + } + + /** + * Get a codec by name. 
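The createEngine hook above currently returns null. A plausible wiring, once the engine is ready, could look like the sketch below (the generic type parameter on the collection is an assumption; the DatafusionEngine constructor used here is the one introduced later in this patch):

// Sketch only: delegate shard-level engine creation to DatafusionEngine.
@Override
public SearchExecEngine createEngine(
    DataFormat dataFormat,
    Collection<FileMetadata> formatCatalogSnapshot,   // catalog files for this shard (type parameter assumed)
    ShardPath shardPath
) throws IOException {
    // datafusionService is the component created in createComponents() above
    return new DatafusionEngine(dataFormat, formatCatalogSnapshot, datafusionService, shardPath);
}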
+ * + * @param name the codec name + * @return the codec, or null if not found + */ + public DataFormatCodec getCodec(String name) { + return codecs.get(name); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java new file mode 100644 index 0000000000000..96be6d856e0bd --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.BigArrays; +import org.opensearch.datafusion.search.DatafusionContext; +import org.opensearch.datafusion.search.DatafusionQuery; +import org.opensearch.datafusion.search.DatafusionReader; +import org.opensearch.datafusion.search.DatafusionReaderManager; +import org.opensearch.datafusion.search.DatafusionSearcher; +import org.opensearch.datafusion.search.DatafusionSearcherSupplier; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.engine.exec.engine.FileMetadata; +import org.opensearch.index.engine.exec.format.DataFormat; +import org.opensearch.index.engine.exec.read.CatalogSnapshotAwareRefreshListener; +import org.opensearch.index.engine.exec.read.EngineSearcherSupplier; +import org.opensearch.index.engine.exec.read.SearchExecEngine; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ReaderContext; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * Base Datafusion engine for search + */ +public class DatafusionEngine extends SearchExecEngine { + + private static final Logger logger = LogManager.getLogger(DatafusionEngine.class); + + private DataFormat dataFormat; + private DatafusionReaderManager datafusionReaderManager; + private DatafusionService datafusionService; + + public DatafusionEngine( + DataFormat dataFormat, + Collection formatCatalogSnapshot, + DatafusionService dataFusionService, + ShardPath shardPath + ) throws IOException { + this.dataFormat = dataFormat; + + this.datafusionReaderManager = new DatafusionReaderManager( + shardPath.getDataPath().toString(), + formatCatalogSnapshot, + dataFormat.name() + ); + this.datafusionService = dataFusionService; + } + + @Override + public DatafusionContext createContext( + ReaderContext readerContext, + ShardSearchRequest request, + SearchShardTarget searchShardTarget, + SearchShardTask task, + BigArrays bigArrays + ) throws IOException { + DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays); + // Parse source + datafusionContext.datafusionQuery(new DatafusionQuery(null/*TODO*/, new ArrayList<>())); + return datafusionContext; + } + + @Override + public EngineSearcherSupplier 
acquireSearcherSupplier(Function wrapper) + throws EngineException { + return acquireSearcherSupplier(wrapper, Engine.SearcherScope.EXTERNAL); + } + + @Override + public EngineSearcherSupplier acquireSearcherSupplier( + Function wrapper, + Engine.SearcherScope scope + ) throws EngineException { + // TODO : wrapper is ignored + EngineSearcherSupplier searcher = null; + // TODO : refcount needs to be revisited - add proper tests for exception etc + try { + DatafusionReader reader = datafusionReaderManager.acquire(); + searcher = new DatafusionSearcherSupplier(null) { + @Override + protected DatafusionSearcher acquireSearcherInternal(String source) { + return new DatafusionSearcher(source, reader, () -> {}); + } + + @Override + protected void doClose() { + try { + reader.decRef(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + } catch (Exception ex) { + // TODO + } + return searcher; + } + + @Override + public DatafusionSearcher acquireSearcher(String source) throws EngineException { + return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); + } + + @Override + public DatafusionSearcher acquireSearcher(String source, Engine.SearcherScope scope) throws EngineException { + return acquireSearcher(source, scope, Function.identity()); + } + + @Override + public DatafusionSearcher acquireSearcher( + String source, + Engine.SearcherScope scope, + Function wrapper + ) throws EngineException { + DatafusionSearcherSupplier releasable = null; + try { + DatafusionSearcherSupplier searcherSupplier = releasable = (DatafusionSearcherSupplier) acquireSearcherSupplier(wrapper, scope); + DatafusionSearcher searcher = searcherSupplier.acquireSearcher(source); + releasable = null; + return new DatafusionSearcher(source, searcher.getReader(), () -> Releasables.close(searcher, searcherSupplier)); + } finally { + Releasables.close(releasable); + } + } + + @Override + public DatafusionReaderManager getReferenceManager(Engine.SearcherScope scope) { + return datafusionReaderManager; + } + + @Override + public CatalogSnapshotAwareRefreshListener getRefreshListener(Engine.SearcherScope scope) { + return datafusionReaderManager; + } + + @Override + public boolean assertSearcherIsWarmedUp(String source, Engine.SearcherScope scope) { + return false; + } + + @Override + public Map execute(DatafusionContext context) { + Map finalRes = new HashMap<>(); + try { + DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); + datafusionSearcher.search(context.getDatafusionQuery()); + long streamPtr = context.getStreamNativePtr(); + // TODO : process stream to form result + } catch (Exception exception) { + logger.error("Failed to execute Substrait query plan", exception); + } + return finalRes; + } + +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionService.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionService.java new file mode 100644 index 0000000000000..4c58697fbb8a6 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionService.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
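The acquire/close discipline implemented above can be exercised as in this sketch (hypothetical caller; search() is still a stub in this patch):

// Hypothetical caller: pin a point-in-time reader, run the query, then release it.
static void runQuery(DatafusionEngine engine, DatafusionQuery query) {
    DatafusionSearcher searcher = engine.acquireSearcher("example");
    try {
        searcher.search(query);   // will eventually execute the substrait plan via JNI
    } finally {
        searcher.close();         // releases the reference on the underlying DatafusionReader
    }
}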
+ */ + +package org.opensearch.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.ConcurrentMapLong; +import org.opensearch.datafusion.core.GlobalRuntimeEnv; +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.format.DataFormat; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Service for managing DataFusion contexts and search operations + */ +public class DatafusionService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(DatafusionService.class); + private final ConcurrentMapLong dataFormatCodecs = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); + + private final DataSourceRegistry dataSourceRegistry; + private final GlobalRuntimeEnv globalRuntimeEnv; + + /** + * Creates a new DataFusion service instance. + */ + public DatafusionService(Map dataSourceCodecs) { + this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs); + this.globalRuntimeEnv = new GlobalRuntimeEnv(); + } + + @Override + protected void doStart() { + logger.info("Starting DataFusion service"); + try { + // Initialize the data source registry + // Test that at least one data source is available + if (!dataSourceRegistry.hasCodecs()) { + logger.warn("No data sources available"); + } else { + logger.info( + "DataFusion service started successfully with {} data sources: {}", + dataSourceRegistry.getCodecNames().size(), + dataSourceRegistry.getCodecNames() + ); + + } + } catch (Exception e) { + logger.error("Failed to start DataFusion service", e); + throw new RuntimeException("Failed to initialize DataFusion service", e); + } + } + + @Override + protected void doStop() { + logger.info("Stopping DataFusion service"); + + // Close all session contexts + for (Long sessionId : dataFormatCodecs.keySet()) { + try { + closeSessionContext(sessionId).get(); + } catch (Exception e) { + logger.warn("Error closing session context {}", sessionId, e); + } + } + dataFormatCodecs.clear(); + globalRuntimeEnv.close(); + logger.info("DataFusion service stopped"); + } + + @Override + protected void doClose() { + doStop(); + } + + public long getRuntimePointer() { + return globalRuntimeEnv.getPointer(); + } + + public long getTokioRuntimePointer() { + return globalRuntimeEnv.getTokioRuntimePtr(); + } + + /** + * Close the session context and clean up resources + * + * @param sessionContextId the session context ID to close + * @return future that completes when cleanup is done + */ + public CompletableFuture closeSessionContext(long sessionContextId) { + DataFormatCodec codec = dataFormatCodecs.remove(sessionContextId); + if (codec == null) { + logger.debug("Session context {} not found or already closed", sessionContextId); + return CompletableFuture.completedFuture(null); + } + + return codec.closeSessionContext(sessionContextId); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java new file mode 100644 index 0000000000000..8edc18d9bdbff --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: 
Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.core; + +/** + * Global runtime environment for DataFusion operations. + * Manages the lifecycle of the native DataFusion runtime. + */ +public class GlobalRuntimeEnv implements AutoCloseable { + // ptr to runtime environment in df + private final long ptr; + private final long tokio_runtime_ptr; + + /** + * Creates a new global runtime environment. + */ + public GlobalRuntimeEnv() { + this.ptr = 0; // todo + this.tokio_runtime_ptr = 0; // todo + } + + /** + * Gets the native pointer to the runtime environment. + * @return the native pointer + */ + public long getPointer() { + return ptr; + } + + public long getTokioRuntimePtr() { + return tokio_runtime_ptr; + } + + @Override + public void close() { + // TODO : close native runtimes + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java new file mode 100644 index 0000000000000..69fd26891e9bf --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java @@ -0,0 +1,811 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.Query; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchType; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.BigArrays; +import org.opensearch.datafusion.DatafusionEngine; +import org.opensearch.index.IndexService; +import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.ObjectMapper; +import org.opensearch.index.query.ParsedQuery; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.search.SearchExtBuilder; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.collapse.CollapseContext; +import org.opensearch.search.dfs.DfsSearchResult; +import org.opensearch.search.fetch.FetchPhase; +import org.opensearch.search.fetch.FetchSearchResult; +import org.opensearch.search.fetch.StoredFieldsContext; +import org.opensearch.search.fetch.subphase.FetchDocValuesContext; +import org.opensearch.search.fetch.subphase.FetchFieldsContext; +import org.opensearch.search.fetch.subphase.FetchSourceContext; +import org.opensearch.search.fetch.subphase.ScriptFieldsContext; +import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.ReaderContext; +import 
org.opensearch.search.internal.ScrollContext; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.profile.Profilers; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.search.query.ReduceableSearchResult; +import org.opensearch.search.rescore.RescoreContext; +import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.search.suggest.SuggestionSearchContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Search context for Datafusion engine + */ +public class DatafusionContext extends SearchContext { + private final ReaderContext readerContext; + private final ShardSearchRequest request; + private final SearchShardTask task; + private final DatafusionEngine readEngine; + private final DatafusionSearcher engineSearcher; + private final IndexShard indexShard; + private final QuerySearchResult queryResult; + private final FetchSearchResult fetchResult; + private final IndexService indexService; + private final QueryShardContext queryShardContext; + private DatafusionQuery datafusionQuery; + private Map dfResults; + private SearchContextAggregations aggregations; + private final BigArrays bigArrays; + private final Map, CollectorManager> queryCollectorManagers = new HashMap<>(); + private long streamNativePtr; + + /** + * Constructor + * @param readerContext The reader context + * @param request The shard search request + * @param task The search shard task + * @param engine The datafusion engine + */ + public DatafusionContext( + ReaderContext readerContext, + ShardSearchRequest request, + SearchShardTarget searchShardTarget, + SearchShardTask task, + DatafusionEngine engine, + BigArrays bigArrays + ) { + this.readerContext = readerContext; + this.indexShard = readerContext.indexShard(); + this.request = request; + this.task = task; + this.readEngine = engine; + this.engineSearcher = engine.acquireSearcher("search");// null;//TODO readerContext.contextEngineSearcher(); + this.queryResult = new QuerySearchResult(readerContext.id(), searchShardTarget, request); + this.fetchResult = new FetchSearchResult(readerContext.id(), searchShardTarget); + this.indexService = readerContext.indexService(); + this.queryShardContext = indexService.newQueryShardContext( + request.shardId().id(), + null, // TOOD : index searcher is null + request::nowInMillis, + searchShardTarget.getClusterAlias(), + false, // reevaluate the usage + false // specific to lucene + ); + this.bigArrays = bigArrays; + } + + /** + * Gets the read engine + * @return The datafusion engine + */ + public DatafusionEngine readEngine() { + return readEngine; + } + + /** + * Sets datafusion query + * @param datafusionQuery The datafusion query + */ + public DatafusionContext datafusionQuery(DatafusionQuery datafusionQuery) { + this.datafusionQuery = datafusionQuery; + return this; + } + + /** + * Gets the datafusion query + * @return The datafusion query + */ + public DatafusionQuery getDatafusionQuery() { + return datafusionQuery; + } + + /** + * Sets resultant stream pointer from datafusion + * @param streamNativePtr Result stream pointer + */ + public DatafusionContext streamPointer(long streamNativePtr) { + this.streamNativePtr = streamNativePtr; + return this; + } + + /** + * Returns the resultant stream pointer + * @return result stream pointer + */ + public long getStreamNativePtr() { + return 
streamNativePtr; + } + + /** + * Gets the engine searcher + * @return The datafusion searcher + */ + public DatafusionSearcher getEngineSearcher() { + return engineSearcher; + } + + /** + * {@inheritDoc} + * @param task The search shard task + */ + @Override + public void setTask(SearchShardTask task) { + + } + + @Override + public SearchShardTask getTask() { + return null; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + protected void doClose() { + + } + + /** + * {@inheritDoc} + * @param rewrite Whether to rewrite + */ + @Override + public void preProcess(boolean rewrite) { + + } + + /** + * {@inheritDoc} + * @param query The query + */ + @Override + public Query buildFilteredQuery(Query query) { + return null; + } + + @Override + public ShardSearchContextId id() { + return null; + } + + @Override + public String source() { + return ""; + } + + @Override + public ShardSearchRequest request() { + return request; + } + + @Override + public SearchType searchType() { + return null; + } + + @Override + public SearchShardTarget shardTarget() { + return null; + } + + @Override + public int numberOfShards() { + return 0; + } + + @Override + public float queryBoost() { + return 0; + } + + @Override + public ScrollContext scrollContext() { + return null; + } + + @Override + public SearchContextAggregations aggregations() { + return aggregations; + } + + /** + * {@inheritDoc} + * @param aggregations The search context aggregations + */ + @Override + public SearchContext aggregations(SearchContextAggregations aggregations) { + this.aggregations = aggregations; + return this; + } + + /** + * {@inheritDoc} + * @param searchExtBuilder The search extension builder + */ + @Override + public void addSearchExt(SearchExtBuilder searchExtBuilder) { + + } + + /** + * {@inheritDoc} + * @param name The name + */ + @Override + public SearchExtBuilder getSearchExt(String name) { + return null; + } + + @Override + public SearchHighlightContext highlight() { + return null; + } + + /** + * {@inheritDoc} + * @param highlight The search highlight context + */ + @Override + public void highlight(SearchHighlightContext highlight) { + + } + + @Override + public SuggestionSearchContext suggest() { + return null; + } + + /** + * {@inheritDoc} + * @param suggest The suggestion search context + */ + @Override + public void suggest(SuggestionSearchContext suggest) { + + } + + @Override + public List rescore() { + return List.of(); + } + + /** + * {@inheritDoc} + * @param rescore The rescore context + */ + @Override + public void addRescore(RescoreContext rescore) { + + } + + @Override + public boolean hasScriptFields() { + return false; + } + + @Override + public ScriptFieldsContext scriptFields() { + return null; + } + + @Override + public boolean sourceRequested() { + return false; + } + + @Override + public boolean hasFetchSourceContext() { + return false; + } + + @Override + public FetchSourceContext fetchSourceContext() { + return null; + } + + /** + * {@inheritDoc} + * @param fetchSourceContext The fetch source context + */ + @Override + public SearchContext fetchSourceContext(FetchSourceContext fetchSourceContext) { + return null; + } + + @Override + public FetchDocValuesContext docValuesContext() { + return null; + } + + /** + * {@inheritDoc} + * @param docValuesContext The fetch doc values context + */ + @Override + public SearchContext docValuesContext(FetchDocValuesContext docValuesContext) { + return null; + } + + @Override + public FetchFieldsContext fetchFieldsContext() { + 
return null; + } + + /** + * {@inheritDoc} + * @param fetchFieldsContext The fetch fields context + */ + @Override + public SearchContext fetchFieldsContext(FetchFieldsContext fetchFieldsContext) { + return null; + } + + @Override + public ContextIndexSearcher searcher() { + return null; + } + + @Override + public IndexShard indexShard() { + return this.indexShard; + } + + @Override + public MapperService mapperService() { + return null; + } + + @Override + public SimilarityService similarityService() { + return null; + } + + @Override + public BigArrays bigArrays() { + return bigArrays; + } + + @Override + public BitsetFilterCache bitsetFilterCache() { + return null; + } + + @Override + public TimeValue timeout() { + return null; + } + + /** + * {@inheritDoc} + * @param timeout The timeout value + */ + @Override + public void timeout(TimeValue timeout) { + + } + + @Override + public int terminateAfter() { + return 0; + } + + /** + * {@inheritDoc} + * @param terminateAfter The terminate after value + */ + @Override + public void terminateAfter(int terminateAfter) { + + } + + @Override + public boolean lowLevelCancellation() { + return false; + } + + /** + * {@inheritDoc} + * @param minimumScore The minimum score + */ + @Override + public SearchContext minimumScore(float minimumScore) { + return null; + } + + @Override + public Float minimumScore() { + return 0f; + } + + /** + * {@inheritDoc} + * @param sort The sort and formats + */ + @Override + public SearchContext sort(SortAndFormats sort) { + return null; + } + + @Override + public SortAndFormats sort() { + return null; + } + + /** + * {@inheritDoc} + * @param trackScores Whether to track scores + */ + @Override + public SearchContext trackScores(boolean trackScores) { + return null; + } + + @Override + public boolean trackScores() { + return false; + } + + /** + * {@inheritDoc} + * @param trackTotalHits The track total hits value + */ + @Override + public SearchContext trackTotalHitsUpTo(int trackTotalHits) { + return null; + } + + @Override + public int trackTotalHitsUpTo() { + return 0; + } + + @Override + /** + * {@inheritDoc} + * @param searchAfter The field doc for search after + */ + public SearchContext searchAfter(FieldDoc searchAfter) { + return null; + } + + @Override + public FieldDoc searchAfter() { + return null; + } + + @Override + /** + * {@inheritDoc} + * @param collapse The collapse context + */ + public SearchContext collapse(CollapseContext collapse) { + return null; + } + + @Override + public CollapseContext collapse() { + return null; + } + + @Override + /** + * {@inheritDoc} + * @param postFilter The parsed post filter query + */ + public SearchContext parsedPostFilter(ParsedQuery postFilter) { + return null; + } + + @Override + public ParsedQuery parsedPostFilter() { + return null; + } + + @Override + public Query aliasFilter() { + return null; + } + + @Override + /** + * {@inheritDoc} + * @param query The parsed query + */ + public SearchContext parsedQuery(ParsedQuery query) { + return null; + } + + @Override + public ParsedQuery parsedQuery() { + return null; + } + + // TODO : fix this + public Query query() { + // Extract query from request + return null; + } + + @Override + public int from() { + return 0; + } + + /** + * {@inheritDoc} + * @param from The from value + */ + @Override + public SearchContext from(int from) { + return null; + } + + @Override + public int size() { + return 0; + } + + /** + * {@inheritDoc} + * @param size The size value + */ + @Override + public SearchContext size(int size) { + 
return null; + } + + @Override + public boolean hasStoredFields() { + return false; + } + + @Override + public boolean hasStoredFieldsContext() { + return false; + } + + @Override + public boolean storedFieldsRequested() { + return false; + } + + @Override + public StoredFieldsContext storedFieldsContext() { + return null; + } + + /** + * {@inheritDoc} + * @param storedFieldsContext The stored fields context + */ + @Override + public SearchContext storedFieldsContext(StoredFieldsContext storedFieldsContext) { + return null; + } + + @Override + public boolean explain() { + return false; + } + + /** + * {@inheritDoc} + * @param explain Whether to explain + */ + @Override + public void explain(boolean explain) { + + } + + @Override + public List groupStats() { + return List.of(); + } + + /** + * {@inheritDoc} + * @param groupStats The group stats + */ + @Override + public void groupStats(List groupStats) { + + } + + @Override + public boolean version() { + return false; + } + + /** + * {@inheritDoc} + * @param version Whether to include version + */ + @Override + public void version(boolean version) { + + } + + @Override + public boolean seqNoAndPrimaryTerm() { + return false; + } + + /** + * {@inheritDoc} + * @param seqNoAndPrimaryTerm Whether to include sequence number and primary term + */ + @Override + public void seqNoAndPrimaryTerm(boolean seqNoAndPrimaryTerm) { + + } + + @Override + public int[] docIdsToLoad() { + return new int[0]; + } + + @Override + public int docIdsToLoadFrom() { + return 0; + } + + @Override + public int docIdsToLoadSize() { + return 0; + } + + /** + * {@inheritDoc} + * @param docIdsToLoad The document IDs to load + * @param docsIdsToLoadFrom The starting index for document IDs to load + * @param docsIdsToLoadSize The size of document IDs to load + */ + @Override + public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int docsIdsToLoadSize) { + return null; + } + + @Override + public DfsSearchResult dfsResult() { + return null; + } + + @Override + public QuerySearchResult queryResult() { + return this.queryResult; + } + + @Override + public FetchPhase fetchPhase() { + return null; + } + + @Override + public FetchSearchResult fetchResult() { + return this.fetchResult; + } + + @Override + public Profilers getProfilers() { + return null; + } + + /** + * {@inheritDoc} + * @param name The field name + */ + @Override + public MappedFieldType fieldType(String name) { + return null; + } + + /** + * {@inheritDoc} + * @param name The object mapper name + */ + @Override + public ObjectMapper getObjectMapper(String name) { + return null; + } + + @Override + public long getRelativeTimeInMillis() { + return 0; + } + + @Override + public Map, CollectorManager> queryCollectorManagers() { + return queryCollectorManagers; + } + + @Override + public QueryShardContext getQueryShardContext() { + return queryShardContext; + } + + @Override + public ReaderContext readerContext() { + return null; + } + + @Override + public InternalAggregation.ReduceContext partialOnShard() { + return null; + } + + /** + * {@inheritDoc} + * @param bucketCollectorProcessor The bucket collector processor + */ + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return null; + } + + @Override + public int getTargetMaxSliceCount() { + return 0; + } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return false; + } + + 
public void setDFResults(Map dfResults) { + this.dfResults = dfResults; + } + + public Map getDFResults() { + return dfResults; + } + +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java new file mode 100644 index 0000000000000..93333c6ea7f4c --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import java.util.List; + +/** + * Datafusion query class which accepts susbtrait bytes + */ +public class DatafusionQuery { + private final byte[] substraitBytes; + + // List of Search executors which returns a result iterator which contains row id which can be joined in datafusion + private final List searchExecutors; + + public DatafusionQuery(byte[] substraitBytes, List searchExecutors) { + this.substraitBytes = substraitBytes; + this.searchExecutors = searchExecutors; + } + + public byte[] getSubstraitBytes() { + return substraitBytes; + } + + public List getSearchExecutors() { + return searchExecutors; + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java new file mode 100644 index 0000000000000..837347404cc8a --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.opensearch.index.engine.exec.engine.FileMetadata; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * DataFusion reader for JNI operations backed by listing table + */ +public class DatafusionReader implements Closeable { + /** + * The directory path. + */ + public String directoryPath; + /** + * The file metadata collection. + */ + public Collection files; + /** + * The native reader pointer. + */ + public long nativeReaderPtr; + private AtomicInteger refCount = new AtomicInteger(0); + + /** + * Constructor + * @param directoryPath The directory path + * @param files The file metadata collection + */ + public DatafusionReader(String directoryPath, Collection files) { + this.directoryPath = directoryPath; + this.files = files; + // TODO : initialize the native reader pointer + this.nativeReaderPtr = 0; + incRef(); + } + + /** + * Gets the reader pointer. + * @return the reader pointer + */ + public long getReaderPtr() { + return nativeReaderPtr; + } + + /** + * Increments the reference count. + */ + public void incRef() { + refCount.getAndIncrement(); + } + + /** + * Decrements the reference count. 
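The reference-counting contract sketched by incRef/decRef is intended to be used as follows (illustrative only; production callers go through DatafusionReaderManager, which appears later in this patch):

// Sketch: every consumer pairs incRef() with exactly one decRef();
// the final decRef() closes the native listing table.
static void useReader(DatafusionReader reader) throws IOException {
    reader.incRef();
    try {
        long ptr = reader.getReaderPtr();   // native pointer handed to query execution
        // ... run the substrait plan against ptr ...
    } finally {
        reader.decRef();                    // closes the reader once the count reaches zero
    }
}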
+ * @throws IOException if an I/O error occurs + */ + public void decRef() throws IOException { + if (refCount.get() == 0) { + throw new IllegalStateException("Listing table has been already closed"); + } + + int currRefCount = refCount.decrementAndGet(); + if (currRefCount == 0) { + this.close(); + } + + } + + @Override + public void close() throws IOException { + if (nativeReaderPtr == -1L) { + throw new IllegalStateException("Listing table has been already closed"); + } + // TODO : close the native reader + this.nativeReaderPtr = -1; + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java new file mode 100644 index 0000000000000..d3091432eac9b --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.opensearch.index.engine.exec.engine.FileMetadata; +import org.opensearch.index.engine.exec.manage.CatalogSnapshot; +import org.opensearch.index.engine.exec.read.CatalogSnapshotAwareRefreshListener; +import org.opensearch.index.engine.exec.read.EngineReaderManager; + +import java.io.IOException; +import java.util.Collection; + +/** + * Datafusion reader manager to manage datafusion readers + */ +public class DatafusionReaderManager + implements + EngineReaderManager, + CatalogSnapshotAwareRefreshListener { + private org.opensearch.datafusion.search.DatafusionReader current; + private String path; + private String dataFormat; + + public DatafusionReaderManager(String path, Collection files, String dataFormat) throws IOException { + this.current = null; + this.path = path; + this.dataFormat = dataFormat; + } + + @Override + public org.opensearch.datafusion.search.DatafusionReader acquire() throws IOException { + if (current == null) { + throw new RuntimeException("Invalid state for datafusion reader"); + } + current.incRef(); + return current; + } + + @Override + public void release(org.opensearch.datafusion.search.DatafusionReader reference) throws IOException { + assert reference != null : "Shard view can't be null"; + reference.decRef(); + } + + @Override + public void beforeRefresh() throws IOException { + // no op + } + + @Override + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { + if (didRefresh && catalogSnapshot != null) { + org.opensearch.datafusion.search.DatafusionReader old = this.current; + if (old != null) { + release(old); + } + this.current = new org.opensearch.datafusion.search.DatafusionReader(this.path, catalogSnapshot.getSearchableFiles(dataFormat)); + this.current.incRef(); + } + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java new file mode 100644 index 0000000000000..e91006b84b22a --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed 
under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.index.engine.exec.read.EngineSearcher; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * Datafusion searcher to query via substrait plan + */ +public class DatafusionSearcher implements EngineSearcher { + private final String source; + private org.opensearch.datafusion.search.DatafusionReader reader; + private Closeable closeable; + + public DatafusionSearcher(String source, org.opensearch.datafusion.search.DatafusionReader reader, Closeable close) { + this.source = source; + this.reader = reader; + this.closeable = close; + } + + @Override + public String source() { + return source; + } + + @Override + public void search(DatafusionQuery datafusionQuery) { + // TODO : implementation for search + // return DataFusionQueryJNI.executeSubstraitQuery(reader.getReaderPtr(), datafusionQuery.getSubstraitBytes(), contextPtr); + } + + public org.opensearch.datafusion.search.DatafusionReader getReader() { + return reader; + } + + @Override + public void close() { + try { + if (closeable != null) { + closeable.close(); + } + } catch (IOException e) { + throw new UncheckedIOException("failed to close", e); + } catch (AlreadyClosedException e) { + // This means there's a bug somewhere: don't suppress it + throw new AssertionError(e); + } + + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcherSupplier.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcherSupplier.java new file mode 100644 index 0000000000000..d0dffd25d6480 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcherSupplier.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 

+ */ + +package org.opensearch.datafusion.search; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.index.engine.exec.read.EngineSearcherSupplier; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Searcher supplier which provides point-in-time DataFusion searchers for search operations + */ +public abstract class DatafusionSearcherSupplier extends EngineSearcherSupplier<DatafusionSearcher> { + + private final Function< + org.opensearch.datafusion.search.DatafusionSearcher, + org.opensearch.datafusion.search.DatafusionSearcher> wrapper; + private final AtomicBoolean released = new AtomicBoolean(false); + + public DatafusionSearcherSupplier( + Function<org.opensearch.datafusion.search.DatafusionSearcher, org.opensearch.datafusion.search.DatafusionSearcher> wrapper + ) { + this.wrapper = wrapper; + } + + public final org.opensearch.datafusion.search.DatafusionSearcher acquireSearcher(String source) { + if (released.get()) { + throw new AlreadyClosedException("SearcherSupplier was closed"); + } + final org.opensearch.datafusion.search.DatafusionSearcher searcher = acquireSearcherInternal(source); + // TODO apply wrapper + return searcher; + } + + @Override + public final void close() { + if (released.compareAndSet(false, true)) { + doClose(); + } else { + assert false : "SearcherSupplier was released twice"; + } + } + + protected abstract void doClose(); + + protected abstract org.opensearch.datafusion.search.DatafusionSearcher acquireSearcherInternal(String source); + +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DefaultRecordBatchStream.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DefaultRecordBatchStream.java new file mode 100644 index 0000000000000..0b6cc99e020df --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DefaultRecordBatchStream.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.execution.search.RecordBatchStream; + +import java.util.concurrent.CompletableFuture; + +/** + * Record batch stream implementation + */ +public class DefaultRecordBatchStream implements RecordBatchStream { + private static final Logger logger = LogManager.getLogger(DefaultRecordBatchStream.class); + + private final long nativeStreamPtr; + private volatile boolean closed = false; + private volatile boolean hasNextCached = false; + private volatile boolean hasNextValue = false; + + /** + * Creates a new stream wrapping the given native stream pointer.
+ * + * @param nativeStreamPtr Pointer to the native DataFusion RecordBatch stream + */ + public DefaultRecordBatchStream(long nativeStreamPtr) { + if (nativeStreamPtr == 0) { + throw new IllegalArgumentException("Invalid native stream pointer"); + } + this.nativeStreamPtr = nativeStreamPtr; + logger.debug("Created default record batch stream with pointer: {}", nativeStreamPtr); + } + + @Override + public Object getSchema() { + return "schema"; // Placeholder + } + + @Override + public CompletableFuture next() { + // PlaceholderImpl + return CompletableFuture.supplyAsync(() -> { + if (closed) { + return null; + } + + try { + // Get the next batch from native code + String batch = nativeNextBatch(nativeStreamPtr); + + // Reset cached hasNext value since we consumed a batch + hasNextCached = false; + + logger.trace("Retrieved next batch from stream pointer: {}", nativeStreamPtr); + return batch; + } catch (Exception e) { + logger.error("Error getting next batch from stream", e); + return null; + } + }); + } + + @Override + public boolean hasNext() { + // Placeholder impl + if (closed) { + return false; + } + + if (hasNextCached) { + return hasNextValue; + } + + try { + // Check if there's a next batch available + // TODO : peek at the stream without consuming the batch + String nextBatch = nativeNextBatch(nativeStreamPtr); + hasNextValue = (nextBatch != null); + hasNextCached = true; + + logger.trace("hasNext() = {} for stream pointer: {}", hasNextValue, nativeStreamPtr); + return hasNextValue; + } catch (Exception e) { + logger.error("Error checking for next batch in stream", e); + return false; + } + } + + @Override + public void close() { + if (!closed) { + logger.debug("Closing RecordBatchStream with pointer: {}", nativeStreamPtr); + try { + nativeCloseStream(nativeStreamPtr); + closed = true; + logger.debug("Successfully closed RecordBatchStream"); + } catch (Exception e) { + logger.error("Error closing RecordBatchStream", e); + throw e; + } + } + } + + // Native method declarations + + // TODO : implementation + private static native String nativeNextBatch(long streamPtr); + + // TODO : implementation + private static native void nativeCloseStream(long streamPtr); +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchExecutor.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchExecutor.java new file mode 100644 index 0000000000000..5d38d95a23eae --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchExecutor.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.datafusion.search; + +/** + * Functional interface to execute search and get iterator + */ +@FunctionalInterface +public interface SearchExecutor { + SearchResultIterator execute(); +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchResultIterator.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchResultIterator.java new file mode 100644 index 0000000000000..1595f4926bcca --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/SearchResultIterator.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import java.util.Iterator; + +/** + * Interface for the search results iterator + */ +public interface SearchResultIterator extends Iterator { + // Basic Iterator methods + boolean hasNext(); + + Record next(); +} diff --git a/plugins/parquet-data-format/build.gradle b/plugins/parquet-data-format/build.gradle new file mode 100644 index 0000000000000..fc79a59ab85de --- /dev/null +++ b/plugins/parquet-data-format/build.gradle @@ -0,0 +1,163 @@ +import org.opensearch.gradle.test.RestIntegTestTask + +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'eclipse' +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.pluginzip' +apply plugin: 'opensearch.java-agent' + +def pluginName = 'ParquetDataFormat' +def pluginDescription = 'Parquet data format plugin' +def packagePath = 'com.format' +def pathToPlugin = 'parquet' +def pluginClassName = 'ParquetDataFormatPlugin' +group = "ParquetDataFormatGroup" + +java { + targetCompatibility = JavaVersion.VERSION_21 + sourceCompatibility = JavaVersion.VERSION_21 +} + +tasks.register("preparePluginPathDirs") { + mustRunAfter clean + doLast { + def newPath = pathToPlugin.replace(".", "/") + mkdir "src/main/java/$packagePath/$newPath" + mkdir "src/test/java/$packagePath/$newPath" + mkdir "src/yamlRestTest/java/$packagePath/$newPath" + } +} + +publishing { + publications { + pluginZip(MavenPublication) { publication -> + } + } +} + +opensearchplugin { + name = pluginName + description = pluginDescription + classname = "${packagePath}.${pathToPlugin}.${pluginClassName}" + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') +} + +// This requires an additional Jar not published as part of build-tools +loggerUsageCheck.enabled = false + +// No need to validate pom, as we do not upload to maven/sonatype +validateNebulaPom.enabled = false + +buildscript { + ext { + opensearch_version = System.getProperty("opensearch.version", "3.3.0-SNAPSHOT") + } + + repositories { + mavenLocal() + maven { url = "https://central.sonatype.com/repository/maven-snapshots/" } + mavenCentral() + maven { url = "https://plugins.gradle.org/m2/" } + } + + dependencies { + classpath "org.opensearch.gradle:build-tools:${opensearch_version}" + } +} + +repositories { + mavenLocal() + maven { url = "https://central.sonatype.com/repository/maven-snapshots/" } + mavenCentral() + maven { url = "https://plugins.gradle.org/m2/" } +} + +configurations.all { + resolutionStrategy { + force 'commons-codec:commons-codec:1.18.0' + force 'org.slf4j:slf4j-api:2.0.17' + } +} + +dependencies { + // Apache Arrow dependencies (using stable 
version with unsafe allocator) + implementation 'org.apache.arrow:arrow-vector:17.0.0' + implementation 'org.apache.arrow:arrow-memory-core:17.0.0' + implementation 'org.apache.arrow:arrow-memory-unsafe:17.0.0' + implementation 'org.apache.arrow:arrow-format:17.0.0' + implementation 'org.apache.arrow:arrow-c-data:17.0.0' + + // Checker Framework annotations (required by Arrow) + implementation 'org.checkerframework:checker-qual:3.42.0' + + // Jackson dependencies required by Arrow + implementation 'com.fasterxml.jackson.core:jackson-core:2.18.2' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.2' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.18.2' + + // FlatBuffers dependency required by Arrow + implementation 'com.google.flatbuffers:flatbuffers-java:2.0.0' + + // Netty dependencies required by Arrow memory management + implementation 'io.netty:netty-buffer:4.1.118.Final' + implementation 'io.netty:netty-common:4.1.118.Final' + + // SLF4J logging implementation (required by Apache Arrow) + implementation 'org.slf4j:slf4j-api:2.0.17' +} + +test { + include '**/*Tests.class' + // JVM args for Java 9+ only - remove if using Java 8 + if (JavaVersion.current().isJava9Compatible()) { + jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' + jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' + } +} + +task integTest(type: RestIntegTestTask) { + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath +} +tasks.named("check").configure { dependsOn(integTest) } + +integTest { + // JVM arguments required for Arrow memory access (Java 9+ only) + if (JavaVersion.current().isJava9Compatible()) { + jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' + jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' + } + + // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + } +} + +testClusters.integTest { + testDistribution = "INTEG_TEST" + + // This installs our plugin into the testClusters + plugin(project.tasks.bundlePlugin.archiveFile) +} + +run { + useCluster testClusters.integTest +} + +task updateVersion { + onlyIf { System.getProperty('newVersion') } + doLast { + ext.newVersion = System.getProperty('newVersion') + println "Setting version to ${newVersion}." + // String tokenization to support -SNAPSHOT + ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true) + } +} + +// Disable specific license tasks +licenseHeaders.enabled = false diff --git a/plugins/parquet-data-format/src/main/java/com/format/parquet/ParquetDataFormatPlugin.java b/plugins/parquet-data-format/src/main/java/com/format/parquet/ParquetDataFormatPlugin.java new file mode 100644 index 0000000000000..ba00174f77324 --- /dev/null +++ b/plugins/parquet-data-format/src/main/java/com/format/parquet/ParquetDataFormatPlugin.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package com.format.parquet; + +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.format.DataFormat; +import org.opensearch.plugins.DataFormatPlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Map; +import java.util.Optional; + +import com.format.parquet.engine.ParquetDataFormat; + +/** + * OpenSearch plugin that provides Parquet data format support for indexing and query operations. + */ +public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin { + @Override + public Optional> getDataFormatCodecs() { + return DataFormatPlugin.super.getDataFormatCodecs(); + } + + @Override + public DataFormat getDataFormat() { + return new ParquetDataFormat(); + } +} diff --git a/plugins/parquet-data-format/src/main/java/com/format/parquet/engine/ParquetDataFormat.java b/plugins/parquet-data-format/src/main/java/com/format/parquet/engine/ParquetDataFormat.java new file mode 100644 index 0000000000000..331154cd2f1a3 --- /dev/null +++ b/plugins/parquet-data-format/src/main/java/com/format/parquet/engine/ParquetDataFormat.java @@ -0,0 +1,15 @@ +package com.format.parquet.engine; + +/** + * Data format implementation for Parquet-based document storage. + */ +public class ParquetDataFormat implements org.opensearch.index.engine.exec.format.DataFormat { + + @Override + public String name() { + return "parquet"; + } + + public static ParquetDataFormat PARQUET_DATA_FORMAT = new ParquetDataFormat(); + +} diff --git a/server/build.gradle b/server/build.gradle index c2b3b1b2788a1..328452ccfd1a3 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -73,6 +73,7 @@ dependencies { api project(":libs:opensearch-geo") api project(":libs:opensearch-telemetry") api project(":libs:opensearch-task-commons") + api project(':libs:opensearch-engine-dataformat-commons') compileOnly project(":libs:agent-sm:bootstrap") compileOnly project(':libs:opensearch-plugin-classloader') diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 8020d4469a274..4e551dfca828f 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -79,6 +79,9 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.VersionType; +import org.opensearch.index.engine.exec.bridge.SearcherOperations; +import org.opensearch.index.engine.exec.read.EngineSearcher; +import org.opensearch.index.engine.exec.read.EngineSearcherSupplier; import org.opensearch.index.mapper.DocumentMapperForType; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.Mapping; @@ -134,7 +137,11 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public abstract class Engine implements LifecycleAware, Closeable { +public abstract class Engine + implements + LifecycleAware, + Closeable, + SearcherOperations> { public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0 public static final String HISTORY_UUID_KEY = "history_uuid"; @@ -836,9 +843,9 @@ public Searcher acquireSearcher(String source, SearcherScope scope, Function getReferenceManager(SearcherScope scope); + public abstract ReferenceManager getReferenceManager(SearcherScope scope); - boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { + public boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { 
return true; } @@ -1412,7 +1419,7 @@ default void onFailedEngine(String reason, @Nullable Exception e) {} * @opensearch.api */ @PublicApi(since = "1.0.0") - public abstract static class SearcherSupplier implements Releasable { + public abstract static class SearcherSupplier extends EngineSearcherSupplier { private final Function wrapper; private final AtomicBoolean released = new AtomicBoolean(false); @@ -1448,7 +1455,7 @@ public final void close() { * @opensearch.api */ @PublicApi(since = "1.0.0") - public static final class Searcher extends IndexSearcher implements Releasable { + public static final class Searcher extends IndexSearcher implements Releasable, EngineSearcher { private final String source; private final Closeable onClose; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e154c69fabf81..7d4389bd1e63b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -493,7 +493,7 @@ protected void decRef(OpenSearchDirectoryReader reference) throws IOException { } @Override - final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { + public final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { if (scope == SearcherScope.EXTERNAL) { switch (source) { // we can access segment_stats while a shard is still in the recovering state. @@ -2305,7 +2305,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { } @Override - protected final ReferenceManager getReferenceManager(SearcherScope scope) { + public final ReferenceManager getReferenceManager(SearcherScope scope) { switch (scope) { case INTERNAL: return internalReaderManager; diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 64cc076d97af5..0189b3ea428a9 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -277,7 +277,7 @@ public GetResult get(Get get, BiFunction search } @Override - protected ReferenceManager getReferenceManager(SearcherScope scope) { + public ReferenceManager getReferenceManager(SearcherScope scope) { return readerManager; } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index eba074e27f764..ad3cea6291eeb 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -277,7 +277,7 @@ public GetResult get(Get get, BiFunction } @Override - protected ReferenceManager getReferenceManager(SearcherScope scope) { + public ReferenceManager getReferenceManager(SearcherScope scope) { return readerManager; } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/bridge/SearcherOperations.java b/server/src/main/java/org/opensearch/index/engine/exec/bridge/SearcherOperations.java new file mode 100644 index 0000000000000..b671a3adf7734 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/bridge/SearcherOperations.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 
license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.bridge; + +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.engine.exec.read.CatalogSnapshotAwareRefreshListener; +import org.opensearch.index.engine.exec.read.EngineSearcher; +import org.opensearch.index.engine.exec.read.EngineSearcherSupplier; + +import java.util.function.Function; + +/** + * Basic interface for a reader and searcher to acquire a point-in-time view, so that searches operate over the same data throughout + * the query lifecycle + * @param <S> the searcher type + * @param <R> the reference manager (reader) type + */ +public interface SearcherOperations<S extends EngineSearcher<?>, R> { + /** + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. + */ + EngineSearcherSupplier<S> acquireSearcherSupplier(Function<S, S> wrapper) throws EngineException; + + /** + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. + */ + EngineSearcherSupplier<S> acquireSearcherSupplier(Function<S, S> wrapper, Engine.SearcherScope scope) throws EngineException; + + S acquireSearcher(String source) throws EngineException; + + S acquireSearcher(String source, Engine.SearcherScope scope) throws EngineException; + + S acquireSearcher(String source, Engine.SearcherScope scope, Function<S, S> wrapper) throws EngineException; + + R getReferenceManager(Engine.SearcherScope scope); + + boolean assertSearcherIsWarmedUp(String source, Engine.SearcherScope scope); + + default CatalogSnapshotAwareRefreshListener getRefreshListener(Engine.SearcherScope searcherScope) { + // default is no-op, TODO : revisit this + return null; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/engine/FileMetadata.java b/server/src/main/java/org/opensearch/index/engine/exec/engine/FileMetadata.java new file mode 100644 index 0000000000000..e6af25b65259b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/engine/FileMetadata.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.engine; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.format.DataFormat; + +@ExperimentalApi +public record FileMetadata(DataFormat df, String fileName, String directory, long generation) { +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/engine/RefreshResult.java b/server/src/main/java/org/opensearch/index/engine/exec/engine/RefreshResult.java new file mode 100644 index 0000000000000..3f13119d65858 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/engine/RefreshResult.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.engine.exec.engine; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.exec.format.DataFormat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@ExperimentalApi +public class RefreshResult { + private Map<DataFormat, List<FileMetadata>> refreshedFiles = new HashMap<>(); + + public RefreshResult() { + + } + + public void add(DataFormat df, List<FileMetadata> fileMetadata) { + refreshedFiles.computeIfAbsent(df, ddf -> new ArrayList<>()).addAll(fileMetadata); + } + + public Map<DataFormat, List<FileMetadata>> getRefreshedFiles() { + return refreshedFiles; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/format/DataFormat.java b/server/src/main/java/org/opensearch/index/engine/exec/format/DataFormat.java new file mode 100644 index 0000000000000..7285fdcf662be --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/format/DataFormat.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.format; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.mapper.ParametrizedFieldMapper; + +import java.util.List; +import java.util.Map; + +/** + * Represents a data format. + */ +@ExperimentalApi +public interface DataFormat { + + /** + * @return name identifier for the data format. + */ + String name(); + + /** + * Index level settings supported by this data format. + */ + default Settings dataFormatSettings() { + return Settings.EMPTY; + } + + /** + * Node level data format specific settings exposed by this data format. + */ + default Settings nodeLevelDataFormatSettings() { + return Settings.EMPTY; + } + + /** + * Mapping parameters which can be supported through the data format. + * @return map containing data type name, and the supported params. + */ + default Map<String, List<ParametrizedFieldMapper.Parameter<?>>> parameters() { + return Map.of(); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/manage/CatalogSnapshot.java b/server/src/main/java/org/opensearch/index/engine/exec/manage/CatalogSnapshot.java new file mode 100644 index 0000000000000..44eabe07b3907 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/manage/CatalogSnapshot.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.manage; + +import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.engine.exec.engine.FileMetadata; +import org.opensearch.index.engine.exec.engine.RefreshResult; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * CatalogSnapshot represents a searchable view of the data files that have been created for the data present in the index. + * This is recreated upon every refresh/commit performed on the index.
+ * It is used for sharing the file metadata for searchable data across multiple search engines, along with acquire/release + * semantics so that operations like indexing, merges, and searches can work on a view of data that is guaranteed to exist while it is referenced, + * making it easy to manage the lifecycle of the various files which are part of the index. + * + * The id associated with a CatalogSnapshot should always be higher than that of the previous instance, as more data is indexed. + * + */ +public class CatalogSnapshot extends AbstractRefCounted { + + private Map<String, List<FileMetadata>> dfGroupedSearchableFiles = new HashMap<>(); + private final long id; + + public CatalogSnapshot(RefreshResult refreshResult, long id) { + super("catalog_snapshot"); + refreshResult.getRefreshedFiles() + .forEach((df, files) -> { dfGroupedSearchableFiles.put(df.name(), Collections.unmodifiableList(files)); }); + this.id = id; + } + + public Iterable<String> dataFormats() { + return dfGroupedSearchableFiles.keySet(); + } + + public Collection<FileMetadata> getSearchableFiles(String df) { + return dfGroupedSearchableFiles.get(df); + } + + /** + * Creates a generation-by-generation view of the files across data formats. + */ + public List<Segment> getSegments() { + Map<Long, Segment.Builder> segments = new HashMap<>(); + for (Map.Entry<String, List<FileMetadata>> entry : dfGroupedSearchableFiles.entrySet()) { + for (FileMetadata fileMetadata : entry.getValue()) { + segments.compute(fileMetadata.generation(), (k, v) -> { + return Objects.requireNonNullElseGet(v, () -> new Segment.Builder(k)).addFileMetadata(fileMetadata); + }); + } + } + return segments.values().stream().map(Segment.Builder::build).sorted().toList(); + } + + @Override + protected void closeInternal() { + // notify file deleter, search, etc. + } + + public long getId() { + return id; + } + + @Override + public String toString() { + return "CatalogSnapshot{" + "dfGroupedSearchableFiles=" + dfGroupedSearchableFiles + ", id=" + id + '}'; + } + + public static class Segment implements Comparable<Segment> { + private final long generation; + private final Map<String, List<FileMetadata>> dfGroupedSearchableFiles; + + public Segment(Map<String, List<FileMetadata>> dfGroupedSearchableFiles, long generation) { + this.dfGroupedSearchableFiles = dfGroupedSearchableFiles; + this.generation = generation; + } + + public Collection<FileMetadata> getSearchableFiles(String df) { + return dfGroupedSearchableFiles.get(df); + } + + public long getGeneration() { + return generation; + } + + @Override + public int compareTo(Segment o) { + if (this.generation < o.generation) { + return -1; + } else if (this.generation > o.generation) { + return 1; + } + return 0; + } + + private static class Builder { + private Map<String, List<FileMetadata>> dfGroupedSearchableFiles = new HashMap<>(); + private final long generation; + + Builder(long generation) { + this.generation = generation; + } + + Builder addFileMetadata(FileMetadata fileMetadata) { + dfGroupedSearchableFiles.compute(fileMetadata.df().name(), (df, fm) -> { + if (fm == null) { + fm = new ArrayList<>(); + } + fm.add(fileMetadata); + return fm; + }); + return this; + } + + Segment build() { + Map<String, List<FileMetadata>> dfGroupedSearchableFiles = new HashMap<>(this.dfGroupedSearchableFiles); + return new Segment(dfGroupedSearchableFiles, generation); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/read/CatalogSnapshotAwareRefreshListener.java b/server/src/main/java/org/opensearch/index/engine/exec/read/CatalogSnapshotAwareRefreshListener.java new file mode 100644 index 0000000000000..35f54bf1aa3b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/read/CatalogSnapshotAwareRefreshListener.java @@ -0,0
+1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.read; + +import org.opensearch.index.engine.exec.manage.CatalogSnapshot; + +import java.io.IOException; + +/** + * Base listener interface for the refresh lifecycle of a catalog snapshot + */ +public interface CatalogSnapshotAwareRefreshListener { + /** + * Called before refresh operation. + */ + void beforeRefresh() throws IOException; + + /** + * Called after refresh operation with catalog snapshot. + * @param didRefresh whether refresh actually occurred + * @param catalogSnapshot the current catalog snapshot with file information + */ + void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/read/EngineReaderManager.java b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineReaderManager.java new file mode 100644 index 0000000000000..784ebf912e6e6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineReaderManager.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.read; + +import org.apache.lucene.search.ReferenceManager; + +import java.io.IOException; + +/** + * Reader manager for engine readers + * @param <T> the reader type managed by this manager + */ +public interface EngineReaderManager<T> { + T acquire() throws IOException; + + void release(T reader) throws IOException; + + default void addListener(ReferenceManager.RefreshListener listener) { + // no-op + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcher.java b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcher.java new file mode 100644 index 0000000000000..9eae0c267d610 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcher.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.read; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lease.Releasable; + +import java.io.IOException; + +/** + * Base interface for a read engine searcher + * @param <Q> the query type + */ +@ExperimentalApi +public interface EngineSearcher<Q> extends Releasable { + + /** + * The source that caused this searcher to be acquired.
+ */ + String source(); + + /** + * Perform search operation in the engine for the provided input query + */ + default void search(Q query) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcherSupplier.java b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcherSupplier.java new file mode 100644 index 0000000000000..d3907a306c902 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/read/EngineSearcherSupplier.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.read; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lease.Releasable; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Base class for searcher supplier + * @param <T> the sub-class of {@link EngineSearcher} that is supplied + */ +@ExperimentalApi +public abstract class EngineSearcherSupplier<T extends EngineSearcher<?>> implements Releasable { + private final AtomicBoolean released = new AtomicBoolean(false); + + /** + * Acquire a searcher for the given source. + */ + public T acquireSearcher(String source) { + if (released.get()) { + throw new AlreadyClosedException("SearcherSupplier was closed"); + } + return acquireSearcherInternal(source); + } + + protected abstract T acquireSearcherInternal(String source); + + protected abstract void doClose(); +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/read/SearchExecEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/read/SearchExecEngine.java new file mode 100644 index 0000000000000..ff955738dad42 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/read/SearchExecEngine.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.engine.exec.read; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.BigArrays; +import org.opensearch.index.engine.exec.bridge.SearcherOperations; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ReaderContext; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; + +import java.io.IOException; +import java.util.Map; + +/** + * Generic read engine interface that provides searcher operations and query phase execution + * @param Context type for query execution + * @param Searcher type that extends EngineSearcher + * @param Reference manager type + */ +@ExperimentalApi +public abstract class SearchExecEngine, R> implements SearcherOperations { + /** + * Create a search context for this engine + */ + public abstract C createContext( + ReaderContext readerContext, + ShardSearchRequest request, + SearchShardTarget searchShardTarget, + SearchShardTask task, + BigArrays bigArrays + ) throws IOException; + + /** + * execute query + * TODO : Result type + * @return query results + */ + public abstract Map execute(C context) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ae8299ee7ccb5..aa24b21aa9934 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -141,6 +141,7 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeMetadata; +import org.opensearch.execution.search.spi.DataFormatCodec; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; import org.opensearch.gateway.GatewayAllocator; @@ -165,6 +166,7 @@ import org.opensearch.index.compositeindex.CompositeIndexSettings; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.MergedSegmentWarmerFactory; +import org.opensearch.index.engine.exec.format.DataFormat; import org.opensearch.index.mapper.MappingTransformerRegistry; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.remote.RemoteIndexPathUploader; @@ -218,6 +220,7 @@ import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.CryptoKeyProviderPlugin; import org.opensearch.plugins.CryptoPlugin; +import org.opensearch.plugins.DataFormatPlugin; import org.opensearch.plugins.DiscoveryPlugin; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.ExtensionAwarePlugin; @@ -235,6 +238,7 @@ import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.RepositoryPlugin; import org.opensearch.plugins.ScriptPlugin; +import org.opensearch.plugins.SearchEnginePlugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.plugins.SearchPlugin; import org.opensearch.plugins.SecureSettingsFactory; @@ -1115,6 +1119,36 @@ protected Node(final Environment initialEnvironment, Collection clas // Add the telemetryAwarePlugin components to the existing pluginComponents collection. 
pluginComponents.addAll(telemetryAwarePluginComponents); + Map dataFormatCodecMap = new HashMap<>(); + for (DataFormatPlugin dataSourcePlugin : pluginsService.filterPlugins(DataFormatPlugin.class)) { + if (dataSourcePlugin.getDataFormatCodecs().isPresent()) { + dataFormatCodecMap.putAll(dataSourcePlugin.getDataFormatCodecs().get()); + } + } + + Collection dataFormatAwareComponents = pluginsService.filterPlugins(SearchEnginePlugin.class) + .stream() + .flatMap( + p -> p.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get, + dataFormatCodecMap + ).stream() + ) + .toList(); + + // Add all dataFormat components to the existing pluginComponents + pluginComponents.addAll(dataFormatAwareComponents); + List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); diff --git a/server/src/main/java/org/opensearch/plugins/DataFormatPlugin.java b/server/src/main/java/org/opensearch/plugins/DataFormatPlugin.java new file mode 100644 index 0000000000000..a6a0389cc428b --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/DataFormatPlugin.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.format.DataFormat; + +import java.util.Map; +import java.util.Optional; + +/** + * Base data format plugin interface to extend query and writer capabilities to any data format such as parquet + */ +public interface DataFormatPlugin { + default Optional> getDataFormatCodecs() { + return Optional.empty(); + } + + DataFormat getDataFormat(); +} diff --git a/server/src/main/java/org/opensearch/plugins/SearchEnginePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchEnginePlugin.java new file mode 100644 index 0000000000000..ac7df60d8d4aa --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/SearchEnginePlugin.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.plugins; + +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.execution.search.spi.DataFormatCodec; +import org.opensearch.index.engine.exec.engine.FileMetadata; +import org.opensearch.index.engine.exec.format.DataFormat; +import org.opensearch.index.engine.exec.read.SearchExecEngine; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.Client; +import org.opensearch.watcher.ResourceWatcherService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Plugin interface used to plug in alternative search/query engines + */ +public interface SearchEnginePlugin extends SearchPlugin { + + /** + * Creates components for this plugin; the data format codecs registered by {@link DataFormatPlugin}s are made available to it + */ + default Collection<Object> createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier<RepositoriesService> repositoriesServiceSupplier, + Map dataFormatCodecs + ) { + return Collections.emptyList(); + } + + /** + * Creates a shard-specific read engine + */ + SearchExecEngine createEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, ShardPath shardPath) + throws IOException; +}
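
The CatalogSnapshot acquire/release contract introduced above is easiest to see with a small usage sketch. The snippet below is illustrative only and not part of the patch: the class name, the "parquet" lambda, the file name, the directory, and the snapshot id are made-up placeholders used to show how RefreshResult, FileMetadata, and the ref-counted snapshot are intended to fit together.

// Hypothetical usage sketch; not included in this change.
import org.opensearch.index.engine.exec.engine.FileMetadata;
import org.opensearch.index.engine.exec.engine.RefreshResult;
import org.opensearch.index.engine.exec.format.DataFormat;
import org.opensearch.index.engine.exec.manage.CatalogSnapshot;

import java.util.Collection;
import java.util.List;

public class CatalogSnapshotUsageSketch {
    public static void main(String[] args) {
        // DataFormat#name() is its only abstract method, so a lambda suffices for illustration.
        DataFormat parquet = () -> "parquet";

        // Pretend a refresh produced one parquet file in generation 1.
        RefreshResult refreshResult = new RefreshResult();
        refreshResult.add(parquet, List.of(new FileMetadata(parquet, "gen1.parquet", "/tmp/shard0", 1L)));

        // Snapshot ids are expected to increase across refreshes.
        CatalogSnapshot snapshot = new CatalogSnapshot(refreshResult, 1L);

        // Hold a reference while reading searchable files so the underlying files cannot be removed.
        snapshot.incRef();
        try {
            Collection<FileMetadata> files = snapshot.getSearchableFiles("parquet");
            files.forEach(f -> System.out.println(f.directory() + "/" + f.fileName()));
        } finally {
            // Release the view; closeInternal() runs once all references are released.
            snapshot.decRef();
        }
    }
}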