Skip to content

Commit 6c7423e

Browse files
committed
Limit to 3 pipelines per node per source
1 parent 4dc3f71 commit 6c7423e

File tree

4 files changed

+307
-51
lines changed

4 files changed

+307
-51
lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
5151
Duration::from_secs(30)
5252
};
5353

54+
/// That's 80% of a pipeline capacity
55+
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
56+
5457
#[derive(Debug, Clone, Default, Serialize)]
5558
pub struct IndexingSchedulerState {
5659
pub num_applied_physical_indexing_plan: usize,
@@ -257,8 +260,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
257260
source_uid,
258261
source_type: SourceToScheduleType::NonSharded {
259262
num_pipelines: source_config.num_pipelines.get() as u32,
260-
// FIXME
261-
load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
263+
// FIXME:
264+
// - implementing adaptive load contains the risk of generating
265+
// rebalancing storms for sources like Kafka
266+
// - this is coupled with the scheduling logic that misses the notion of
267+
// pipeline
268+
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
262269
.unwrap(),
263270
},
264271
params_fingerprint,

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ We also want to observe some interesting properties such as:
1515
To simplify the logic and make it easier to test it, we first abstract this in the following
1616
optimization problem. In Quickwit, we have two types of source:
1717

18-
- The push api source: they have a given (changing) set of shards associated to them.
19-
A shard is rate-limited to ensure their throughput is lower than `5MB/s` worth of
18+
- The push api source: indexes have a given (changing) set of shards associated to them.
19+
Shards are stored on indexer nodes and are spread randomly across them. A shard is
20+
rate-limited to ensure its throughput is lower than `5MB/s` worth of
2021
uncompressed data. This guarantees that a given shard can be indexed by a
2122
single indexing pipeline.
2223

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,8 +757,8 @@ mod tests {
757757
convert_scheduling_solution_to_physical_plan_single_node_single_source,
758758
};
759759
use crate::indexing_plan::PhysicalIndexingPlan;
760-
use crate::indexing_scheduler::get_shard_locality_metrics;
761760
use crate::indexing_scheduler::scheduling::assign_shards;
761+
use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
762762
use crate::model::ShardLocations;
763763

764764
fn source_id() -> SourceUid {
@@ -939,6 +939,146 @@ mod tests {
939939
}
940940
}
941941

942+
#[test]
943+
fn test_build_physical_plan_with_pipeline_limit() {
944+
let indexer1 = "indexer1".to_string();
945+
let indexer2 = "indexer2".to_string();
946+
let source_uid0 = source_id();
947+
let source_uid1 = source_id();
948+
let source_0 = SourceToSchedule {
949+
source_uid: source_uid0.clone(),
950+
source_type: SourceToScheduleType::Sharded {
951+
shard_ids: (0..16).map(ShardId::from).collect(),
952+
load_per_shard: NonZeroU32::new(800).unwrap(),
953+
},
954+
params_fingerprint: 0,
955+
};
956+
let source_1 = SourceToSchedule {
957+
source_uid: source_uid1.clone(),
958+
source_type: SourceToScheduleType::NonSharded {
959+
num_pipelines: 4,
960+
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
961+
},
962+
params_fingerprint: 0,
963+
};
964+
let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
965+
indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
966+
indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
967+
let shard_locations = ShardLocations::default();
968+
let indexing_plan = build_physical_indexing_plan(
969+
&[source_0, source_1],
970+
&indexer_id_to_cpu_capacities,
971+
None,
972+
&shard_locations,
973+
);
974+
assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
975+
976+
let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
977+
let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
978+
979+
let source_0_on_node1 = node1_plan
980+
.iter()
981+
.filter(|task| task.source_id == source_uid0.source_id)
982+
.count();
983+
let source_0_on_node2 = node2_plan
984+
.iter()
985+
.filter(|task| task.source_id == source_uid0.source_id)
986+
.count();
987+
assert!(source_0_on_node1 <= 3);
988+
assert!(source_0_on_node2 <= 3);
989+
assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
990+
991+
let source_1_on_node1 = node1_plan
992+
.iter()
993+
.filter(|task| task.source_id == source_uid1.source_id)
994+
.count();
995+
let source_1_on_node2 = node2_plan
996+
.iter()
997+
.filter(|task| task.source_id == source_uid1.source_id)
998+
.count();
999+
assert!(source_1_on_node1 <= 3);
1000+
assert!(source_1_on_node2 <= 3);
1001+
assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
1002+
}
1003+
1004+
#[test]
1005+
fn test_build_physical_plan_second_iteration() {
1006+
let indexer1 = "indexer1".to_string();
1007+
let indexer2 = "indexer2".to_string();
1008+
let indexer3 = "indexer3".to_string();
1009+
let mut sources = Vec::new();
1010+
for _ in 0..10 {
1011+
sources.push(SourceToSchedule {
1012+
source_uid: source_id(),
1013+
source_type: SourceToScheduleType::NonSharded {
1014+
num_pipelines: 4,
1015+
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
1016+
},
1017+
params_fingerprint: 0,
1018+
});
1019+
}
1020+
let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
1021+
indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
1022+
indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
1023+
indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
1024+
let shard_locations = ShardLocations::default();
1025+
let indexing_plan = build_physical_indexing_plan(
1026+
&sources,
1027+
&indexer_id_to_cpu_capacities,
1028+
None,
1029+
&shard_locations,
1030+
);
1031+
assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);
1032+
1033+
for source in &sources {
1034+
let pipelines_per_indexer_for_source = indexing_plan
1035+
.indexing_tasks_per_indexer()
1036+
.values()
1037+
.map(|tasks| {
1038+
tasks
1039+
.iter()
1040+
.filter(|t| t.source_id == source.source_uid.source_id)
1041+
.count()
1042+
})
1043+
.collect_vec();
1044+
assert!(pipelines_per_indexer_for_source.contains(&3));
1045+
assert!(pipelines_per_indexer_for_source.contains(&1));
1046+
assert!(pipelines_per_indexer_for_source.contains(&0));
1047+
assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 4);
1048+
}
1049+
1050+
for source in &mut sources {
1051+
if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type
1052+
{
1053+
*num_pipelines = 5;
1054+
}
1055+
}
1056+
1057+
let new_indexing_plan = build_physical_indexing_plan(
1058+
&sources,
1059+
&indexer_id_to_cpu_capacities,
1060+
Some(&indexing_plan),
1061+
&shard_locations,
1062+
);
1063+
1064+
for source in &sources {
1065+
let pipelines_per_indexer_for_source = new_indexing_plan
1066+
.indexing_tasks_per_indexer()
1067+
.values()
1068+
.map(|tasks| {
1069+
tasks
1070+
.iter()
1071+
.filter(|t| t.source_id == source.source_uid.source_id)
1072+
.count()
1073+
})
1074+
.collect_vec();
1075+
assert!(pipelines_per_indexer_for_source.contains(&3));
1076+
assert!(pipelines_per_indexer_for_source.contains(&2));
1077+
assert!(pipelines_per_indexer_for_source.contains(&0));
1078+
assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 5);
1079+
}
1080+
}
1081+
9421082
fn make_indexing_tasks(
9431083
source_uid: &SourceUid,
9441084
shards: &[(PipelineUid, &[ShardId])],

0 commit comments

Comments
 (0)