From c01312c979e0422eb590564a56130fbd63a28f2e Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Sun, 10 Dec 2023 08:34:51 +0000 Subject: [PATCH] Use `Value::Map` to aggregate cluster responses. --- redis/src/cluster.rs | 7 ++----- redis/src/cluster_async/mod.rs | 8 ++------ redis/tests/test_cluster_async.rs | 19 ++++++++++++------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index b1a1c561e..6f236b179 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -586,18 +586,15 @@ where } Some(ResponsePolicy::Special) | None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user. - // TODO - once RESP3 is merged, return a map value here. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. let results = results .enumerate() .map(|(index, result)| { let addr = addresses[index]; - result.map(|val| { - Value::Array(vec![Value::BulkString(addr.as_bytes().to_vec()), val]) - }) + result.map(|val| (Value::BulkString(addr.as_bytes().to_vec()), val)) }) .collect::>>()?; - Ok(Value::Array(results)) + Ok(Value::Map(results)) } } } diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index a74ec9179..ebe974dab 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -737,17 +737,13 @@ where } Some(ResponsePolicy::Special) | None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user. - // TODO - once RESP3 is merged, return a map value here. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move { let result = convert_result(receiver.await)?; - Ok(Value::Array(vec![ - Value::BulkString(addr.as_bytes().to_vec()), - result, - ])) + Ok((Value::BulkString(addr.as_bytes().to_vec()), result)) })) .await - .map(Value::Array) + .map(Value::Map) } } } diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index e3326be25..8a2c6cb68 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -130,10 +130,15 @@ fn test_async_cluster_route_info_to_nodes() { let cluster = TestClusterContext::new(12, 1); let split_to_addresses_and_info = |res| -> (Vec, Vec) { - if let Value::Array(values) = res { + if let Value::Map(values) = res { let mut pairs: Vec<_> = values .into_iter() - .map(|value| redis::from_redis_value::<(String, String)>(&value).unwrap()) + .map(|(key, value)| { + ( + redis::from_redis_value::(&key).unwrap(), + redis::from_redis_value::(&value).unwrap(), + ) + }) .collect(); pairs.sort_by(|(address1, _), (address2, _)| address1.cmp(address2)); pairs.into_iter().unzip() @@ -1474,16 +1479,16 @@ fn test_cluster_fan_out_and_return_map_of_results_for_special_response_policy() // TODO once RESP3 is in, return this as a map let mut result = runtime - .block_on(cmd.query_async::<_, Vec>>(&mut connection)) + .block_on(cmd.query_async::<_, Vec<(String, String)>>(&mut connection)) .unwrap(); result.sort(); assert_eq!( result, vec![ - vec![format!("{name}:6379"), "latency: 6379".to_string()], - vec![format!("{name}:6380"), "latency: 6380".to_string()], - vec![format!("{name}:6381"), "latency: 6381".to_string()], - vec![format!("{name}:6382"), "latency: 6382".to_string()] + (format!("{name}:6379"), "latency: 6379".to_string()), + (format!("{name}:6380"), "latency: 6380".to_string()), + (format!("{name}:6381"), "latency: 6381".to_string()), + (format!("{name}:6382"), "latency: 6382".to_string()) ], "{result:?}" );