
Commit dc79a65

Pluggable engine with interfaces for searcher and reader
Signed-off-by: bharath-techie <[email protected]>
1 parent 387e663 commit dc79a65

40 files changed: +2830 -9 lines changed
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

apply plugin: 'opensearch.build'

description = 'Common interfaces and SPIs for pluggable engines and data formats'

dependencies {
    api project(':libs:opensearch-core')
    api project(':libs:opensearch-common')

    testImplementation(project(":test:framework")) {
        exclude group: 'org.opensearch', module: 'engine-dataformat-commons'
    }
}

tasks.named('forbiddenApisMain').configure {
    replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false

test {
    systemProperty 'tests.security.manager', 'false'
}
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.execution.search;

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;
import java.util.Map;

/**
 * Represents a data format.
 */
@ExperimentalApi
public interface DataFormat {

    /**
     * @return name identifier for the data format.
     */
    String name();
}
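For illustration only (not part of this commit), an implementation of this interface merely supplies a stable format name; the class and the "csv" identifier below are hypothetical, loosely echoing the dataformat-csv test dependency commented out later in this change.

/*
 * Hypothetical example implementation of DataFormat; not part of this commit.
 */
package org.opensearch.execution.search;

public class CsvDataFormat implements DataFormat {

    /** Returns the stable identifier used to look up codecs for this format. */
    @Override
    public String name() {
        return "csv";
    }
}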
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.execution.search;

import java.util.concurrent.CompletableFuture;

/**
 * Represents a stream of record batches from a DataFusion query execution.
 * This interface provides access to query results in a streaming fashion.
 */
public interface RecordBatchStream extends AutoCloseable {

    /**
     * Check if there are more record batches available in the stream.
     *
     * @return true if more batches are available, false otherwise
     */
    boolean hasNext();

    /**
     * Get the schema of the record batches in this stream.
     *
     * @return the schema object
     */
    Object getSchema();

    /**
     * Get the next record batch from the stream.
     *
     * @return the next record batch or null if no more batches
     */
    CompletableFuture<Object> next();

    /**
     * Close the stream and free the associated resources.
     */
    @Override
    void close();
}
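A minimal sketch of a consumer of this interface, assuming only the contract shown above; the helper class and the processBatch callback are hypothetical and not part of this commit.

package org.opensearch.execution.search;

import java.util.function.Consumer;

/**
 * Hypothetical example showing how a caller might drain a RecordBatchStream.
 * Not part of this commit.
 */
public final class RecordBatchStreamExample {

    private RecordBatchStreamExample() {}

    static void drain(RecordBatchStream stream, Consumer<Object> processBatch) {
        // try-with-resources guarantees close() releases the stream's resources
        try (stream) {
            while (stream.hasNext()) {
                // next() is asynchronous; join() blocks until the batch is available
                Object batch = stream.next().join();
                if (batch == null) {
                    break; // defensive: the contract allows null once batches are exhausted
                }
                processBatch.accept(batch);
            }
        }
    }
}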
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.execution.search.spi;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.execution.search.DataFormat;

import java.util.concurrent.CompletableFuture;

/**
 * Service Provider Interface for data source codecs.
 * Contains configurations, optimizers, etc. needed to query different data formats
 * through the pluggable engines.
 */
@ExperimentalApi
public interface DataFormatCodec {

    /**
     * Returns the data format handled by this codec.
     */
    DataFormat getDataFormat();

    /**
     * Closes the session context identified by the given id and releases its resources.
     */
    CompletableFuture<Void> closeSessionContext(long sessionContextId);
}
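A minimal sketch of a codec under this SPI, assuming only the two methods shown above; the CSV format lambda and the no-op session handling are illustrative assumptions, not part of this commit.

package org.opensearch.execution.search.spi;

import org.opensearch.execution.search.DataFormat;

import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical DataFormatCodec implementation; not part of this commit.
 */
public class CsvDataFormatCodec implements DataFormatCodec {

    // Assumed example format; DataFormat has a single abstract method, so a lambda suffices.
    private static final DataFormat CSV = () -> "csv";

    @Override
    public DataFormat getDataFormat() {
        return CSV;
    }

    @Override
    public CompletableFuture<Void> closeSessionContext(long sessionContextId) {
        // No native session state in this sketch, so completion is immediate.
        return CompletableFuture.completedFuture(null);
    }
}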
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'opensearch.internal-cluster-test'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.pluginzip'

def pluginName = 'engine-datafusion'
def pluginDescription = 'OpenSearch plugin providing access to DataFusion via JNI'
def projectPath = 'org.opensearch'
def pathToPlugin = 'datafusion.DataFusionPlugin'
def pluginClassName = 'DataFusionPlugin'

opensearchplugin {
    name = pluginName
    description = pluginDescription
    classname = "${projectPath}.${pathToPlugin}"
    licenseFile = rootProject.file('LICENSE.txt')
    noticeFile = rootProject.file('NOTICE.txt')
}

dependencies {
    api project(':libs:opensearch-engine-dataformat-commons')
    implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
    implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"

    // Bundle Jackson in the plugin JAR using 'api' like other OpenSearch plugins
    api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
    api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
    api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

    // Apache Arrow dependencies for memory management
    implementation "org.apache.arrow:arrow-memory-core:17.0.0"
    implementation "org.apache.arrow:arrow-memory-unsafe:17.0.0"
    implementation "org.apache.arrow:arrow-vector:17.0.0"
    implementation "org.apache.arrow:arrow-c-data:17.0.0"
    implementation "org.apache.arrow:arrow-format:17.0.0"
    // SLF4J API for Arrow logging compatibility
    implementation "org.slf4j:slf4j-api:${versions.slf4j}"
    // CheckerFramework annotations required by Arrow 17.0.0
    implementation "org.checkerframework:checker-qual:3.42.0"
    // FlatBuffers dependency required by Arrow 17.0.0
    implementation "com.google.flatbuffers:flatbuffers-java:23.5.26"

    testImplementation "junit:junit:${versions.junit}"
    testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
    testImplementation "org.mockito:mockito-core:${versions.mockito}"

    // Add CSV plugin for testing
    // testImplementation project(':plugins:dataformat-csv')
}

yamlRestTest {
    systemProperty 'tests.security.manager', 'false'
    // Disable yamlRestTest since this plugin doesn't have REST API endpoints
    enabled = false
}

tasks.named("dependencyLicenses").configure {
    mapping from: /jackson-.*/, to: 'jackson'
    mapping from: /arrow-.*/, to: 'arrow'
    mapping from: /slf4j-.*/, to: 'slf4j-api'
    mapping from: /checker-qual.*/, to: 'checker-qual'
    mapping from: /flatbuffers-.*/, to: 'flatbuffers-java'
}

// Configure third party audit to handle Apache Arrow dependencies
tasks.named('thirdPartyAudit').configure {
    ignoreMissingClasses(
        // Apache Commons Codec (missing dependency)
        'org.apache.commons.codec.binary.Hex'
    )
    ignoreViolations(
        // Apache Arrow internal classes that use Unsafe operations
        'org.apache.arrow.memory.ArrowBuf',
        'org.apache.arrow.memory.unsafe.UnsafeAllocationManager',
        'org.apache.arrow.memory.util.ByteFunctionHelpers',
        'org.apache.arrow.memory.util.MemoryUtil',
        'org.apache.arrow.memory.util.MemoryUtil$1',
        'org.apache.arrow.memory.util.hash.MurmurHasher',
        'org.apache.arrow.memory.util.hash.SimpleHasher',
        'org.apache.arrow.vector.BaseFixedWidthVector',
        'org.apache.arrow.vector.BitVectorHelper',
        'org.apache.arrow.vector.Decimal256Vector',
        'org.apache.arrow.vector.DecimalVector',
        'org.apache.arrow.vector.util.DecimalUtility',
        'org.apache.arrow.vector.util.VectorAppender'
    )
}

// Configure Javadoc to skip package documentation requirements, i.e. package-info.java
missingJavadoc {
    javadocMissingIgnore = [
        'org.opensearch.datafusion',
        'org.opensearch.datafusion.action',
        'org.opensearch.datafusion.core'
    ]
}
Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.datafusion;

import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.execution.search.spi.DataFormatCodec;
import org.opensearch.index.engine.exec.engine.FileMetadata;
import org.opensearch.index.engine.exec.format.DataFormat;
import org.opensearch.index.engine.exec.read.SearchExecEngine;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchEnginePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/**
 * DataFusion query engine plugin that enables DataFusion to perform search.
 */
public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin {

    private final boolean isDataFusionEnabled;
    private DatafusionService datafusionService;

    /**
     * Constructor for DataFusionPlugin.
     * @param settings The settings for the DataFusionPlugin.
     */
    public DataFusionPlugin(Settings settings) {
        // For now, DataFusion is always enabled if the plugin is loaded.
        // In the future, this could be controlled by a feature flag.
        this.isDataFusionEnabled = true;
    }

    /**
     * Creates components for the DataFusion plugin.
     * @param client The client instance.
     * @param clusterService The cluster service instance.
     * @param threadPool The thread pool instance.
     * @param resourceWatcherService The resource watcher service instance.
     * @param scriptService The script service instance.
     * @param xContentRegistry The named XContent registry.
     * @param environment The environment instance.
     * @param nodeEnvironment The node environment instance.
     * @param namedWriteableRegistry The named writeable registry.
     * @param indexNameExpressionResolver The index name expression resolver instance.
     * @param repositoriesServiceSupplier The supplier for the repositories service.
     * @param dataFormatCodecs The data format codec implementations, keyed by data format.
     * @return Collection of created components.
     */
    @Override
    public Collection<Object> createComponents(
        Client client,
        ClusterService clusterService,
        ThreadPool threadPool,
        ResourceWatcherService resourceWatcherService,
        ScriptService scriptService,
        NamedXContentRegistry xContentRegistry,
        Environment environment,
        NodeEnvironment nodeEnvironment,
        NamedWriteableRegistry namedWriteableRegistry,
        IndexNameExpressionResolver indexNameExpressionResolver,
        Supplier<RepositoriesService> repositoriesServiceSupplier,
        Map<DataFormat, DataFormatCodec> dataFormatCodecs
    ) {
        if (!isDataFusionEnabled) {
            return Collections.emptyList();
        }
        datafusionService = new DatafusionService(dataFormatCodecs);
        return Collections.singletonList(datafusionService);
    }

    /**
     * Creates a shard-specific read engine.
     */
    @Override
    public SearchExecEngine<?, ?, ?> createEngine(
        DataFormat dataFormat,
        Collection<FileMetadata> formatCatalogSnapshot,
        ShardPath shardPath
    ) throws IOException {
        // Not implemented in this change; returns null as a placeholder.
        return null;
    }
}
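As a rough smoke-test sketch (hypothetical, not part of this commit): since the createComponents body shown above only touches the codec map, the other collaborators can be left null when exercising the wiring in isolation, assuming DatafusionService accepts the map without further validation.

package org.opensearch.datafusion;

import org.opensearch.common.settings.Settings;
import org.opensearch.execution.search.spi.DataFormatCodec;
import org.opensearch.index.engine.exec.format.DataFormat;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical smoke test for DataFusionPlugin; not part of this commit.
 */
public class DataFusionPluginExample {

    public static void main(String[] args) {
        DataFusionPlugin plugin = new DataFusionPlugin(Settings.EMPTY);
        Map<DataFormat, DataFormatCodec> codecs = Collections.emptyMap();

        // Only the codec map is used by the createComponents body shown above,
        // so the remaining collaborators are passed as null in this sketch.
        Collection<Object> components = plugin.createComponents(
            null, null, null, null, null, null, null, null, null, null, null, codecs
        );

        // Expect exactly one component: the DatafusionService wrapping the codec map.
        System.out.println("components: " + components.size());
    }
}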
