Skip to content
This repository was archived by the owner on Jan 20, 2023. It is now read-only.

Commit f7dff76

Browse files
authored
Ballista: Make shuffle partitions configurable in benchmarks (apache#702)
1 parent 7d24567 commit f7dff76

File tree

9 files changed

+271
-27
lines changed

9 files changed

+271
-27
lines changed

ballista/rust/client/src/context.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ use std::sync::{Arc, Mutex};
2323
use std::{collections::HashMap, convert::TryInto};
2424
use std::{fs, time::Duration};
2525

26-
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
27-
use ballista_core::serde::protobuf::PartitionLocation;
26+
use ballista_core::config::BallistaConfig;
2827
use ballista_core::serde::protobuf::{
29-
execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams,
30-
GetJobStatusResult,
28+
execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
29+
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
30+
PartitionLocation,
3131
};
32-
use ballista_core::utils::WrappedStream;
3332
use ballista_core::{
3433
client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context,
34+
utils::WrappedStream,
3535
};
3636

3737
use datafusion::arrow::datatypes::Schema;
@@ -45,6 +45,8 @@ use futures::StreamExt;
4545
use log::{error, info};
4646

4747
struct BallistaContextState {
48+
/// Ballista configuration
49+
config: BallistaConfig,
4850
/// Scheduler host
4951
scheduler_host: String,
5052
/// Scheduler port
@@ -54,8 +56,13 @@ struct BallistaContextState {
5456
}
5557

5658
impl BallistaContextState {
57-
pub fn new(scheduler_host: String, scheduler_port: u16) -> Self {
59+
pub fn new(
60+
scheduler_host: String,
61+
scheduler_port: u16,
62+
config: &BallistaConfig,
63+
) -> Self {
5864
Self {
65+
config: config.clone(),
5966
scheduler_host,
6067
scheduler_port,
6168
tables: HashMap::new(),
@@ -64,6 +71,7 @@ impl BallistaContextState {
6471

6572
#[cfg(feature = "standalone")]
6673
pub async fn new_standalone(
74+
config: &BallistaConfig,
6775
concurrent_tasks: usize,
6876
) -> ballista_core::error::Result<Self> {
6977
info!("Running in local mode. Scheduler will be run in-proc");
@@ -87,11 +95,16 @@ impl BallistaContextState {
8795

8896
ballista_executor::new_standalone_executor(scheduler, concurrent_tasks).await?;
8997
Ok(Self {
98+
config: config.clone(),
9099
scheduler_host: "localhost".to_string(),
91100
scheduler_port: addr.port(),
92101
tables: HashMap::new(),
93102
})
94103
}
104+
105+
pub fn config(&self) -> &BallistaConfig {
106+
&self.config
107+
}
95108
}
96109

97110
pub struct BallistaContext {
@@ -100,8 +113,8 @@ pub struct BallistaContext {
100113

101114
impl BallistaContext {
102115
/// Create a context for executing queries against a remote Ballista scheduler instance
103-
pub fn remote(host: &str, port: u16) -> Self {
104-
let state = BallistaContextState::new(host.to_owned(), port);
116+
pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self {
117+
let state = BallistaContextState::new(host.to_owned(), port, config);
105118

106119
Self {
107120
state: Arc::new(Mutex::new(state)),
@@ -110,9 +123,11 @@ impl BallistaContext {
110123

111124
#[cfg(feature = "standalone")]
112125
pub async fn standalone(
126+
config: &BallistaConfig,
113127
concurrent_tasks: usize,
114128
) -> ballista_core::error::Result<Self> {
115-
let state = BallistaContextState::new_standalone(concurrent_tasks).await?;
129+
let state =
130+
BallistaContextState::new_standalone(config, concurrent_tasks).await?;
116131

117132
Ok(Self {
118133
state: Arc::new(Mutex::new(state)),
@@ -127,7 +142,7 @@ impl BallistaContext {
127142
let path = fs::canonicalize(&path)?;
128143

129144
// use local DataFusion context for now but later this might call the scheduler
130-
let mut ctx = create_datafusion_context();
145+
let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config());
131146
let df = ctx.read_parquet(path.to_str().unwrap())?;
132147
Ok(df)
133148
}
@@ -144,7 +159,7 @@ impl BallistaContext {
144159
let path = fs::canonicalize(&path)?;
145160

146161
// use local DataFusion context for now but later this might call the scheduler
147-
let mut ctx = create_datafusion_context();
162+
let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config());
148163
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
149164
Ok(df)
150165
}
@@ -176,9 +191,9 @@ impl BallistaContext {
176191
/// Create a DataFrame from a SQL statement
177192
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
178193
// use local DataFusion context for now but later this might call the scheduler
179-
let mut ctx = create_datafusion_context();
180194
// register tables
181195
let state = self.state.lock().unwrap();
196+
let mut ctx = create_datafusion_context(&state.config());
182197
for (name, plan) in &state.tables {
183198
let plan = ctx.optimize(plan)?;
184199
let execution_plan = ctx.create_physical_plan(&plan)?;
@@ -217,10 +232,11 @@ impl BallistaContext {
217232
&self,
218233
plan: &LogicalPlan,
219234
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
220-
let scheduler_url = {
235+
let (scheduler_url, config) = {
221236
let state = self.state.lock().unwrap();
222-
223-
format!("http://{}:{}", state.scheduler_host, state.scheduler_port)
237+
let scheduler_url =
238+
format!("http://{}:{}", state.scheduler_host, state.scheduler_port);
239+
(scheduler_url, state.config.clone())
224240
};
225241

226242
info!("Connecting to Ballista scheduler at {}", scheduler_url);
@@ -238,6 +254,14 @@ impl BallistaContext {
238254
.try_into()
239255
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
240256
)),
257+
settings: config
258+
.settings()
259+
.iter()
260+
.map(|(k, v)| KeyValuePair {
261+
key: k.to_owned(),
262+
value: v.to_owned(),
263+
})
264+
.collect::<Vec<_>>(),
241265
})
242266
.await
243267
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?

ballista/rust/client/src/prelude.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
//! Ballista Prelude (common imports)
1919
2020
pub use crate::context::BallistaContext;
21+
pub use ballista_core::config::BallistaConfig;
22+
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
2123
pub use ballista_core::error::{BallistaError, Result};
2224

2325
pub use futures::StreamExt;

ballista/rust/core/proto/ballista.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,9 @@ message ExecuteQueryParams {
803803
oneof query {
804804
LogicalPlanNode logical_plan = 1;
805805
string sql = 2;
806-
}}
806+
}
807+
repeated KeyValuePair settings = 3;
808+
}
807809

808810
message ExecuteSqlParams {
809811
string sql = 1;

ballista/rust/core/src/config.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
//
18+
19+
//! Ballista configuration
20+
21+
use std::collections::HashMap;
22+
23+
use crate::error::{BallistaError, Result};
24+
25+
use datafusion::arrow::datatypes::DataType;
26+
use log::warn;
27+
28+
pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
29+
30+
/// Configuration option meta-data
31+
#[derive(Debug, Clone)]
32+
pub struct ConfigEntry {
33+
name: String,
34+
description: String,
35+
data_type: DataType,
36+
default_value: Option<String>,
37+
}
38+
39+
impl ConfigEntry {
40+
fn new(
41+
name: String,
42+
description: String,
43+
data_type: DataType,
44+
default_value: Option<String>,
45+
) -> Self {
46+
Self {
47+
name,
48+
description,
49+
data_type,
50+
default_value,
51+
}
52+
}
53+
}
54+
55+
/// Ballista configuration builder
56+
pub struct BallistaConfigBuilder {
57+
settings: HashMap<String, String>,
58+
}
59+
60+
impl Default for BallistaConfigBuilder {
61+
/// Create a new config builder
62+
fn default() -> Self {
63+
Self {
64+
settings: HashMap::new(),
65+
}
66+
}
67+
}
68+
69+
impl BallistaConfigBuilder {
70+
/// Create a new config with an additional setting
71+
pub fn set(&self, k: &str, v: &str) -> Self {
72+
let mut settings = self.settings.clone();
73+
settings.insert(k.to_owned(), v.to_owned());
74+
Self { settings }
75+
}
76+
77+
pub fn build(&self) -> Result<BallistaConfig> {
78+
BallistaConfig::with_settings(self.settings.clone())
79+
}
80+
}
81+
82+
/// Ballista configuration
83+
#[derive(Debug, Clone)]
84+
pub struct BallistaConfig {
85+
/// Settings stored in map for easy serde
86+
settings: HashMap<String, String>,
87+
}
88+
89+
impl BallistaConfig {
90+
/// Create a default configuration
91+
pub fn new() -> Result<Self> {
92+
Self::with_settings(HashMap::new())
93+
}
94+
95+
/// Create a configuration builder
96+
pub fn builder() -> BallistaConfigBuilder {
97+
BallistaConfigBuilder::default()
98+
}
99+
100+
/// Create a new configuration based on key-value pairs
101+
pub fn with_settings(settings: HashMap<String, String>) -> Result<Self> {
102+
let supported_entries = BallistaConfig::valid_entries();
103+
for (name, entry) in &supported_entries {
104+
if let Some(v) = settings.get(name) {
105+
// validate that we can parse the user-supplied value
106+
let _ = v.parse::<usize>().map_err(|e| BallistaError::General(format!("Failed to parse user-supplied value '{}' for configuration setting '{}': {:?}", name, v, e)))?;
107+
} else if let Some(v) = entry.default_value.clone() {
108+
let _ = v.parse::<usize>().map_err(|e| BallistaError::General(format!("Failed to parse default value '{}' for configuration setting '{}': {:?}", name, v, e)))?;
109+
} else {
110+
return Err(BallistaError::General(format!(
111+
"No value specified for mandatory configuration setting '{}'",
112+
name
113+
)));
114+
}
115+
}
116+
117+
Ok(Self { settings })
118+
}
119+
120+
/// All available configuration options
121+
pub fn valid_entries() -> HashMap<String, ConfigEntry> {
122+
let entries = vec![
123+
ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(),
124+
"Sets the default number of partitions to create when repartitioning query stages".to_string(),
125+
DataType::UInt16, Some("2".to_string())),
126+
];
127+
entries
128+
.iter()
129+
.map(|e| (e.name.clone(), e.clone()))
130+
.collect::<HashMap<_, _>>()
131+
}
132+
133+
pub fn settings(&self) -> &HashMap<String, String> {
134+
&self.settings
135+
}
136+
137+
pub fn default_shuffle_partitions(&self) -> usize {
138+
self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
139+
}
140+
141+
fn get_usize_setting(&self, key: &str) -> usize {
142+
if let Some(v) = self.settings.get(key) {
143+
// infallible because we validate all configs in the constructor
144+
v.parse().unwrap()
145+
} else {
146+
let entries = Self::valid_entries();
147+
// infallible because we validate all configs in the constructor
148+
let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
149+
v.parse().unwrap()
150+
}
151+
}
152+
}
153+
154+
#[cfg(test)]
155+
mod tests {
156+
use super::*;
157+
158+
#[test]
159+
fn default_config() -> Result<()> {
160+
let config = BallistaConfig::new()?;
161+
assert_eq!(2, config.default_shuffle_partitions());
162+
Ok(())
163+
}
164+
165+
#[test]
166+
fn custom_config() -> Result<()> {
167+
let config = BallistaConfig::builder()
168+
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123")
169+
.build()?;
170+
assert_eq!(123, config.default_shuffle_partitions());
171+
Ok(())
172+
}
173+
174+
#[test]
175+
fn custom_config_invalid() -> Result<()> {
176+
let config = BallistaConfig::builder()
177+
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true")
178+
.build();
179+
assert!(config.is_err());
180+
assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
181+
Ok(())
182+
}
183+
}

ballista/rust/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub fn print_version() {
2424
}
2525

2626
pub mod client;
27+
pub mod config;
2728
pub mod datasource;
2829
pub mod error;
2930
pub mod execution_plans;

ballista/rust/core/src/utils.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
2727
use crate::memory_stream::MemoryStream;
2828
use crate::serde::scheduler::PartitionStats;
2929

30+
use crate::config::BallistaConfig;
3031
use datafusion::arrow::error::Result as ArrowResult;
3132
use datafusion::arrow::{
3233
array::{
@@ -233,8 +234,9 @@ fn build_exec_plan_diagram(
233234
}
234235

235236
/// Create a DataFusion context that is compatible with Ballista
236-
pub fn create_datafusion_context() -> ExecutionContext {
237-
let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
237+
pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
238+
let config =
239+
ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
238240
ExecutionContext::with_config(config)
239241
}
240242

0 commit comments

Comments
 (0)