redis: prefixed routing (envoyproxy#6413)
Signed-off-by: Maxime Bedard <[email protected]>
maximebedard authored and mattklein123 committed Apr 16, 2019
1 parent 21fd119 commit 7163451
Showing 28 changed files with 789 additions and 99 deletions.
2 changes: 1 addition & 1 deletion DEPRECATED.md
@@ -1,3 +1,3 @@
# DEPRECATED

The [deprecated log](https://www.envoyproxy.io/docs/envoy/latest/intro/deprecated) can be found in the official Envoy developer documentation.
The [deprecated log](https://www.envoyproxy.io/docs/envoy/latest/intro/deprecated) can be found in the official Envoy developer documentation.
63 changes: 61 additions & 2 deletions api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto
@@ -22,7 +22,13 @@ message RedisProxy {
// Name of cluster from cluster manager. See the :ref:`configuration section
// <arch_overview_redis_configuration>` of the architecture overview for recommendations on
// configuring the backing cluster.
string cluster = 2 [(validate.rules).string.min_bytes = 1];
//
// .. attention::
//
// This field is deprecated. Use a :ref:`catch-all
// cluster<envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.PrefixRoutes.catch_all_cluster>`
// instead.
string cluster = 2 [deprecated = true];

// Redis connection pool settings.
message ConnPoolSettings {
@@ -55,10 +61,63 @@ message RedisProxy {
bool enable_redirection = 3;
}

// Network settings for the connection pool to the upstream cluster.
// Network settings for the connection pool to the upstream clusters.
ConnPoolSettings settings = 3 [(validate.rules).message.required = true];

// Indicates that latency stat should be computed in microseconds. By default it is computed in
// milliseconds.
bool latency_in_micros = 4;

message PrefixRoutes {
message Route {
// String prefix that must match the beginning of the keys. Envoy will always favor the
// longest match.
string prefix = 1 [(validate.rules).string.min_bytes = 1];

// Indicates if the prefix needs to be removed from the key when forwarded.
bool remove_prefix = 2;

// Upstream cluster to forward the command to.
string cluster = 3 [(validate.rules).string.min_bytes = 1];
}

// List of prefix routes.
repeated Route routes = 1 [(gogoproto.nullable) = false];

// Indicates that prefix matching should be case insensitive.
bool case_insensitive = 2;

// Optional catch-all route to forward commands that don't match any of the routes. The
// catch-all route becomes required when no routes are specified.
string catch_all_cluster = 3;
}

// List of **unique** prefixes used to route keys from different workloads to different
// clusters. When prefixes overlap, Envoy always favors the longest match. A catch-all
// cluster can be used to forward commands when there is no match. The time complexity of a
// lookup is O(min(longest prefix length, key length)).
//
// Example:
//
// .. code-block:: yaml
//
// prefix_routes:
// routes:
// - prefix: "ab"
// cluster: "cluster_a"
// - prefix: "abc"
// cluster: "cluster_b"
//
// With the above routes, commands would be routed as follows:
//
// * 'get abc:users' would retrieve the key 'abc:users' from cluster_b.
// * 'get ab:users' would retrieve the key 'ab:users' from cluster_a.
// * 'get z:users' would return a NoUpstreamHost error. A :ref:`catch-all
// cluster<envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.PrefixRoutes.catch_all_cluster>`
// would have retrieved the key from that cluster instead.
//
// See the :ref:`configuration section
// <arch_overview_redis_configuration>` of the architecture overview for recommendations on
// configuring the backing clusters.
PrefixRoutes prefix_routes = 5 [(gogoproto.nullable) = false];
}
5 changes: 4 additions & 1 deletion docs/root/intro/arch_overview/redis.rst
@@ -8,7 +8,9 @@ In this mode, the goals of Envoy are to maintain availability and partition tole
over consistency. This is the key point when comparing Envoy to `Redis Cluster
<https://redis.io/topics/cluster-spec>`_. Envoy is designed as a best-effort cache,
meaning that it will not try to reconcile inconsistent data or keep a globally consistent
view of cluster membership.
view of cluster membership. It also supports routing commands from different workloads to
different upstream clusters based on their access patterns, eviction, or isolation
requirements.

The Redis project offers a thorough reference on partitioning as it relates to Redis. See
"`Partitioning: how to split data among multiple Redis instances
@@ -22,6 +24,7 @@ The Redis project offers a thorough reference on partitioning as it relates to R
* Detailed command statistics.
* Active and passive healthchecking.
* Hash tagging.
* Prefix routing.

**Planned future enhancements**:

1 change: 1 addition & 0 deletions docs/root/intro/deprecated.rst
@@ -12,6 +12,7 @@ Deprecated items below are listed in chronological order.

Version 1.11.0 (Pending)
========================
* Use of :ref:`cluster <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.cluster>` in :ref:`redis_proxy.proto <envoy_api_file_envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto>` is deprecated. Set a :ref:`catch_all_cluster <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.PrefixRoutes.catch_all_cluster>` instead.

Version 1.10.0 (Apr 5, 2019)
============================
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
@@ -6,6 +6,7 @@ Version history
* dubbo_proxy: support the :ref:`Dubbo proxy filter <config_network_filters_dubbo_proxy>`.
* event: added :ref:`loop duration and poll delay statistics <operations_performance>`.
* http: mitigated a race condition with the :ref:`delayed_close_timeout<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.delayed_close_timeout>` where it could trigger while actively flushing a pending write buffer for a downstream connection.
* redis: added :ref:`prefix routing <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.prefix_routes>` to enable routing commands to different upstream clusters based on their key's prefix.
* redis: add support for zpopmax and zpopmin commands.
* upstream: added :ref:`upstream_cx_pool_overflow <config_cluster_manager_cluster_stats>` for the connection pool circuit breaker.

34 changes: 33 additions & 1 deletion source/common/common/utility.h
@@ -568,8 +568,11 @@ template <class Value> struct TrieLookupTable {
* Adds an entry to the Trie at the given Key.
* @param key the key used to add the entry.
* @param value the value to be associated with the key.
* @param overwrite_existing whether to overwrite the value when one already exists for the
* given key.
* @return false when a value already exists for the given key and overwrite_existing is false.
*/
void add(const char* key, Value value) {
bool add(const char* key, Value value, bool overwrite_existing = true) {
TrieEntry<Value>* current = &root_;
while (uint8_t c = *key) {
if (!current->entries_[c]) {
@@ -578,7 +581,11 @@ template <class Value> struct TrieLookupTable {
current = current->entries_[c].get();
key++;
}
if (current->value_ && !overwrite_existing) {
return false;
}
current->value_ = value;
return true;
}

/**
@@ -599,6 +606,31 @@ template <class Value> struct TrieLookupTable {
return current->value_;
}

/**
* Finds the entry associated with the longest matching prefix of the key. Complexity is
* O(min(longest key prefix, key length)).
* @param key the key to look up.
* @return the value associated with the longest matching prefix, or nullptr if no prefix matches.
*/
Value findLongestPrefix(const char* key) const {
const TrieEntry<Value>* current = &root_;
const TrieEntry<Value>* result = nullptr;
while (uint8_t c = *key) {
if (current->value_) {
result = current;
}

// https://github.com/facebook/mcrouter/blob/master/mcrouter/lib/fbi/cpp/Trie-inl.h#L126-L143
current = current->entries_[c].get();
if (current == nullptr) {
return result ? result->value_ : nullptr;
}

key++;
}
return current ? current->value_ : result->value_;
}

TrieEntry<Value> root_;
};

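The `overwrite_existing` flag on `add` and the new `findLongestPrefix` are the trie primitives the prefix router builds on. The following is a small illustrative snippet (not part of this commit) showing how they behave; the `common/common/utility.h` include path and the `Envoy` namespace are assumptions based on where the header lives in the tree:

```cpp
// Illustrative only: exercises the TrieLookupTable additions shown above.
#include <cassert>
#include <memory>
#include <string>

#include "common/common/utility.h" // TrieLookupTable (assumed include path)

int main() {
  // The value type must be nullptr-constructible, since findLongestPrefix
  // returns nullptr on a miss; shared_ptr is used here for that reason.
  Envoy::TrieLookupTable<std::shared_ptr<std::string>> trie;
  auto cluster_a = std::make_shared<std::string>("cluster_a");
  auto cluster_b = std::make_shared<std::string>("cluster_b");

  // add() now reports whether the value was stored.
  assert(trie.add("ab", cluster_a));
  assert(trie.add("abc", cluster_b));
  // With overwrite_existing = false, a duplicate prefix is rejected.
  assert(!trie.add("ab", cluster_b, /*overwrite_existing=*/false));

  // Longest-prefix lookups, mirroring the routing example in the proto docs.
  assert(trie.findLongestPrefix("abc:users") == cluster_b);
  assert(trie.findLongestPrefix("ab:users") == cluster_a);
  assert(trie.findLongestPrefix("z:users") == nullptr); // no match
  return 0;
}
```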
28 changes: 25 additions & 3 deletions source/extensions/filters/network/redis_proxy/BUILD
@@ -30,13 +30,22 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "router_interface",
hdrs = ["router.h"],
deps = [
":conn_pool_interface",
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
],
)

envoy_cc_library(
name = "command_splitter_lib",
srcs = ["command_splitter_impl.cc"],
hdrs = ["command_splitter_impl.h"],
deps = [
":command_splitter_interface",
":conn_pool_interface",
":router_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/stats:timespan",
"//source/common/common:assert_lib",
@@ -54,7 +63,6 @@ envoy_cc_library(
hdrs = ["conn_pool_impl.h"],
deps = [
":conn_pool_interface",
"//include/envoy/router:router_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
@@ -97,7 +105,21 @@ envoy_cc_library(
"//source/extensions/filters/network/common:factory_base_lib",
"//source/extensions/filters/network/common/redis:codec_lib",
"//source/extensions/filters/network/redis_proxy:command_splitter_lib",
"//source/extensions/filters/network/redis_proxy:conn_pool_lib",
"//source/extensions/filters/network/redis_proxy:proxy_filter_lib",
"//source/extensions/filters/network/redis_proxy:router_lib",
],
)

envoy_cc_library(
name = "router_lib",
srcs = ["router_impl.cc"],
hdrs = ["router_impl.h"],
deps = [
":router_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:to_lower_table_lib",
"//source/extensions/filters/network/redis_proxy:conn_pool_lib",
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
],
)
source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
@@ -140,16 +140,20 @@ void SingleServerRequest::cancel() {
handle_ = nullptr;
}

SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,
SplitRequestPtr SimpleRequest::create(Router& router,
Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool latency_in_micros) {
std::unique_ptr<SimpleRequest> request_ptr{
new SimpleRequest(callbacks, command_stats, time_source, latency_in_micros)};

request_ptr->conn_pool_ = &conn_pool;
request_ptr->handle_ = conn_pool.makeRequest(incoming_request->asArray()[1].asString(),
*incoming_request, *request_ptr);
auto conn_pool = router.upstreamPool(incoming_request->asArray()[1].asString());
if (conn_pool) {
request_ptr->conn_pool_ = conn_pool;
request_ptr->handle_ = conn_pool->makeRequest(incoming_request->asArray()[1].asString(),
*incoming_request, *request_ptr);
}

if (!request_ptr->handle_) {
callbacks.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
@@ -159,8 +163,7 @@ SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,
return std::move(request_ptr);
}

SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
Common::Redis::RespValuePtr&& incoming_request,
SplitRequestPtr EvalRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool latency_in_micros) {
// EVAL looks like: EVAL script numkeys key [key ...] arg [arg ...]
@@ -174,9 +177,13 @@ SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
std::unique_ptr<EvalRequest> request_ptr{
new EvalRequest(callbacks, command_stats, time_source, latency_in_micros)};

request_ptr->conn_pool_ = &conn_pool;
request_ptr->handle_ = conn_pool.makeRequest(incoming_request->asArray()[3].asString(),
*incoming_request, *request_ptr);
auto conn_pool = router.upstreamPool(incoming_request->asArray()[3].asString());
if (conn_pool) {
request_ptr->conn_pool_ = conn_pool;
request_ptr->handle_ = conn_pool->makeRequest(incoming_request->asArray()[3].asString(),
*incoming_request, *request_ptr);
}

if (!request_ptr->handle_) {
command_stats.error_.inc();
callbacks.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
@@ -208,8 +215,7 @@ void FragmentedRequest::onChildFailure(uint32_t index) {
onChildResponse(Utility::makeError(Response::get().UpstreamFailure), index);
}

SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
Common::Redis::RespValuePtr&& incoming_request,
SplitRequestPtr MGETRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool latency_in_micros) {
std::unique_ptr<MGETRequest> request_ptr{
@@ -237,9 +243,13 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,

single_mget.asArray()[1].asString() = incoming_request->asArray()[i].asString();
ENVOY_LOG(debug, "redis: parallel get: '{}'", single_mget.toString());
pending_request.conn_pool_ = &conn_pool;
pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(),
single_mget, pending_request);
auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString());
if (conn_pool) {
pending_request.conn_pool_ = conn_pool;
pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(),
single_mget, pending_request);
}

if (!pending_request.handle_) {
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
}
@@ -254,7 +264,7 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
}

bool FragmentedRequest::onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) {
const ConnPool::InstanceSharedPtr& conn_pool) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) {
@@ -330,8 +340,7 @@ void MGETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) {
request.asArray().swap(values);
}

SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
Common::Redis::RespValuePtr&& incoming_request,
SplitRequestPtr MSETRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool latency_in_micros) {
if ((incoming_request->asArray().size() - 1) % 2 != 0) {
@@ -366,9 +375,13 @@ SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
single_mset.asArray()[2].asString() = incoming_request->asArray()[i + 1].asString();

ENVOY_LOG(debug, "redis: parallel set: '{}'", single_mset.toString());
pending_request.conn_pool_ = &conn_pool;
pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(),
single_mset, pending_request);
auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString());
if (conn_pool) {
pending_request.conn_pool_ = conn_pool;
pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(),
single_mset, pending_request);
}

if (!pending_request.handle_) {
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
}
@@ -427,7 +440,7 @@ void MSETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) {
request.asArray().swap(values);
}

SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
SplitRequestPtr SplitKeysSumResultRequest::create(Router& router,
Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks,
CommandStats& command_stats,
@@ -456,9 +469,13 @@ SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
single_fragment.asArray()[1].asString() = incoming_request->asArray()[i].asString();
ENVOY_LOG(debug, "redis: parallel {}: '{}'", incoming_request->asArray()[0].asString(),
single_fragment.toString());
pending_request.conn_pool_ = &conn_pool;
pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(),
single_fragment, pending_request);
auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString());
if (conn_pool) {
pending_request.conn_pool_ = conn_pool;
pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(),
single_fragment, pending_request);
}

if (!pending_request.handle_) {
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
}
@@ -515,12 +532,11 @@ void SplitKeysSumResultRequest::recreate(Common::Redis::RespValue& request, uint
request.asArray().swap(values);
}

InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope,
const std::string& stat_prefix, TimeSource& time_source,
bool latency_in_micros)
: conn_pool_(std::move(conn_pool)), simple_command_handler_(*conn_pool_),
eval_command_handler_(*conn_pool_), mget_handler_(*conn_pool_), mset_handler_(*conn_pool_),
split_keys_sum_result_handler_(*conn_pool_),
InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::string& stat_prefix,
TimeSource& time_source, bool latency_in_micros)
: router_(std::move(router)), simple_command_handler_(*router_),
eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_),
split_keys_sum_result_handler_(*router_),
stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
latency_in_micros_(latency_in_micros), time_source_(time_source) {
for (const std::string& command : Common::Redis::SupportedCommands::simpleCommands()) {
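With this change the command splitter no longer holds a single `ConnPool::Instance`; for every key it asks a `Router` for the pool that should serve it (`router.upstreamPool(key)`), and a null result surfaces as a `NoUpstreamHost` error. The actual `router.h`/`router_impl.cc` added by this commit are not shown in the truncated file list above, so the sketch below is only an illustration of the idea, with assumed names, built on the `TrieLookupTable` shown earlier:

```cpp
// Rough sketch, not the code added by this commit: a router that picks an
// upstream connection pool by longest key prefix, with an optional catch-all.
#include <memory>
#include <string>

#include "common/common/utility.h" // TrieLookupTable (assumed include path)

namespace Sketch {

class ConnPool; // stand-in for ConnPool::Instance
using ConnPoolSharedPtr = std::shared_ptr<ConnPool>;

// Interface the command splitter now depends on instead of a single pool.
class Router {
public:
  virtual ~Router() = default;
  virtual ConnPoolSharedPtr upstreamPool(const std::string& key) = 0;
};

class PrefixRouter : public Router {
public:
  explicit PrefixRouter(ConnPoolSharedPtr catch_all_pool)
      : catch_all_pool_(std::move(catch_all_pool)) {}

  // Returns false on duplicate prefixes, matching the "unique prefixes"
  // requirement in the proto documentation.
  bool addRoute(const std::string& prefix, ConnPoolSharedPtr pool) {
    return prefix_lookup_table_.add(prefix.c_str(), std::move(pool),
                                    /*overwrite_existing=*/false);
  }

  ConnPoolSharedPtr upstreamPool(const std::string& key) override {
    ConnPoolSharedPtr pool = prefix_lookup_table_.findLongestPrefix(key.c_str());
    // Fall back to the catch-all cluster; returning nullptr here is what the
    // splitter reports as NoUpstreamHost in the diff above.
    return pool != nullptr ? pool : catch_all_pool_;
  }

private:
  Envoy::TrieLookupTable<ConnPoolSharedPtr> prefix_lookup_table_;
  ConnPoolSharedPtr catch_all_pool_;
};

} // namespace Sketch
```

The real router additionally has to handle the `case_insensitive` option (note the `to_lower_table_lib` dependency in the BUILD changes) and, per the BUILD dependencies, works with the cluster manager and connection-pool library rather than the stand-ins used here.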
