This repository was archived by the owner on Jan 20, 2023. It is now read-only.

Commit eba0fcf

Add Keda autoscaling for ballista in k8s (apache#586)

1 parent 62b0349 commit eba0fcf

7 files changed: +201 −21 lines changed

ballista/rust/scheduler/Cargo.toml (+1)

```diff
@@ -61,6 +61,7 @@ uuid = { version = "0.8", features = ["v4"] }
 
 [build-dependencies]
 configure_me_codegen = "0.4.0"
+tonic-build = { version = "0.4" }
 
 [package.metadata.configure_me.bin]
 scheduler = "scheduler_config_spec.toml"
```

ballista/rust/scheduler/build.rs (+6 −1)

```diff
@@ -20,5 +20,10 @@ extern crate configure_me_codegen;
 fn main() -> Result<(), String> {
     println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
     configure_me_codegen::build_script_auto()
-        .map_err(|e| format!("configure_me code generation failed: {}", e))
+        .map_err(|e| format!("configure_me code generation failed: {}", e))?;
+
+    println!("cargo:rerun-if-changed=proto/keda.proto");
+    tonic_build::configure()
+        .compile(&["proto/keda.proto"], &["proto"])
+        .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
```
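With this change, `cargo build` regenerates the gRPC bindings whenever `proto/keda.proto` changes, writing them into Cargo's `OUT_DIR`. A quick sanity-check sketch (the package name is inferred from the `use ballista_scheduler::...` import that appears in main.rs below):

```bash
# Rebuild only the scheduler crate so the build script reruns,
# then locate the generated Rust bindings under the build directory.
# (The exact OUT_DIR path varies between builds.)
cargo build -p ballista-scheduler
find target -name externalscaler.rs
```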
ballista/rust/scheduler/proto/keda.proto (+63, new file)

```diff
@@ -0,0 +1,63 @@
+/*
+Copyright 2020 The KEDA Authors.
+
+and others that have contributed code to the public domain.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// This file comes from https://github.com/kedacore/keda/blob/main/pkg/scalers/externalscaler/externalscaler.proto
+syntax = "proto3";
+
+package externalscaler;
+option go_package = ".;externalscaler";
+
+service ExternalScaler {
+    rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
+    // Commented out since we aren't supporting the streaming scaler interface at the moment
+    // rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
+    rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
+    rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
+}
+
+message ScaledObjectRef {
+    string name = 1;
+    string namespace = 2;
+    map<string, string> scalerMetadata = 3;
+}
+
+message IsActiveResponse {
+    bool result = 1;
+}
+
+message GetMetricSpecResponse {
+    repeated MetricSpec metricSpecs = 1;
+}
+
+message MetricSpec {
+    string metricName = 1;
+    int64 targetSize = 2;
+}
+
+message GetMetricsRequest {
+    ScaledObjectRef scaledObjectRef = 1;
+    string metricName = 2;
+}
+
+message GetMetricsResponse {
+    repeated MetricValue metricValues = 1;
+}
+
+message MetricValue {
+    string metricName = 1;
+    int64 metricValue = 2;
+}
```

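This is KEDA's standard external-scaler contract: KEDA polls `IsActive` to decide whether the deployment should be woken from zero replicas, and uses `GetMetricSpec`/`GetMetrics` to drive the HPA above that. With the scheduler running, the endpoints can be smoke-tested from the command line; the sketch below assumes the scheduler's gRPC port 50050 and a local copy of `keda.proto` (the server does not appear to register gRPC reflection, so the proto file must be passed explicitly):

```bash
# A sketch: ask the scheduler whether any tasks are in flight — the same
# call Keda makes when deciding whether to scale the executors from zero.
grpcurl -plaintext -proto keda.proto \
  -d '{"name": "ballista-executor", "namespace": "default"}' \
  localhost:50050 externalscaler.ExternalScaler/IsActive
```

A `{"result": true}` reply means at least one task is neither completed nor failed, which is exactly the predicate implemented in lib.rs below.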
ballista/rust/scheduler/src/lib.rs (+64 −5)

```diff
@@ -28,16 +28,22 @@ pub use standalone::new_standalone_scheduler;
 #[cfg(test)]
 pub mod test_utils;
 
+// include the generated protobuf source as a submodule
+#[allow(clippy::all)]
+pub mod externalscaler {
+    include!(concat!(env!("OUT_DIR"), "/externalscaler.rs"));
+}
+
 use std::{convert::TryInto, sync::Arc};
 use std::{fmt, net::IpAddr};
 
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
-    scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
-    TaskStatus,
+    scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
+    ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
+    GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+    JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob,
+    TaskDefinition, TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -62,6 +68,10 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
     }
 }
 
+use crate::externalscaler::{
+    external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
+    GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
+};
 use crate::planner::DistributedPlanner;
 
 use log::{debug, error, info, warn};
@@ -103,6 +113,55 @@ impl SchedulerServer {
     }
 }
 
+const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+
+#[tonic::async_trait]
+impl ExternalScaler for SchedulerServer {
+    async fn is_active(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<IsActiveResponse>, tonic::Status> {
+        let tasks = self.state.get_all_tasks().await.map_err(|e| {
+            let msg = format!("Error reading tasks: {}", e);
+            error!("{}", msg);
+            tonic::Status::internal(msg)
+        })?;
+        let result = tasks.iter().any(|(_key, task)| {
+            !matches!(
+                task.status,
+                Some(task_status::Status::Completed(_))
+                    | Some(task_status::Status::Failed(_))
+            )
+        });
+        debug!("Are there active tasks? {}", result);
+        Ok(Response::new(IsActiveResponse { result }))
+    }
+
+    async fn get_metric_spec(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricSpecResponse {
+            metric_specs: vec![MetricSpec {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                target_size: 1,
+            }],
+        }))
+    }
+
+    async fn get_metrics(
+        &self,
+        _request: Request<GetMetricsRequest>,
+    ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricsResponse {
+            metric_values: vec![MetricValue {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                metric_value: 10000000, // A very high number to saturate the HPA
+            }],
+        }))
+    }
+}
+
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
     async fn poll_work(
```

ballista/rust/scheduler/src/main.rs (+4)

```diff
@@ -18,6 +18,7 @@
 //! Ballista Rust scheduler binary.
 
 use anyhow::{Context, Result};
+use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerServer;
 use futures::future::{self, Either, TryFutureExt};
 use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
@@ -72,8 +73,11 @@ async fn start_server(
     let scheduler_grpc_server =
         SchedulerGrpcServer::new(scheduler_server.clone());
 
+    let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());
+
     let mut tonic = TonicServer::builder()
         .add_service(scheduler_grpc_server)
+        .add_service(keda_scaler)
         .into_service();
     let mut warp = warp::service(get_routes(scheduler_server));
 
```

ballista/rust/scheduler/src/state/mod.rs (+15 −13)

```diff
@@ -236,6 +236,15 @@ impl SchedulerState {
         Ok((&value).try_into()?)
     }
 
+    pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
+        self.config_client
+            .get_from_prefix(&get_task_prefix(&self.namespace))
+            .await?
+            .into_iter()
+            .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
+            .collect()
+    }
+
     /// This function ensures that the task wasn't assigned to an executor that died.
     /// If that is the case, then the task is re-scheduled.
     /// Returns true if the task was dead, false otherwise.
@@ -274,18 +283,12 @@
         &self,
         executor_id: &str,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let kvs: HashMap<String, Vec<u8>> = self
-            .config_client
-            .get_from_prefix(&get_task_prefix(&self.namespace))
-            .await?
-            .into_iter()
-            .collect();
+        let tasks = self.get_all_tasks().await?;
         // TODO: Make the duration a configurable parameter
         let executors = self
             .get_alive_executors_metadata(Duration::from_secs(60))
             .await?;
-        'tasks: for (_key, value) in kvs.iter() {
-            let mut status: TaskStatus = decode_protobuf(value)?;
+        'tasks: for (_key, status) in tasks.iter() {
             if status.status.is_none() {
                 let partition = status.partition_id.as_ref().unwrap();
                 let plan = self
@@ -301,31 +304,29 @@
                 for unresolved_shuffle in unresolved_shuffles {
                     for stage_id in unresolved_shuffle.query_stage_ids {
                         for partition_id in 0..unresolved_shuffle.partition_count {
-                            let referenced_task = kvs
+                            let referenced_task = tasks
                                 .get(&get_task_status_key(
                                     &self.namespace,
                                     &partition.job_id,
                                     stage_id,
                                     partition_id,
                                 ))
                                 .unwrap();
-                            let referenced_task: TaskStatus =
-                                decode_protobuf(referenced_task)?;
                             let task_is_dead = self
                                 .reschedule_dead_task(&referenced_task, &executors)
                                 .await?;
                             if task_is_dead {
                                 continue 'tasks;
                             } else if let Some(task_status::Status::Completed(
                                 CompletedTask { executor_id },
-                            )) = referenced_task.status
+                            )) = &referenced_task.status
                             {
                                 let empty = vec![];
                                 let locations =
                                     partition_locations.entry(stage_id).or_insert(empty);
                                 let executor_meta = executors
                                     .iter()
-                                    .find(|exec| exec.id == executor_id)
+                                    .find(|exec| exec.id == *executor_id)
                                     .unwrap()
                                     .clone();
                                 locations.push(vec![
@@ -350,6 +351,7 @@
                 remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;
 
                 // If we get here, there are no more unresolved shuffled and the task can be run
+                let mut status = status.clone();
                 status.status = Some(task_status::Status::Running(RunningTask {
                     executor_id: executor_id.to_owned(),
                 }));
```

docs/user-guide/src/distributed/kubernetes.md (+48 −2)

````diff
@@ -28,6 +28,7 @@ The k8s deployment consists of:
 - k8s deployment for one or more executor processes
 - k8s service to route traffic to the schedulers
 - k8s persistent volume and persistent volume claims to make local data accessible to Ballista
+- _(optional)_ a [keda](http://keda.sh) instance for autoscaling the number of executors
 
 ## Limitations
 
@@ -163,8 +164,8 @@ spec:
           image: <your-image>
           command: ["/executor"]
           args:
-            - "--bind-port=50051",
-            - "--scheduler-host=ballista-scheduler",
+            - "--bind-port=50051"
+            - "--scheduler-host=ballista-scheduler"
             - "--scheduler-port=50050"
           ports:
             - containerPort: 50051
@@ -216,3 +217,48 @@ Run the following kubectl command to delete the cluster.
 ```bash
 kubectl delete -f cluster.yaml
 ```
+
+## Adding autoscaling for executors
+
+Ballista supports autoscaling for executors through [Keda](http://keda.sh). Keda allows scaling a deployment
+through custom metrics which are exposed through the Ballista scheduler, and it can even scale the number of
+executors down to 0 if there is no activity in the cluster.
+
+Keda can be installed in your kubernetes cluster through a single command line:
+
+```bash
+kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.3.0/keda-2.3.0.yaml
+```
+
+Once you have deployed Keda on your cluster, you can now deploy a new kubernetes object called `ScaledObject`
+which will let Keda know how to scale your executors. In order to do that, copy the following YAML into a
+`scale.yaml` file:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: ballista-executor
+spec:
+  scaleTargetRef:
+    name: ballista-executor
+  minReplicaCount: 0
+  maxReplicaCount: 5
+  triggers:
+    - type: external
+      metadata:
+        # Change this DNS if the scheduler isn't deployed in the "default" namespace
+        scalerAddress: ballista-scheduler.default.svc.cluster.local:50050
+```
+
+And then deploy it into the cluster:
+
+```bash
+kubectl apply -f scale.yaml
+```
+
+If the cluster is inactive, Keda will now scale the number of executors down to 0, and will scale them up when
+you launch a query. Please note that Keda will perform a scan once every 30 seconds, so it might take a bit to
+scale the executors.
+
+Please visit Keda's [documentation page](https://keda.sh/docs/2.3/concepts/scaling-deployments/) for more information.
````
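Beyond the documentation above, it can help to verify that Keda actually picked up the `ScaledObject` and to watch the scaling happen. A sketch, assuming everything was deployed in the `default` namespace as in the examples:

```bash
# Confirm Keda registered the ScaledObject (it creates an HPA behind the scenes).
kubectl get scaledobject ballista-executor
kubectl get hpa

# Watch the executor deployment drop to 0 replicas when idle and scale back
# up after a query is submitted; Keda only polls roughly every 30 seconds.
kubectl get deployment ballista-executor --watch
```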
