Skip to content

Commit c9ef30b

Browse files
authored
Add spill directory for datafusion (#20199)
* Add spill directory * Add memory limit Signed-off-by: Arpit Bandejiya <[email protected]> * Fix tests --------- Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 165df72 commit c9ef30b

File tree

9 files changed

+85
-54
lines changed

9 files changed

+85
-54
lines changed

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::{
2929
DATAFUSION_VERSION,
3030
};
3131
use std::default::Default;
32+
use std::path::PathBuf;
3233
use std::time::{Duration, Instant};
3334

3435
mod util;
@@ -52,6 +53,7 @@ use datafusion::execution::memory_pool::{GreedyMemoryPool, TrackConsumersPool};
5253
use object_store::ObjectMeta;
5354
use tokio::runtime::Runtime;
5455
use std::result;
56+
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
5557
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
5658
use futures::TryStreamExt;
5759

@@ -214,11 +216,29 @@ fn log_task_metrics(operation: &str, metrics: &tokio_metrics::TaskMetrics) {
214216

215217
#[no_mangle]
216218
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlobalRuntime(
217-
_env: JNIEnv,
219+
mut env: JNIEnv,
218220
_class: JClass,
219221
memory_pool_limit: jlong,
220-
cache_manager_ptr: jlong
222+
cache_manager_ptr: jlong,
223+
spill_dir: JString,
224+
spill_limit: jlong
221225
) -> jlong {
226+
let spill_dir: String = match env.get_string(&spill_dir) {
227+
Ok(path) => path.into(),
228+
Err(e) => {
229+
let _ = env.throw_new(
230+
"java/lang/IllegalArgumentException",
231+
format!("Invalid table path: {:?}", e),
232+
);
233+
return 0;
234+
}
235+
};
236+
237+
let mut builder = DiskManagerBuilder::default()
238+
.with_max_temp_directory_size(spill_limit as u64);
239+
println!("Spill Limit is being set to : {}", spill_limit);
240+
let builder = builder.with_mode(DiskManagerMode::Directories(vec![PathBuf::from(spill_dir)]));
241+
222242
let monitor = Arc::new(Monitor::default());
223243
let memory_pool = Arc::new(MonitoredMemoryPool::new(
224244
Arc::new(TrackConsumersPool::new(
@@ -228,35 +248,29 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo
228248
monitor.clone(),
229249
));
230250

231-
if cache_manager_ptr != 0 {
232-
// Take ownership of the CustomCacheManager
233-
let custom_cache_manager = unsafe { *Box::from_raw(cache_manager_ptr as *mut CustomCacheManager) };
234-
let cache_manager_config = custom_cache_manager.build_cache_manager_config();
235-
236-
let runtime_env = RuntimeEnvBuilder::new().with_cache_manager(cache_manager_config)
237-
.with_memory_pool(memory_pool.clone())
238-
.build().unwrap();
239-
240-
let runtime = DataFusionRuntime {
241-
runtime_env,
242-
custom_cache_manager: Some(custom_cache_manager),
243-
monitor,
244-
};
245-
246-
Box::into_raw(Box::new(runtime)) as jlong
247-
} else {
248-
let runtime_env = RuntimeEnvBuilder::new()
249-
.with_memory_pool(memory_pool)
250-
.build().unwrap();
251-
252-
let runtime = DataFusionRuntime {
253-
runtime_env,
254-
custom_cache_manager: None,
255-
monitor,
256-
};
257-
258-
Box::into_raw(Box::new(runtime)) as jlong
259-
}
251+
let (cache_manager_config, custom_cache_manager) = match cache_manager_ptr {
252+
0 => {
253+
(CacheManagerConfig::default(), None)
254+
}
255+
_ => {
256+
let custom_cache_manager = unsafe { *Box::from_raw(cache_manager_ptr as *mut CustomCacheManager) };
257+
(custom_cache_manager.build_cache_manager_config(), Some(custom_cache_manager))
258+
}
259+
};
260+
261+
let runtime_env = RuntimeEnvBuilder::new()
262+
.with_cache_manager(cache_manager_config)
263+
.with_memory_pool(memory_pool.clone())
264+
.with_disk_manager_builder(builder)
265+
.build().unwrap();
266+
267+
let runtime = DataFusionRuntime {
268+
runtime_env,
269+
custom_cache_manager,
270+
monitor,
271+
};
272+
273+
Box::into_raw(Box::new(runtime)) as jlong
260274
}
261275

262276
#[no_mangle]

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.datafusion;
1010

11+
import java.nio.file.Path;
1112
import java.util.ArrayList;
1213
import java.util.Arrays;
1314
import java.util.stream.Collectors;
@@ -60,11 +61,8 @@
6061
import java.util.Map;
6162
import java.util.function.Supplier;
6263

63-
import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_ENABLED;
64-
import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_EVICTION_TYPE;
65-
import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT;
66-
67-
import static org.opensearch.datafusion.core.DataFusionRuntimeEnv.MEMORY_POOL_CONFIGURATION_DATAFUSION;
64+
import static org.opensearch.datafusion.core.DataFusionRuntimeEnv.DATAFUSION_MEMORY_POOL_CONFIGURATION;
65+
import static org.opensearch.datafusion.core.DataFusionRuntimeEnv.DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION;
6866

6967

7068
/**
@@ -116,10 +114,11 @@ public Collection<Object> createComponents(
116114
Supplier<RepositoriesService> repositoriesServiceSupplier,
117115
Map<DataFormat, DataSourceCodec> dataSourceCodecs
118116
) {
117+
String spill_dir = Arrays.stream(environment.dataFiles()).findFirst().get().getParent().resolve("tmp").toAbsolutePath().toString();
119118
if (!isDataFusionEnabled) {
120119
return Collections.emptyList();
121120
}
122-
dataFusionService = new DataFusionService(dataSourceCodecs, clusterService);
121+
dataFusionService = new DataFusionService(dataSourceCodecs, clusterService, spill_dir);
123122

124123
for(DataFormat format : this.getSupportedFormats()) {
125124
dataSourceCodecs.get(format);
@@ -176,7 +175,8 @@ public List<RestHandler> getRestHandlers(
176175
public List<Setting<?>> getSettings() {
177176
List<Setting<?>> settingList = new ArrayList<>();
178177

179-
settingList.add(MEMORY_POOL_CONFIGURATION_DATAFUSION);
178+
settingList.add(DATAFUSION_MEMORY_POOL_CONFIGURATION);
179+
settingList.add(DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION);
180180
settingList.addAll(Stream.of(
181181
CacheSettings.CACHE_SETTINGS,
182182
CacheSettings.CACHE_ENABLED)

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ public class DataFusionService extends AbstractLifecycleComponent {
3434
/**
3535
* Creates a new DataFusion service instance.
3636
*/
37-
public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs, ClusterService clusterService) {
37+
public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs, ClusterService clusterService, String spill_dir) {
3838
this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs);
3939

4040
// to verify jni
4141
String version = NativeBridge.getVersionInfo();
42-
this.runtimeEnv = new DataFusionRuntimeEnv(clusterService);
42+
this.runtimeEnv = new DataFusionRuntimeEnv(clusterService, spill_dir);
4343
}
4444

4545
@Override

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/DataFusionRuntimeEnv.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,33 @@ public final class DataFusionRuntimeEnv implements AutoCloseable {
3131
/**
3232
* Controls the memory used for the datafusion query execution
3333
*/
34-
public static final Setting<ByteSizeValue> MEMORY_POOL_CONFIGURATION_DATAFUSION = Setting.byteSizeSetting(
34+
public static final Setting<ByteSizeValue> DATAFUSION_MEMORY_POOL_CONFIGURATION = Setting.byteSizeSetting(
3535
"datafusion.search.memory_pool",
3636
new ByteSizeValue(10, ByteSizeUnit.GB),
3737
Setting.Property.Final,
3838
Setting.Property.NodeScope
3939
);
4040

41+
/**
42+
* Controls the spill memory used for the datafusion query execution
43+
*/
44+
public static final Setting<ByteSizeValue> DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION = Setting.byteSizeSetting(
45+
"datafusion.spill.memory_limit",
46+
new ByteSizeValue(20, ByteSizeUnit.GB),
47+
Setting.Property.Final,
48+
Setting.Property.NodeScope
49+
);
50+
4151
/**
4252
* Creates a new DataFusion runtime environment.
4353
*/
44-
public DataFusionRuntimeEnv(ClusterService clusterService) {
45-
long memoryLimit = clusterService.getClusterSettings().get(MEMORY_POOL_CONFIGURATION_DATAFUSION).getBytes();
54+
public DataFusionRuntimeEnv(ClusterService clusterService, String spill_dir) {
55+
long memoryLimit = clusterService.getClusterSettings().get(DATAFUSION_MEMORY_POOL_CONFIGURATION).getBytes();
56+
long spillLimit = clusterService.getClusterSettings().get(DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION).getBytes();
4657
long cacheManagerConfigPtr = CacheUtils.createCacheConfig(clusterService.getClusterSettings());
4758
NativeBridge.initTokioRuntimeManager(Runtime.getRuntime().availableProcessors());
4859
NativeBridge.startTokioRuntimeMonitoring(); // TODO : do we need this control in java ?
49-
this.runtimeHandle = new GlobalRuntimeHandle(memoryLimit, cacheManagerConfigPtr);
60+
this.runtimeHandle = new GlobalRuntimeHandle(memoryLimit, cacheManagerConfigPtr, spill_dir, spillLimit);
5061
System.out.println("Runtime : " + this.runtimeHandle);
5162
this.cacheManager = new CacheManager(this.runtimeHandle);
5263
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public final class NativeBridge {
2424
private NativeBridge() {}
2525

2626
// Runtime management
27-
public static native long createGlobalRuntime(long limit, long cacheManagerPtr);
27+
public static native long createGlobalRuntime(long limit, long cacheManagerPtr, String spillDir, long spillLimit);
2828
public static native void closeGlobalRuntime(long ptr);
2929

3030
// Tokio runtime

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/handle/GlobalRuntimeHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
*/
1717
public final class GlobalRuntimeHandle extends NativeHandle {
1818

19-
public GlobalRuntimeHandle(long memoryLimit, long cacheManagerConfigPtr) {
20-
super(NativeBridge.createGlobalRuntime(memoryLimit,cacheManagerConfigPtr));
19+
public GlobalRuntimeHandle(long memoryLimit, long cacheManagerConfigPtr, String spillDir, long spillLimit) {
20+
super(NativeBridge.createGlobalRuntime(memoryLimit,cacheManagerConfigPtr, spillDir, spillLimit));
2121
}
2222

2323
/**

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionReaderManagerTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.common.settings.Settings;
2828
import org.opensearch.core.index.Index;
2929
import org.opensearch.core.index.shard.ShardId;
30+
import org.opensearch.datafusion.core.DataFusionRuntimeEnv;
3031
import org.opensearch.datafusion.search.*;
3132
import org.opensearch.env.Environment;
3233
import org.opensearch.index.engine.exec.*;
@@ -66,12 +67,13 @@ public void setup() {
6667
clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
6768
clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
6869
clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
69-
clusterSettingsToAdd.add(org.opensearch.datafusion.core.DataFusionRuntimeEnv.MEMORY_POOL_CONFIGURATION_DATAFUSION);
70+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_MEMORY_POOL_CONFIGURATION);
71+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION);
7072
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
7173

7274
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
7375
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
74-
service = new DataFusionService(Collections.emptyMap(),clusterService);
76+
service = new DataFusionService(Collections.emptyMap(),clusterService, "/tmp");
7577
service.doStart();
7678
noOpFileDeleterSupplier = () -> {
7779
try {

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.core.common.Strings;
2626
import org.opensearch.core.index.Index;
2727
import org.opensearch.core.index.shard.ShardId;
28+
import org.opensearch.datafusion.core.DataFusionRuntimeEnv;
2829
import org.opensearch.datafusion.search.DatafusionContext;
2930
import org.opensearch.datafusion.search.DatafusionQuery;
3031
import org.opensearch.datafusion.search.DatafusionSearcher;
@@ -96,20 +97,21 @@ public void setup() {
9697
Settings mockSettings = Settings.builder().put("path.data", "/tmp/test-data").build();
9798

9899
when(mockEnvironment.settings()).thenReturn(mockSettings);
99-
service = new DataFusionService(Map.of(), clusterService);
100+
service = new DataFusionService(Map.of(), clusterService, "/tmp");
100101
Set<Setting<?>> clusterSettingsToAdd = new HashSet<>(BUILT_IN_CLUSTER_SETTINGS);
101102
clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
102103
clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
103104
clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
104-
clusterSettingsToAdd.add(org.opensearch.datafusion.core.DataFusionRuntimeEnv.MEMORY_POOL_CONFIGURATION_DATAFUSION);
105+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_MEMORY_POOL_CONFIGURATION);
106+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION);
105107

106108

107109
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
108110
clusterService = mock(ClusterService.class);
109111
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
110112
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
111113

112-
service = new DataFusionService(Collections.emptyMap(), clusterService);
114+
service = new DataFusionService(Collections.emptyMap(), clusterService, "/tmp");
113115
//service = new DataFusionService(Map.of());
114116
service.doStart();
115117
}

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DatafusionCacheManagerTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.common.settings.ClusterSettings;
2828
import org.opensearch.common.settings.Setting;
2929
import org.opensearch.common.settings.Settings;
30+
import org.opensearch.datafusion.core.DataFusionRuntimeEnv;
3031
import org.opensearch.datafusion.search.cache.CacheManager;
3132
import org.opensearch.datafusion.search.cache.CacheUtils;
3233
import org.opensearch.env.Environment;
@@ -56,14 +57,15 @@ public void setup() {
5657
clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
5758
clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
5859
clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
59-
clusterSettingsToAdd.add(org.opensearch.datafusion.core.DataFusionRuntimeEnv.MEMORY_POOL_CONFIGURATION_DATAFUSION);
60+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_MEMORY_POOL_CONFIGURATION);
61+
clusterSettingsToAdd.add(DataFusionRuntimeEnv.DATAFUSION_SPILL_MEMORY_LIMIT_CONFIGURATION);
6062

6163

6264
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
6365
clusterService = mock(ClusterService.class);
6466
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
6567
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
66-
service = new DataFusionService(Collections.emptyMap(), clusterService);
68+
service = new DataFusionService(Collections.emptyMap(), clusterService, "/tmp");
6769
service.doStart();
6870
}
6971

0 commit comments

Comments
 (0)