Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::{
DATAFUSION_VERSION,
};
use std::default::Default;
use std::path::PathBuf;
use std::time::{Duration, Instant};

mod util;
Expand All @@ -52,6 +53,7 @@ use datafusion::execution::memory_pool::{GreedyMemoryPool, TrackConsumersPool};
use object_store::ObjectMeta;
use tokio::runtime::Runtime;
use std::result;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::TryStreamExt;

Expand Down Expand Up @@ -228,13 +230,18 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo
monitor.clone(),
));

let mut builder = DiskManagerBuilder::default()
.with_max_temp_directory_size(100 * 1024 * 1024 * 1024);
let builder = builder.with_mode(DiskManagerMode::Directories(vec![PathBuf::from("/home/ec2-user/spill_dir")]));

if cache_manager_ptr != 0 {
// Take ownership of the CustomCacheManager
let custom_cache_manager = unsafe { *Box::from_raw(cache_manager_ptr as *mut CustomCacheManager) };
let cache_manager_config = custom_cache_manager.build_cache_manager_config();

let runtime_env = RuntimeEnvBuilder::new().with_cache_manager(cache_manager_config)
.with_memory_pool(memory_pool.clone())
.with_disk_manager_builder(builder)
.build().unwrap();

let runtime = DataFusionRuntime {
Expand All @@ -247,6 +254,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo
} else {
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.with_disk_manager_builder(builder)
.build().unwrap();

let runtime = DataFusionRuntime {
Expand Down
Loading