30 changes: 30 additions & 0 deletions libs/engine-dataformat-commons/build.gradle
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

apply plugin: 'opensearch.build'

description = 'Common interfaces and SPIs for pluggable engines and data formats'

dependencies {
api project(':libs:opensearch-core')
api project(':libs:opensearch-common')

testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'engine-dataformat-commons'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false

test {
systemProperty 'tests.security.manager', 'false'
}
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.execution.search;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* Represents a data format.
*/
@ExperimentalApi
public interface DataFormat {

/**
 * @return the name identifier for the data format
 */
String name();
}
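
To give a rough sense of how this SPI might be implemented, here is a minimal sketch; the class name and the "csv" format identifier are assumptions for illustration, not part of this change.

package org.opensearch.execution.search;

// Hypothetical example: a trivial DataFormat implementation identifying a CSV-backed format.
public class CsvDataFormat implements DataFormat {

    @Override
    public String name() {
        // The returned string is used purely as the identifier for this format.
        return "csv";
    }
}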
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.execution.search;

import java.util.concurrent.CompletableFuture;

/**
* Represents a stream of record batches from a DataFusion query execution.
* This interface provides access to query results in a streaming fashion.
*/
public interface RecordBatchStream extends AutoCloseable {
Member:
I think we should move these components in libs under the DF plugin, as that's the only place they are used?

Contributor Author:
Yes, initially I had usage in the parquet plugin but moved everything out, and somehow this stayed in libs, so I will move this out too.

/**
* Check if there are more record batches available in the stream.
*
* @return true if more batches are available, false otherwise
*/
boolean hasNext();

/**
* Get the schema of the record batches in this stream.
* @return the schema object
*/
Object getSchema();

/**
* Get the next record batch from the stream.
*
* @return the next record batch or null if no more batches
*/
CompletableFuture<Object> next();

/**
* Close the stream and free the associated resources.
*/
@Override
void close();
}
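
As a rough usage sketch (not part of the diff), a caller could drain a stream as below; because the schema and batch types are plain Object in this draft, consumers would cast to the Arrow types they expect. The class and method names here are hypothetical.

package org.opensearch.execution.search;

// Illustrative consumer of a RecordBatchStream.
public final class RecordBatchStreamExample {

    static void drain(RecordBatchStream stream) {
        // try-with-resources guarantees close() runs and frees the stream's resources.
        try (stream) {
            Object schema = stream.getSchema();
            while (stream.hasNext()) {
                // next() completes with the next record batch, or null once the stream is exhausted.
                Object batch = stream.next().join();
                // ... hand the batch (and schema) to downstream processing ...
            }
        }
    }
}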
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.execution.search.spi;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.execution.search.DataFormat;

import java.util.concurrent.CompletableFuture;

/**
 * Service Provider Interface for data source codecs.
 * Contains configurations, optimizers, etc. to support querying different data formats
 * through the pluggable engines.
 */
@ExperimentalApi
public interface DataFormatCodec {
/**
 * Returns the data format handled by this codec.
 */
DataFormat getDataFormat();

CompletableFuture<Void> closeSessionContext(long sessionContextId);
Member:
The API here seems off - why is DataFormatCodec managing the SessionContext lifecycle?

Contributor Author:
Yeah, for this PR there are no good examples; today the codec is mainly used for configuration and optimizers.
https://github.com/bharath-techie/OpenSearch/pull/47/files#diff-48735da23f1ac176775a474bd04cd70ec100c2e487d72fb9285e9146897ce043R153 - here is an example of how the codec gets used.

Contributor Author:
The close session context is not used; I will remove it.

}
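
For illustration only, a minimal codec implementation could look like the sketch below, assuming a hypothetical CSV format; none of these names exist in this PR.

package org.opensearch.execution.search.spi;

import org.opensearch.execution.search.DataFormat;

import java.util.concurrent.CompletableFuture;

// Hypothetical codec tying a CSV data format to a pluggable engine.
public class CsvDataFormatCodec implements DataFormatCodec {

    @Override
    public DataFormat getDataFormat() {
        // DataFormat declares a single abstract method, so a lambda is enough here.
        return () -> "csv";
    }

    @Override
    public CompletableFuture<Void> closeSessionContext(long sessionContextId) {
        // Per the review thread above, this method is slated for removal; a no-op keeps the sketch compiling.
        return CompletableFuture.completedFuture(null);
    }
}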
105 changes: 105 additions & 0 deletions plugins/engine-datafusion/build.gradle
@@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'opensearch.internal-cluster-test'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.pluginzip'

def pluginName = 'engine-datafusion'
def pluginDescription = 'OpenSearch plugin providing access to DataFusion via JNI'
def projectPath = 'org.opensearch'
def pathToPlugin = 'datafusion.DataFusionPlugin'
def pluginClassName = 'DataFusionPlugin'

opensearchplugin {
name = pluginName
description = pluginDescription
classname = "${projectPath}.${pathToPlugin}"
licenseFile = rootProject.file('LICENSE.txt')
noticeFile = rootProject.file('NOTICE.txt')
}

dependencies {
api project(':libs:opensearch-engine-dataformat-commons')
implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"

// Bundle Jackson in the plugin JAR using 'api' like other OpenSearch plugins
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// Apache Arrow dependencies for memory management
implementation "org.apache.arrow:arrow-memory-core:17.0.0"
implementation "org.apache.arrow:arrow-memory-unsafe:17.0.0"
implementation "org.apache.arrow:arrow-vector:17.0.0"
implementation "org.apache.arrow:arrow-c-data:17.0.0"
implementation "org.apache.arrow:arrow-format:17.0.0"
// SLF4J API for Arrow logging compatibility
implementation "org.slf4j:slf4j-api:${versions.slf4j}"
// CheckerFramework annotations required by Arrow 17.0.0
implementation "org.checkerframework:checker-qual:3.42.0"
// FlatBuffers dependency required by Arrow 17.0.0
implementation "com.google.flatbuffers:flatbuffers-java:23.5.26"

testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation "org.mockito:mockito-core:${versions.mockito}"

// Add CSV plugin for testing
// testImplementation project(':plugins:dataformat-csv')
}

yamlRestTest {
systemProperty 'tests.security.manager', 'false'
// Disable yamlRestTest since this plugin doesn't have REST API endpoints
enabled = false
}

tasks.named("dependencyLicenses").configure {
mapping from: /jackson-.*/, to: 'jackson'
mapping from: /arrow-.*/, to: 'arrow'
mapping from: /slf4j-.*/, to: 'slf4j-api'
mapping from: /checker-qual.*/, to: 'checker-qual'
mapping from: /flatbuffers-.*/, to: 'flatbuffers-java'
}

// Configure third party audit to handle Apache Arrow dependencies
tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
// Apache Commons Codec (missing dependency)
'org.apache.commons.codec.binary.Hex'
)
ignoreViolations(
// Apache Arrow internal classes that use Unsafe operations
'org.apache.arrow.memory.ArrowBuf',
'org.apache.arrow.memory.unsafe.UnsafeAllocationManager',
'org.apache.arrow.memory.util.ByteFunctionHelpers',
'org.apache.arrow.memory.util.MemoryUtil',
'org.apache.arrow.memory.util.MemoryUtil$1',
'org.apache.arrow.memory.util.hash.MurmurHasher',
'org.apache.arrow.memory.util.hash.SimpleHasher',
'org.apache.arrow.vector.BaseFixedWidthVector',
'org.apache.arrow.vector.BitVectorHelper',
'org.apache.arrow.vector.Decimal256Vector',
'org.apache.arrow.vector.DecimalVector',
'org.apache.arrow.vector.util.DecimalUtility',
'org.apache.arrow.vector.util.VectorAppender'
)
}

// Configure Javadoc to skip package documentation requirements, i.e. package-info.java
missingJavadoc {
javadocMissingIgnore = [
'org.opensearch.datafusion',
'org.opensearch.datafusion.action',
'org.opensearch.datafusion.core'
]
}
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.datafusion;

import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.execution.search.spi.DataFormatCodec;
import org.opensearch.index.engine.exec.engine.FileMetadata;
import org.opensearch.index.engine.exec.format.DataFormat;
import org.opensearch.index.engine.exec.read.SearchExecEngine;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchEnginePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/**
 * DataFusion query engine plugin that enables DataFusion to execute search requests.
 */
public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEnginePlugin {

private final boolean isDataFusionEnabled;
private DatafusionService datafusionService;

/**
* Constructor for DataFusionPlugin.
* @param settings The settings for the DataFusionPlugin.
*/
public DataFusionPlugin(Settings settings) {
// For now, DataFusion is always enabled if the plugin is loaded
// In the future, this could be controlled by a feature flag
this.isDataFusionEnabled = true;
}

/**
* Creates components for the DataFusion plugin.
* @param client The client instance.
* @param clusterService The cluster service instance.
* @param threadPool The thread pool instance.
* @param resourceWatcherService The resource watcher service instance.
* @param scriptService The script service instance.
* @param xContentRegistry The named XContent registry.
* @param environment The environment instance.
* @param nodeEnvironment The node environment instance.
* @param namedWriteableRegistry The named writeable registry.
* @param indexNameExpressionResolver The index name expression resolver instance.
* @param repositoriesServiceSupplier The supplier for the repositories service.
* @param dataFormatCodecs dataformat implementations
* @return Collection of created components
*/
@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier,
Map<DataFormat, DataFormatCodec> dataFormatCodecs
) {
if (!isDataFusionEnabled) {
return Collections.emptyList();
}
datafusionService = new DatafusionService(dataFormatCodecs);

return Collections.singletonList(datafusionService);
}

/**
* Creates a shard specific read engine
*/
@Override
public SearchExecEngine<?, ?, ?> createEngine(
DataFormat dataFormat,
Collection<FileMetadata> formatCatalogSnapshot,
ShardPath shardPath
) throws IOException {
return null;
}

}
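
As an illustrative aside (not part of the diff), the codec map injected into createComponents could be consumed along these lines; the helper name is hypothetical and the actual wiring would live in DatafusionService.

// Hypothetical helper, shown as a fragment: resolve the codec registered for a format, failing fast when absent.
private static DataFormatCodec resolveCodec(Map<DataFormat, DataFormatCodec> dataFormatCodecs, DataFormat dataFormat) {
    DataFormatCodec codec = dataFormatCodecs.get(dataFormat);
    if (codec == null) {
        throw new IllegalStateException("No DataFormatCodec registered for data format: " + dataFormat);
    }
    return codec;
}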