Skip to content

Commit a93af2f

Browse files
committed
Move Bytestream to array config
1 parent f70c487 commit a93af2f

File tree

14 files changed

+318
-191
lines changed

14 files changed

+318
-191
lines changed

Cargo.lock

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/README.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,10 @@ A very basic configuration that's a pure in-memory store is:
5353
"capabilities": [{
5454
"instance_name": "main"
5555
}],
56-
"bytestream": {
57-
"cas_stores": {
58-
"main": "CAS_MAIN_STORE",
59-
}
60-
}
56+
"bytestream": [{
57+
"instance_name": "main",
58+
"cas_store": "CAS_MAIN_STORE",
59+
}]
6160
}
6261
}]
6362
}

nativelink-config/examples/README.md

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,10 @@ The `public` server consists of a `listener` object and a `services` object. The
291291
"scheduler": "MAIN_SCHEDULER",
292292
}
293293
}],
294-
"bytestream": {
295-
"cas_stores": {
296-
"main": "WORKER_FAST_SLOW_STORE",
297-
}
298-
}
294+
"bytestream": [{
295+
"instance_name": "main",
296+
"cas_store": "WORKER_FAST_SLOW_STORE",
297+
}]
299298
},
300299
},{
301300
"name": "private_workers_servers"
@@ -342,11 +341,10 @@ The `private` server consists of a `listener` object and a `services` object. Th
342341
"scheduler": "MAIN_SCHEDULER",
343342
}
344343
}],
345-
"bytestream": {
346-
"cas_stores": {
347-
"main": "WORKER_FAST_SLOW_STORE",
348-
}
349-
}
344+
"bytestream": [{
345+
"instance_name": "main",
346+
"cas_store": "WORKER_FAST_SLOW_STORE",
347+
}]
350348
},
351349
},{
352350
"name": "private_workers_servers",
@@ -519,11 +517,10 @@ Below, you will find a fully tested example that you can also find in [basic_cas
519517
"scheduler": "MAIN_SCHEDULER",
520518
}
521519
}],
522-
"bytestream": {
523-
"cas_stores": {
524-
"main": "WORKER_FAST_SLOW_STORE",
525-
}
526-
}
520+
"bytestream": [{
521+
"instance_name": "main",
522+
"cas_store": "WORKER_FAST_SLOW_STORE",
523+
}]
527524
}
528525
}, {
529526
"name": "private_workers_servers",

nativelink-config/examples/basic_cas.json5

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,12 @@
156156
},
157157
},
158158
],
159-
bytestream: {
160-
cas_stores: {
161-
main: "WORKER_FAST_SLOW_STORE",
159+
bytestream: [
160+
{
161+
instance_name: "main",
162+
cas_store: "WORKER_FAST_SLOW_STORE",
162163
},
163-
},
164+
],
164165
},
165166
},
166167
{
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
stores: [],
3+
servers: [
4+
{
5+
listener: {
6+
http: {
7+
socket_address: "0.0.0.0:50051",
8+
},
9+
},
10+
services: {
11+
cas: {
12+
"": {
13+
cas_store: "CAS_MAIN_STORE",
14+
},
15+
},
16+
ac: {
17+
"": {
18+
ac_store: "AC_MAIN_STORE",
19+
},
20+
},
21+
execution: {
22+
"": {
23+
cas_store: "WORKER_FAST_SLOW_STORE",
24+
scheduler: "MAIN_SCHEDULER",
25+
},
26+
},
27+
capabilities: {
28+
"": {
29+
remote_execution: {
30+
scheduler: "MAIN_SCHEDULER",
31+
},
32+
},
33+
},
34+
bytestream: {
35+
cas_stores: {
36+
main: "WORKER_FAST_SLOW_STORE",
37+
},
38+
},
39+
},
40+
},
41+
],
42+
}

nativelink-config/src/backcompat.rs

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::collections::HashMap;
33
use serde::{Deserialize, Deserializer, Serialize};
44
use tracing::warn;
55

6-
use crate::cas_server::WithInstanceName;
6+
use crate::cas_server::{ByteStreamConfig, OldByteStreamConfig, WithInstanceName};
77

88
#[derive(Debug, Deserialize)]
99
#[serde(untagged)]
@@ -12,7 +12,19 @@ enum WithInstanceNameBackCompat<T> {
1212
Vec(Vec<WithInstanceName<T>>),
1313
}
1414

15-
/// Use `#[serde(default, deserialize_with = "backcompat::opt_vec_named_config")]` for backwards
15+
fn deprecated(old_map: &String, new_map: &String) {
16+
warn!(
17+
r"WARNING: Using deprecated map format for services. Please migrate to the new array format:
18+
// Old:
19+
{}
20+
// New:
21+
{}
22+
",
23+
old_map, new_map
24+
);
25+
}
26+
27+
/// Use `#[serde(default, deserialize_with = "backcompat::opt_vec_with_instance_name")]` for backwards
1628
/// compatibility with map-based access. A deprecation warning will be written to stderr if the
1729
/// old format is used.
1830
pub(crate) fn opt_vec_with_instance_name<'de, D, T>(
@@ -38,24 +50,73 @@ where
3850
config,
3951
})
4052
.collect();
41-
warn!(
42-
r"WARNING: Using deprecated map format for services. Please migrate to the new array format:
43-
// Old:
44-
{}
45-
// New:
46-
{}
47-
",
48-
serde_map,
53+
deprecated(
54+
&serde_map,
4955
// TODO(palfrey): ideally this would be serde_json5::to_string_pretty but that doesn't exist
5056
// JSON is close enough to be workable for now
51-
serde_json::to_string_pretty(&vec).expect("valid new map")
57+
&serde_json::to_string_pretty(&vec).expect("valid new map"),
5258
);
5359
Ok(Some(vec))
5460
}
5561
WithInstanceNameBackCompat::Vec(vec) => Ok(Some(vec)),
5662
}
5763
}
5864

65+
#[derive(Debug, Deserialize)]
66+
#[serde(untagged)]
67+
enum ByteStreamKind {
68+
Old(OldByteStreamConfig),
69+
New(Vec<WithInstanceName<ByteStreamConfig>>),
70+
}
71+
72+
/// Use `#[serde(default, deserialize_with = "backcompat::opt_bytestream")]` for backwards
73+
/// compatibility with older bytestream config . A deprecation warning will be written to stderr if the
74+
/// old format is used.
75+
pub(crate) fn opt_bytestream<'de, D>(
76+
deserializer: D,
77+
) -> Result<Option<Vec<WithInstanceName<ByteStreamConfig>>>, D::Error>
78+
where
79+
D: Deserializer<'de>,
80+
{
81+
let Some(back_compat) = Option::deserialize(deserializer)? else {
82+
return Ok(None);
83+
};
84+
85+
match back_compat {
86+
ByteStreamKind::Old(old_config) => {
87+
if old_config.max_decoding_message_size != 0 {
88+
warn!(
89+
"WARNING: max_decoding_message_size is ignored on Bytestream now. Please set on the HTTP listener instead"
90+
);
91+
}
92+
// TODO(palfrey): ideally this would be serde_json5::to_string_pretty but that doesn't exist
93+
// JSON is close enough to be workable for now
94+
let serde_map = serde_json::to_string_pretty(&old_config).expect("valid map");
95+
let names = old_config.cas_stores;
96+
let vec: Vec<WithInstanceName<_>> = names
97+
.iter()
98+
.map(|(instance_name, cas_store)| WithInstanceName {
99+
instance_name: instance_name.clone(),
100+
config: ByteStreamConfig {
101+
cas_store: cas_store.clone(),
102+
max_bytes_per_stream: old_config.max_bytes_per_stream,
103+
persist_stream_on_disconnect_timeout: old_config
104+
.persist_stream_on_disconnect_timeout,
105+
},
106+
})
107+
.collect();
108+
deprecated(
109+
&serde_map,
110+
// TODO(palfrey): ideally this would be serde_json5::to_string_pretty but that doesn't exist
111+
// JSON is close enough to be workable for now
112+
&serde_json::to_string_pretty(&vec).expect("valid new map"),
113+
);
114+
Ok(Some(vec))
115+
}
116+
ByteStreamKind::New(vec) => Ok(Some(vec)),
117+
}
118+
}
119+
59120
#[cfg(test)]
60121
mod tests {
61122
use serde_json::json;

nativelink-config/src/cas_server.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ pub struct PushConfig {
180180
#[serde(deny_unknown_fields)]
181181
pub struct ByteStreamConfig {
182182
/// Name of the store in the "stores" configuration.
183-
pub cas_stores: HashMap<InstanceName, StoreRefName>,
183+
pub cas_store: StoreRefName,
184184

185185
/// Max number of bytes to send on each grpc stream chunk.
186186
/// According to <https://github.com/grpc/grpc.github.io/issues/371>
@@ -191,11 +191,6 @@ pub struct ByteStreamConfig {
191191
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
192192
pub max_bytes_per_stream: usize,
193193

194-
/// Maximum number of bytes to decode on each grpc stream chunk.
195-
/// Default: 4 MiB
196-
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
197-
pub max_decoding_message_size: usize,
198-
199194
/// In the event a client disconnects while uploading a blob, we will hold
200195
/// the internal stream open for this many seconds before closing it.
201196
/// This allows clients that disconnect to reconnect and continue uploading
@@ -206,6 +201,21 @@ pub struct ByteStreamConfig {
206201
pub persist_stream_on_disconnect_timeout: usize,
207202
}
208203

204+
// Older bytestream config. All fields are as per the newer docs, but this requires
205+
// the hashed cas_stores v.s. the WithInstanceName approach. This should _not_ be updated
206+
// with newer fields, and eventually dropped
207+
#[derive(Deserialize, Serialize, Debug, Clone)]
208+
#[serde(deny_unknown_fields)]
209+
pub struct OldByteStreamConfig {
210+
pub cas_stores: HashMap<InstanceName, StoreRefName>,
211+
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
212+
pub max_bytes_per_stream: usize,
213+
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
214+
pub max_decoding_message_size: usize,
215+
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
216+
pub persist_stream_on_disconnect_timeout: usize,
217+
}
218+
209219
#[derive(Deserialize, Serialize, Debug)]
210220
#[serde(deny_unknown_fields)]
211221
pub struct WorkerApiConfig {
@@ -322,7 +332,8 @@ pub struct ServicesConfig {
322332
/// This is the service used to stream data to and from the CAS.
323333
/// Bazel's protocol strongly encourages users to use this streaming
324334
/// interface to interact with the CAS when the data is large.
325-
pub bytestream: Option<ByteStreamConfig>,
335+
#[serde(default, deserialize_with = "super::backcompat::opt_bytestream")]
336+
pub bytestream: Option<Vec<WithInstanceName<ByteStreamConfig>>>,
326337

327338
/// These two are collectively the Remote Asset protocol, but it's
328339
/// defined as two separate services
@@ -462,7 +473,7 @@ pub enum ListenerConfig {
462473
Http(HttpListener),
463474
}
464475

465-
#[derive(Deserialize, Serialize, Debug)]
476+
#[derive(Deserialize, Serialize, Debug, Default)]
466477
#[serde(deny_unknown_fields)]
467478
pub struct HttpListener {
468479
/// Address to listen on. Example: `127.0.0.1:8080` or `:8080` to listen
@@ -478,6 +489,11 @@ pub struct HttpListener {
478489
#[serde(default)]
479490
pub advanced_http: HttpServerConfig,
480491

492+
/// Maximum number of bytes to decode on each grpc stream chunk.
493+
/// Default: 4 MiB
494+
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
495+
pub max_decoding_message_size: usize,
496+
481497
/// Tls Configuration for this server.
482498
/// If not set, the server will not use TLS.
483499
///

nativelink-service/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ rust_test_suite(
8585
"@crates//:http-body-util",
8686
"@crates//:hyper-1.7.0",
8787
"@crates//:hyper-util",
88-
"@crates//:maplit",
8988
"@crates//:pretty_assertions",
9089
"@crates//:prost",
9190
"@crates//:prost-types",
@@ -112,7 +111,6 @@ rust_test(
112111
"@crates//:async-lock",
113112
"@crates//:hyper",
114113
"@crates//:hyper-util",
115-
"@crates//:maplit",
116114
"@crates//:pretty_assertions",
117115
"@crates//:prost-types",
118116
],

nativelink-service/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ async-trait = "0.1.88"
6363
hex = { version = "0.4.3", default-features = false }
6464
hyper = "1.6.0"
6565
hyper-util = "0.1.11"
66-
maplit = "1.0.2"
6766
pretty_assertions = { version = "1.4.1", features = ["std"] }
6867
prost-types = { version = "0.13.5", default-features = false }
6968
sha2 = { version = "0.10.8", default-features = false }

0 commit comments

Comments
 (0)