Skip to content

Commit 43f7f8d

Browse files
fix: use wildcard query when Redis index value is empty (#2069)
* fix: use wildcard query when Redis index value is empty When searching Redis with an empty filter, the query was generating `@nl_state:{ }` which causes a syntax error at offset 10. This fix uses `*` (match all) when the sanitized field is empty. Also updates the test mock to handle wildcard queries and skip entries without version fields (e.g., those created via HSET). Fixes: Syntax error at offset 10 near state * complete merge to comply with CI * fix: resolve merge conflict in search_by_index_prefix Merged main into fix-redis-query-syntax branch and resolved conflict in the search_by_index_prefix method to work with RecoverablePool. Changes: - Extract Client from ClientWithPermit using client.client.clone() - Pass sanitized_field as parameter to closure for proper ownership - Maintain wildcard query fix (*) for empty filter values - Maintain input sanitization security ,
1 parent 14b2cc6 commit 43f7f8d

File tree

2 files changed

+70
-46
lines changed

2 files changed

+70
-46
lines changed

nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -217,37 +217,56 @@ impl Mocks for FakeRedisBackend {
217217
}
218218

219219
if actual.cmd == Str::from_static("FT.AGGREGATE") {
220-
// The query is @field:value where value might be wrapped in braces.
220+
// The query is either "*" (match all) or @field:{ value }.
221221
let query = actual.args[1]
222222
.clone()
223223
.into_string()
224224
.expect("Aggregate query should be a string");
225-
assert_eq!(&query[..1], "@");
226-
let mut parts = query[1..].split(':');
227-
let field = parts.next().expect("No field name");
228-
let value = parts.next().expect("No value");
229-
let value = value
230-
.strip_prefix("{ ")
231-
.and_then(|s| s.strip_suffix(" }"))
232-
.unwrap_or(value);
233225
// Lazy implementation making assumptions.
234226
assert_eq!(
235227
actual.args[2..6],
236228
vec!["LOAD".into(), 2.into(), "data".into(), "version".into()]
237229
);
238230
let mut results = vec![RedisValue::Integer(0)];
239-
for fields in self.table.lock().values() {
240-
if let Some(key_value) = fields.get(field) {
241-
if *key_value == RedisValue::Bytes(Bytes::from(value.to_owned())) {
231+
232+
if query == "*" {
233+
// Wildcard query - return all records that have both data and version fields.
234+
// Some entries (e.g., from HSET) may not have version field.
235+
for fields in self.table.lock().values() {
236+
if let (Some(data), Some(version)) = (fields.get("data"), fields.get("version"))
237+
{
242238
results.push(RedisValue::Array(vec![
243239
RedisValue::Bytes(Bytes::from("data")),
244-
fields.get("data").expect("No data field").clone(),
240+
data.clone(),
245241
RedisValue::Bytes(Bytes::from("version")),
246-
fields.get("version").expect("No version field").clone(),
242+
version.clone(),
247243
]));
248244
}
249245
}
246+
} else {
247+
// Field-specific query: @field:{ value }
248+
assert_eq!(&query[..1], "@");
249+
let mut parts = query[1..].split(':');
250+
let field = parts.next().expect("No field name");
251+
let value = parts.next().expect("No value");
252+
let value = value
253+
.strip_prefix("{ ")
254+
.and_then(|s| s.strip_suffix(" }"))
255+
.unwrap_or(value);
256+
for fields in self.table.lock().values() {
257+
if let Some(key_value) = fields.get(field) {
258+
if *key_value == RedisValue::Bytes(Bytes::from(value.to_owned())) {
259+
results.push(RedisValue::Array(vec![
260+
RedisValue::Bytes(Bytes::from("data")),
261+
fields.get("data").expect("No data field").clone(),
262+
RedisValue::Bytes(Bytes::from("version")),
263+
fields.get("version").expect("No version field").clone(),
264+
]));
265+
}
266+
}
267+
}
250268
}
269+
251270
results[0] = u32::try_from(results.len() - 1).unwrap_or(u32::MAX).into();
252271
return Ok(RedisValue::Array(vec![
253272
RedisValue::Array(results),

nativelink-store/src/redis_store.rs

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,39 +1328,44 @@ impl SchedulerStore for RedisStore {
13281328
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
13291329
);
13301330

1331-
let run_ft_aggregate =
1332-
|client: Arc<ClientWithPermit>, index_name: String, field: String| async move {
1333-
ft_aggregate(
1334-
client.client.clone(),
1335-
index_name,
1336-
format!("@{}:{{ {} }}", K::INDEX_NAME, field),
1337-
FtAggregateOptions {
1338-
load: Some(Load::Some(vec![
1339-
SearchField {
1340-
identifier: DATA_FIELD_NAME.into(),
1341-
property: None,
1342-
},
1343-
SearchField {
1344-
identifier: VERSION_FIELD_NAME.into(),
1345-
property: None,
1346-
},
1347-
])),
1348-
cursor: Some(WithCursor {
1349-
count: Some(MAX_COUNT_PER_CURSOR),
1350-
max_idle: Some(CURSOR_IDLE_MS),
1331+
let run_ft_aggregate = |client: Arc<ClientWithPermit>,
1332+
index_name: String,
1333+
sanitized_field: String| async move {
1334+
ft_aggregate(
1335+
client.client.clone(),
1336+
index_name,
1337+
if sanitized_field.is_empty() {
1338+
"*".to_string()
1339+
} else {
1340+
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field)
1341+
},
1342+
FtAggregateOptions {
1343+
load: Some(Load::Some(vec![
1344+
SearchField {
1345+
identifier: DATA_FIELD_NAME.into(),
1346+
property: None,
1347+
},
1348+
SearchField {
1349+
identifier: VERSION_FIELD_NAME.into(),
1350+
property: None,
1351+
},
1352+
])),
1353+
cursor: Some(WithCursor {
1354+
count: Some(MAX_COUNT_PER_CURSOR),
1355+
max_idle: Some(CURSOR_IDLE_MS),
1356+
}),
1357+
pipeline: vec![AggregateOperation::SortBy {
1358+
properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
1359+
vec![(format!("@{v}").into(), SortOrder::Asc)]
13511360
}),
1352-
pipeline: vec![AggregateOperation::SortBy {
1353-
properties: K::MAYBE_SORT_KEY.map_or_else(Vec::new, |v| {
1354-
vec![(format!("@{v}").into(), SortOrder::Asc)]
1355-
}),
1356-
max: None,
1357-
}],
1358-
..Default::default()
1359-
},
1360-
)
1361-
.await
1362-
.map(|stream| (stream, client))
1363-
};
1361+
max: None,
1362+
}],
1363+
..Default::default()
1364+
},
1365+
)
1366+
.await
1367+
.map(|stream| (stream, client))
1368+
};
13641369

13651370
let client = Arc::new(self.get_client().await?);
13661371
let (stream, client_guard) = if let Ok(result) =

0 commit comments

Comments
 (0)