Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions nativelink-config/examples/mongo.json5
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@
worker_timeout_s: 300,
},
},
{
// Example of a scheduler using a MongoDB backend for state management.
name: "MONGO_BACKED_SCHEDULER",
simple: {
supported_platform_properties: {
cpu_arch: "minimum",
OS: "exact",
},
max_job_retries: 3,
worker_timeout_s: 300,
experimental_backend: {
mongo: {
mongo_store: "MONGO_SCHEDULER",
},
},
},
},
],
servers: [
{
Expand Down
10 changes: 10 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ pub enum ExperimentalSimpleSchedulerBackend {
Memory,
/// Use a redis store for the scheduler.
Redis(ExperimentalRedisSchedulerBackend),
/// Use a mongodb store for the scheduler.
Mongo(ExperimentalMongoSchedulerBackend),
}

#[derive(Deserialize, Debug, Default)]
Expand All @@ -148,6 +150,14 @@ pub struct ExperimentalRedisSchedulerBackend {
pub redis_store: StoreRefName,
}

/// Configuration for the experimental MongoDB scheduler backend, carried by
/// the `ExperimentalSimpleSchedulerBackend::Mongo` variant.
///
/// Unknown fields are rejected during deserialization
/// (`deny_unknown_fields`), so a misspelled key is a config error rather
/// than being silently ignored.
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ExperimentalMongoSchedulerBackend {
/// A reference to the mongo store to use for the scheduler.
/// Note: This MUST resolve to an `ExperimentalMongoSpec`.
// NOTE(review): the scheduler factory downcasts the referenced store to
// `ExperimentalMongoStore` and returns an input error when that fails, so
// a store wrapped by another store type presumably will not work here —
// confirm.
pub mongo_store: StoreRefName,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
/// is useful to use when doing some kind of local action cache or CAS away from
/// the main cluster of workers. In general, it's more efficient to point the
Expand Down
35 changes: 35 additions & 0 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use nativelink_config::schedulers::{
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Error, ResultExt, make_input_err};
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
use nativelink_store::mongo_store::ExperimentalMongoStore;
use nativelink_store::redis_store::RedisStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::instant_wrapper::InstantWrapper;
Expand Down Expand Up @@ -150,6 +151,40 @@ fn simple_scheduler_factory(
);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
ExperimentalSimpleSchedulerBackend::Mongo(mongo_config) => {
let store = store_manager
.get_store(mongo_config.mongo_store.as_ref())
.err_tip(|| {
format!(
"'mongo_store': '{}' does not exist",
mongo_config.mongo_store
)
})?;
let task_change_notify = Arc::new(Notify::new());
let store = store
.into_inner()
.as_any_arc()
.downcast::<ExperimentalMongoStore>()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This downcast is causing some problems that I'm still trying to figure out. I'm confident that Mongo can present a more usable option for some customers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you demo the problem here? Possibly also add a test to show it working/broken?

.map_err(|_| {
make_input_err!(
"Could not downcast to mongo store in MongoAwaitedActionDb::new"
)
})?;
let awaited_action_db = StoreAwaitedActionDb::new(
store,
task_change_notify.clone(),
now_fn,
Default::default,
)
.err_tip(|| "In state_manager_factory::mongo_state_manager")?;
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
spec,
awaited_action_db,
task_change_notify,
maybe_origin_event_tx.cloned(),
);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
}
}

Expand Down
Loading