diff --git a/DEPRECATED.md b/DEPRECATED.md index ab705e0ac558..1b2962adcb97 100644 --- a/DEPRECATED.md +++ b/DEPRECATED.md @@ -1,3 +1,3 @@ # DEPRECATED -The [deprecated log](https://www.envoyproxy.io/docs/envoy/latest/intro/deprecated) can be found in the official Envoy developer documentation. \ No newline at end of file +The [deprecated log](https://www.envoyproxy.io/docs/envoy/latest/intro/deprecated) can be found in the official Envoy developer documentation. diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 23448eff903f..16196cc07a3b 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -22,7 +22,13 @@ message RedisProxy { // Name of cluster from cluster manager. See the :ref:`configuration section // ` of the architecture overview for recommendations on // configuring the backing cluster. - string cluster = 2 [(validate.rules).string.min_bytes = 1]; + // + // .. attention:: + // + // This field is deprecated. Use a :ref:`catch-all + // cluster` + // instead. + string cluster = 2 [deprecated = true]; // Redis connection pool settings. message ConnPoolSettings { @@ -55,10 +61,63 @@ message RedisProxy { bool enable_redirection = 3; } - // Network settings for the connection pool to the upstream cluster. + // Network settings for the connection pool to the upstream clusters. ConnPoolSettings settings = 3 [(validate.rules).message.required = true]; // Indicates that latency stat should be computed in microseconds. By default it is computed in // milliseconds. bool latency_in_micros = 4; + + message PrefixRoutes { + message Route { + // String prefix that must match the beginning of the keys. Envoy will always favor the + // longest match. + string prefix = 1 [(validate.rules).string.min_bytes = 1]; + + // Indicates if the prefix needs to be removed from the key when forwarded. + bool remove_prefix = 2; + + // Upstream cluster to forward the command to. + string cluster = 3 [(validate.rules).string.min_bytes = 1]; + } + + // List of prefix routes. + repeated Route routes = 1 [(gogoproto.nullable) = false]; + + // Indicates that prefix matching should be case insensitive. + bool case_insensitive = 2; + + // Optional catch-all route to forward commands that doesn't match any of the routes. The + // catch-all route becomes required when no routes are specified. + string catch_all_cluster = 3; + } + + // List of **unique** prefixes used to separate keys from different workloads to different + // clusters. Envoy will always favor the longest match first in case of overlap. A catch-all + // cluster can be used to forward commands when there is no match. Time complexity of the + // lookups are in O(min(longest key prefix, key length)). + // + // Example: + // + // .. code-block:: yaml + // + // prefix_routes: + // routes: + // - prefix: "ab" + // cluster: "cluster_a" + // - prefix: "abc" + // cluster: "cluster_b" + // + // When using the above routes, the following prefixes would be sent to: + // + // * 'get abc:users' would retrive the key 'abc:users' from cluster_b. + // * 'get ab:users' would retrive the key 'ab:users' from cluster_a. + // * 'get z:users' would return a NoUpstreamHost error. A :ref:`catch-all + // cluster` + // would have retrieved the key from that cluster instead. + // + // See the :ref:`configuration section + // ` of the architecture overview for recommendations on + // configuring the backing clusters. + PrefixRoutes prefix_routes = 5 [(gogoproto.nullable) = false]; } diff --git a/docs/root/intro/arch_overview/redis.rst b/docs/root/intro/arch_overview/redis.rst index e2f8efbc2565..4d2929a14e2a 100644 --- a/docs/root/intro/arch_overview/redis.rst +++ b/docs/root/intro/arch_overview/redis.rst @@ -8,7 +8,9 @@ In this mode, the goals of Envoy are to maintain availability and partition tole over consistency. This is the key point when comparing Envoy to `Redis Cluster `_. Envoy is designed as a best-effort cache, meaning that it will not try to reconcile inconsistent data or keep a globally consistent -view of cluster membership. +view of cluster membership. It also supports routing commands from different workload to +different to different upstream clusters based on their access patterns, eviction, or isolation +requirements. The Redis project offers a thorough reference on partitioning as it relates to Redis. See "`Partitioning: how to split data among multiple Redis instances @@ -22,6 +24,7 @@ The Redis project offers a thorough reference on partitioning as it relates to R * Detailed command statistics. * Active and passive healthchecking. * Hash tagging. +* Prefix routing. **Planned future enhancements**: diff --git a/docs/root/intro/deprecated.rst b/docs/root/intro/deprecated.rst index 423a16d492e5..b3a7b7d918a2 100644 --- a/docs/root/intro/deprecated.rst +++ b/docs/root/intro/deprecated.rst @@ -12,6 +12,7 @@ Deprecated items below are listed in chronological order. Version 1.11.0 (Pending) ======================== +* Use of :ref:`cluster ` in :ref:`redis_proxy.proto ` is deprecated. Set a :ref:`catch_all_cluster ` instead. Version 1.10.0 (Apr 5, 2019) ============================ diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index ffe16eda6841..93dcb931cc0e 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -6,6 +6,7 @@ Version history * dubbo_proxy: support the :ref:`Dubbo proxy filter `. * event: added :ref:`loop duration and poll delay statistics `. * http: mitigated a race condition with the :ref:`delayed_close_timeout` where it could trigger while actively flushing a pending write buffer for a downstream connection. +* redis: added :ref:`prefix routing ` to enable routing commands based on their key's prefix to different upstream. * redis: add support for zpopmax and zpopmin commands. * upstream: added :ref:`upstream_cx_pool_overflow ` for the connection pool circuit breaker. diff --git a/source/common/common/utility.h b/source/common/common/utility.h index 785df6d8aa40..9eaddb7f64da 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -568,8 +568,11 @@ template struct TrieLookupTable { * Adds an entry to the Trie at the given Key. * @param key the key used to add the entry. * @param value the value to be associated with the key. + * @param overwrite_existing will overwrite the value when the value for a given key already + * exists. + * @return false when a value already exists for the given key. */ - void add(const char* key, Value value) { + bool add(const char* key, Value value, bool overwrite_existing = true) { TrieEntry* current = &root_; while (uint8_t c = *key) { if (!current->entries_[c]) { @@ -578,7 +581,11 @@ template struct TrieLookupTable { current = current->entries_[c].get(); key++; } + if (current->value_ && !overwrite_existing) { + return false; + } current->value_ = value; + return true; } /** @@ -599,6 +606,31 @@ template struct TrieLookupTable { return current->value_; } + /** + * Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix, key + * length)) + * @param key the key used to find. + * @return the value matching the longest prefix based on the key. + */ + Value findLongestPrefix(const char* key) const { + const TrieEntry* current = &root_; + const TrieEntry* result = nullptr; + while (uint8_t c = *key) { + if (current->value_) { + result = current; + } + + // https://github.com/facebook/mcrouter/blob/master/mcrouter/lib/fbi/cpp/Trie-inl.h#L126-L143 + current = current->entries_[c].get(); + if (current == nullptr) { + return result ? result->value_ : nullptr; + } + + key++; + } + return current ? current->value_ : result->value_; + } + TrieEntry root_; }; diff --git a/source/extensions/filters/network/redis_proxy/BUILD b/source/extensions/filters/network/redis_proxy/BUILD index 4c56109ada4c..9825a435144e 100644 --- a/source/extensions/filters/network/redis_proxy/BUILD +++ b/source/extensions/filters/network/redis_proxy/BUILD @@ -30,13 +30,22 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "router_interface", + hdrs = ["router.h"], + deps = [ + ":conn_pool_interface", + "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", + ], +) + envoy_cc_library( name = "command_splitter_lib", srcs = ["command_splitter_impl.cc"], hdrs = ["command_splitter_impl.h"], deps = [ ":command_splitter_interface", - ":conn_pool_interface", + ":router_interface", "//include/envoy/stats:stats_macros", "//include/envoy/stats:timespan", "//source/common/common:assert_lib", @@ -54,7 +63,6 @@ envoy_cc_library( hdrs = ["conn_pool_impl.h"], deps = [ ":conn_pool_interface", - "//include/envoy/router:router_interface", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", "//source/common/buffer:buffer_lib", @@ -97,7 +105,21 @@ envoy_cc_library( "//source/extensions/filters/network/common:factory_base_lib", "//source/extensions/filters/network/common/redis:codec_lib", "//source/extensions/filters/network/redis_proxy:command_splitter_lib", - "//source/extensions/filters/network/redis_proxy:conn_pool_lib", "//source/extensions/filters/network/redis_proxy:proxy_filter_lib", + "//source/extensions/filters/network/redis_proxy:router_lib", + ], +) + +envoy_cc_library( + name = "router_lib", + srcs = ["router_impl.cc"], + hdrs = ["router_impl.h"], + deps = [ + ":router_interface", + "//include/envoy/thread_local:thread_local_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:to_lower_table_lib", + "//source/extensions/filters/network/redis_proxy:conn_pool_lib", + "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", ], ) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 9fc6189a393e..276c1ac15f86 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -140,16 +140,20 @@ void SingleServerRequest::cancel() { handle_ = nullptr; } -SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool, +SplitRequestPtr SimpleRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros) { std::unique_ptr request_ptr{ new SimpleRequest(callbacks, command_stats, time_source, latency_in_micros)}; - request_ptr->conn_pool_ = &conn_pool; - request_ptr->handle_ = conn_pool.makeRequest(incoming_request->asArray()[1].asString(), - *incoming_request, *request_ptr); + auto conn_pool = router.upstreamPool(incoming_request->asArray()[1].asString()); + if (conn_pool) { + request_ptr->conn_pool_ = conn_pool; + request_ptr->handle_ = conn_pool->makeRequest(incoming_request->asArray()[1].asString(), + *incoming_request, *request_ptr); + } + if (!request_ptr->handle_) { callbacks.onResponse(Utility::makeError(Response::get().NoUpstreamHost)); return nullptr; @@ -159,8 +163,7 @@ SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool, return std::move(request_ptr); } -SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, +SplitRequestPtr EvalRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros) { // EVAL looks like: EVAL script numkeys key [key ...] arg [arg ...] @@ -174,9 +177,13 @@ SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool, std::unique_ptr request_ptr{ new EvalRequest(callbacks, command_stats, time_source, latency_in_micros)}; - request_ptr->conn_pool_ = &conn_pool; - request_ptr->handle_ = conn_pool.makeRequest(incoming_request->asArray()[3].asString(), - *incoming_request, *request_ptr); + auto conn_pool = router.upstreamPool(incoming_request->asArray()[3].asString()); + if (conn_pool) { + request_ptr->conn_pool_ = conn_pool; + request_ptr->handle_ = conn_pool->makeRequest(incoming_request->asArray()[3].asString(), + *incoming_request, *request_ptr); + } + if (!request_ptr->handle_) { command_stats.error_.inc(); callbacks.onResponse(Utility::makeError(Response::get().NoUpstreamHost)); @@ -208,8 +215,7 @@ void FragmentedRequest::onChildFailure(uint32_t index) { onChildResponse(Utility::makeError(Response::get().UpstreamFailure), index); } -SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, +SplitRequestPtr MGETRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros) { std::unique_ptr request_ptr{ @@ -237,9 +243,13 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool, single_mget.asArray()[1].asString() = incoming_request->asArray()[i].asString(); ENVOY_LOG(debug, "redis: parallel get: '{}'", single_mget.toString()); - pending_request.conn_pool_ = &conn_pool; - pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(), - single_mget, pending_request); + auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString()); + if (conn_pool) { + pending_request.conn_pool_ = conn_pool; + pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(), + single_mget, pending_request); + } + if (!pending_request.handle_) { pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost)); } @@ -254,7 +264,7 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool, } bool FragmentedRequest::onChildRedirection(const Common::Redis::RespValue& value, uint32_t index, - ConnPool::Instance* conn_pool) { + const ConnPool::InstanceSharedPtr& conn_pool) { std::vector err; bool ask_redirection = false; if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) { @@ -330,8 +340,7 @@ void MGETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) { request.asArray().swap(values); } -SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, +SplitRequestPtr MSETRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros) { if ((incoming_request->asArray().size() - 1) % 2 != 0) { @@ -366,9 +375,13 @@ SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool, single_mset.asArray()[2].asString() = incoming_request->asArray()[i + 1].asString(); ENVOY_LOG(debug, "redis: parallel set: '{}'", single_mset.toString()); - pending_request.conn_pool_ = &conn_pool; - pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(), - single_mset, pending_request); + auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString()); + if (conn_pool) { + pending_request.conn_pool_ = conn_pool; + pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(), + single_mset, pending_request); + } + if (!pending_request.handle_) { pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost)); } @@ -427,7 +440,7 @@ void MSETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) { request.asArray().swap(values); } -SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool, +SplitRequestPtr SplitKeysSumResultRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, @@ -456,9 +469,13 @@ SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool, single_fragment.asArray()[1].asString() = incoming_request->asArray()[i].asString(); ENVOY_LOG(debug, "redis: parallel {}: '{}'", incoming_request->asArray()[0].asString(), single_fragment.toString()); - pending_request.conn_pool_ = &conn_pool; - pending_request.handle_ = conn_pool.makeRequest(incoming_request->asArray()[i].asString(), - single_fragment, pending_request); + auto conn_pool = router.upstreamPool(incoming_request->asArray()[i].asString()); + if (conn_pool) { + pending_request.conn_pool_ = conn_pool; + pending_request.handle_ = conn_pool->makeRequest(incoming_request->asArray()[i].asString(), + single_fragment, pending_request); + } + if (!pending_request.handle_) { pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost)); } @@ -515,12 +532,11 @@ void SplitKeysSumResultRequest::recreate(Common::Redis::RespValue& request, uint request.asArray().swap(values); } -InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope, - const std::string& stat_prefix, TimeSource& time_source, - bool latency_in_micros) - : conn_pool_(std::move(conn_pool)), simple_command_handler_(*conn_pool_), - eval_command_handler_(*conn_pool_), mget_handler_(*conn_pool_), mset_handler_(*conn_pool_), - split_keys_sum_result_handler_(*conn_pool_), +InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::string& stat_prefix, + TimeSource& time_source, bool latency_in_micros) + : router_(std::move(router)), simple_command_handler_(*router_), + eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_), + split_keys_sum_result_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))}, latency_in_micros_(latency_in_micros), time_source_(time_source) { for (const std::string& command : Common::Redis::SupportedCommands::simpleCommands()) { diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index 21eb847c73cf..5ca017ca8fdb 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -17,6 +17,7 @@ #include "extensions/filters/network/common/redis/client_impl.h" #include "extensions/filters/network/redis_proxy/command_splitter.h" #include "extensions/filters/network/redis_proxy/conn_pool.h" +#include "extensions/filters/network/redis_proxy/router.h" namespace Envoy { namespace Extensions { @@ -68,9 +69,9 @@ class CommandHandler { class CommandHandlerBase { protected: - CommandHandlerBase(ConnPool::Instance& conn_pool) : conn_pool_(conn_pool) {} + CommandHandlerBase(Router& router) : router_(router) {} - ConnPool::Instance& conn_pool_; + Router& router_; }; class SplitRequestBase : public SplitRequest { @@ -114,7 +115,7 @@ class SingleServerRequest : public SplitRequestBase, public Common::Redis::Clien : SplitRequestBase(command_stats, time_source, latency_in_micros), callbacks_(callbacks) {} SplitCallbacks& callbacks_; - ConnPool::Instance* conn_pool_{}; + ConnPool::InstanceSharedPtr conn_pool_; Common::Redis::Client::PoolRequest* handle_{}; Common::Redis::RespValuePtr incoming_request_; }; @@ -124,8 +125,7 @@ class SingleServerRequest : public SplitRequestBase, public Common::Redis::Clien */ class SimpleRequest : public SingleServerRequest { public: - static SplitRequestPtr create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, + static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros); @@ -140,8 +140,7 @@ class SimpleRequest : public SingleServerRequest { */ class EvalRequest : public SingleServerRequest { public: - static SplitRequestPtr create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, + static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros); @@ -184,13 +183,13 @@ class FragmentedRequest : public SplitRequestBase { FragmentedRequest& parent_; const uint32_t index_; Common::Redis::Client::PoolRequest* handle_{}; - ConnPool::Instance* conn_pool_{}; + ConnPool::InstanceSharedPtr conn_pool_; }; virtual void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) PURE; void onChildFailure(uint32_t index); bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index, - ConnPool::Instance* conn_pool); + const ConnPool::InstanceSharedPtr& conn_pool); virtual void recreate(Common::Redis::RespValue& request, uint32_t index) PURE; SplitCallbacks& callbacks_; @@ -208,8 +207,7 @@ class FragmentedRequest : public SplitRequestBase { */ class MGETRequest : public FragmentedRequest, Logger::Loggable { public: - static SplitRequestPtr create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, + static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros); @@ -231,8 +229,7 @@ class MGETRequest : public FragmentedRequest, Logger::Loggable { public: - static SplitRequestPtr create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, + static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros); @@ -255,8 +252,7 @@ class SplitKeysSumResultRequest : public FragmentedRequest, Logger::Loggable { public: - static SplitRequestPtr create(ConnPool::Instance& conn_pool, - Common::Redis::RespValuePtr&& incoming_request, + static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros); @@ -277,12 +273,12 @@ class MSETRequest : public FragmentedRequest, Logger::Loggable class CommandHandlerFactory : public CommandHandler, CommandHandlerBase { public: - CommandHandlerFactory(ConnPool::Instance& conn_pool) : CommandHandlerBase(conn_pool) {} + CommandHandlerFactory(Router& router) : CommandHandlerBase(router) {} SplitRequestPtr startRequest(Common::Redis::RespValuePtr&& request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool latency_in_micros) { - return RequestClass::create(conn_pool_, std::move(request), callbacks, command_stats, - time_source, latency_in_micros); + return RequestClass::create(router_, std::move(request), callbacks, command_stats, time_source, + latency_in_micros); } }; @@ -304,8 +300,8 @@ struct InstanceStats { class InstanceImpl : public Instance, Logger::Loggable { public: - InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope, - const std::string& stat_prefix, TimeSource& time_source, bool latency_in_micros); + InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::string& stat_prefix, + TimeSource& time_source, bool latency_in_micros); // RedisProxy::CommandSplitter::Instance SplitRequestPtr makeRequest(Common::Redis::RespValuePtr&& request, @@ -323,7 +319,7 @@ class InstanceImpl : public Instance, Logger::Loggable { CommandHandler& handler); void onInvalidRequest(SplitCallbacks& callbacks); - ConnPool::InstancePtr conn_pool_; + RouterPtr router_; CommandHandlerFactory simple_command_handler_; CommandHandlerFactory eval_command_handler_; CommandHandlerFactory mget_handler_; diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index bae74e863371..9838c2cc5ebf 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -11,8 +11,8 @@ #include "extensions/filters/network/common/redis/client_impl.h" #include "extensions/filters/network/common/redis/codec_impl.h" #include "extensions/filters/network/redis_proxy/command_splitter_impl.h" -#include "extensions/filters/network/redis_proxy/conn_pool_impl.h" #include "extensions/filters/network/redis_proxy/proxy_filter.h" +#include "extensions/filters/network/redis_proxy/router_impl.h" namespace Envoy { namespace Extensions { @@ -24,18 +24,43 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP Server::Configuration::FactoryContext& context) { ASSERT(!proto_config.stat_prefix().empty()); - ASSERT(!proto_config.cluster().empty()); ASSERT(proto_config.has_settings()); ProxyFilterConfigSharedPtr filter_config(std::make_shared( proto_config, context.scope(), context.drainDecision(), context.runtime())); - ConnPool::InstancePtr conn_pool( - new ConnPool::InstanceImpl(filter_config->cluster_name_, context.clusterManager(), - Common::Redis::Client::ClientFactoryImpl::instance_, - context.threadLocal(), proto_config.settings())); - std::shared_ptr splitter(new CommandSplitter::InstanceImpl( - std::move(conn_pool), context.scope(), filter_config->stat_prefix_, context.timeSource(), - proto_config.latency_in_micros())); + + envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes prefix_routes( + proto_config.prefix_routes()); + + // set the catch-all route from the deprecated cluster and settings parameters. + if (prefix_routes.catch_all_cluster().empty() && prefix_routes.routes_size() == 0) { + if (proto_config.cluster().empty()) { + throw EnvoyException("cannot configure a redis-proxy without any upstream"); + } + + prefix_routes.set_catch_all_cluster(proto_config.cluster()); + } + + std::set unique_clusters; + for (auto& route : prefix_routes.routes()) { + unique_clusters.emplace(route.cluster()); + } + unique_clusters.emplace(prefix_routes.catch_all_cluster()); + + Upstreams upstreams; + for (auto& cluster : unique_clusters) { + upstreams.emplace(cluster, std::make_shared( + cluster, context.clusterManager(), + Common::Redis::Client::ClientFactoryImpl::instance_, + context.threadLocal(), proto_config.settings())); + } + + auto router = std::make_unique(prefix_routes, std::move(upstreams)); + + std::shared_ptr splitter = + std::make_shared( + std::move(router), context.scope(), filter_config->stat_prefix_, context.timeSource(), + proto_config.latency_in_micros()); return [splitter, filter_config](Network::FilterManager& filter_manager) -> void { Common::Redis::DecoderFactoryImpl factory; filter_manager.addReadFilter(std::make_shared( diff --git a/source/extensions/filters/network/redis_proxy/conn_pool.h b/source/extensions/filters/network/redis_proxy/conn_pool.h index 44ec83c76779..a926f568f062 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool.h @@ -50,7 +50,7 @@ class Instance { Common::Redis::Client::PoolCallbacks& callbacks) PURE; }; -typedef std::unique_ptr InstancePtr; +typedef std::shared_ptr InstanceSharedPtr; } // namespace ConnPool } // namespace RedisProxy diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index ef39f732d1b9..8ed565ac50b9 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -40,7 +40,6 @@ class InstanceImpl : public Instance { const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config); - // RedisProxy::ConnPool::Instance Common::Redis::Client::PoolRequest* makeRequest(const std::string& key, const Common::Redis::RespValue& request, diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index 4fa59b5ad320..acc5ccca0e21 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -17,7 +17,7 @@ namespace RedisProxy { ProxyFilterConfig::ProxyFilterConfig( const envoy::config::filter::network::redis_proxy::v2::RedisProxy& config, Stats::Scope& scope, const Network::DrainDecision& drain_decision, Runtime::Loader& runtime) - : drain_decision_(drain_decision), runtime_(runtime), cluster_name_(config.cluster()), + : drain_decision_(drain_decision), runtime_(runtime), stat_prefix_(fmt::format("redis.{}.", config.stat_prefix())), stats_(generateStats(stat_prefix_, scope)) {} diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.h b/source/extensions/filters/network/redis_proxy/proxy_filter.h index 3f8dc62d6eec..ae2141a322d9 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.h +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.h @@ -56,7 +56,6 @@ class ProxyFilterConfig { const Network::DrainDecision& drain_decision_; Runtime::Loader& runtime_; - const std::string cluster_name_; const std::string stat_prefix_; const std::string redis_drain_close_runtime_key_{"redis.drain_close_enabled"}; ProxyStats stats_; diff --git a/source/extensions/filters/network/redis_proxy/router.h b/source/extensions/filters/network/redis_proxy/router.h new file mode 100644 index 000000000000..5312e34cea4b --- /dev/null +++ b/source/extensions/filters/network/redis_proxy/router.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +#include "envoy/common/pure.h" +#include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h" + +#include "extensions/filters/network/redis_proxy/conn_pool.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace RedisProxy { + +/* + * Decorator of a connection pool in order to enable key based routing. + */ +class Router { +public: + virtual ~Router() = default; + + /** + * Returns a connection pool that matches a given route. When no match is found, the catch all + * pool is used. When remove prefix is set to true, the prefix will be removed from the key. + * @param key mutable reference to the key of the current command. + * @return a handle to the connection pool. + */ + virtual ConnPool::InstanceSharedPtr upstreamPool(std::string& key) PURE; +}; + +typedef std::unique_ptr RouterPtr; + +} // namespace RedisProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/redis_proxy/router_impl.cc b/source/extensions/filters/network/redis_proxy/router_impl.cc new file mode 100644 index 000000000000..cd963e1ec778 --- /dev/null +++ b/source/extensions/filters/network/redis_proxy/router_impl.cc @@ -0,0 +1,61 @@ +#include "extensions/filters/network/redis_proxy/router_impl.h" + +#include "common/common/fmt.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace RedisProxy { + +PrefixRoutes::PrefixRoutes( + const envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes& config, + Upstreams&& upstreams) + : case_insensitive_(config.case_insensitive()), upstreams_(std::move(upstreams)), + catch_all_upstream_(config.catch_all_cluster().empty() + ? nullptr + : upstreams_.at(config.catch_all_cluster())) { + + for (auto const& route : config.routes()) { + std::string copy(route.prefix()); + + if (case_insensitive_) { + to_lower_table_.toLowerCase(copy); + } + + auto success = prefix_lookup_table_.add(copy.c_str(), + std::make_shared(Prefix{ + route.prefix(), + route.remove_prefix(), + upstreams_.at(route.cluster()), + }), + false); + if (!success) { + throw EnvoyException(fmt::format("prefix `{}` already exists.", route.prefix())); + } + } +} + +ConnPool::InstanceSharedPtr PrefixRoutes::upstreamPool(std::string& key) { + PrefixPtr value = nullptr; + if (case_insensitive_) { + std::string copy(key); + to_lower_table_.toLowerCase(copy); + value = prefix_lookup_table_.findLongestPrefix(copy.c_str()); + } else { + value = prefix_lookup_table_.findLongestPrefix(key.c_str()); + } + + if (value != nullptr) { + if (value->remove_prefix) { + key.erase(0, value->prefix.length()); + } + return value->upstream; + } + + return catch_all_upstream_; +} + +} // namespace RedisProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/redis_proxy/router_impl.h b/source/extensions/filters/network/redis_proxy/router_impl.h new file mode 100644 index 000000000000..2744e88eff4c --- /dev/null +++ b/source/extensions/filters/network/redis_proxy/router_impl.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/common/to_lower_table.h" + +#include "extensions/filters/network/redis_proxy/conn_pool_impl.h" +#include "extensions/filters/network/redis_proxy/router.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace RedisProxy { + +typedef std::map Upstreams; + +class PrefixRoutes : public Router { +public: + PrefixRoutes(const envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes& + prefix_routes, + Upstreams&& upstreams); + + ConnPool::InstanceSharedPtr upstreamPool(std::string& key) override; + +private: + struct Prefix { + const std::string prefix; + const bool remove_prefix; + ConnPool::InstanceSharedPtr upstream; + }; + + typedef std::shared_ptr PrefixPtr; + + TrieLookupTable prefix_lookup_table_; + const ToLowerTable to_lower_table_; + const bool case_insensitive_; + Upstreams upstreams_; + ConnPool::InstanceSharedPtr catch_all_upstream_; +}; + +} // namespace RedisProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/common/common/utility_test.cc b/test/common/common/utility_test.cc index 6434cd140280..e2a084651065 100644 --- a/test/common/common/utility_test.cc +++ b/test/common/common/utility_test.cc @@ -828,4 +828,41 @@ TEST(DateFormatter, FromTimeSameWildcard) { DateFormatter("%Y-%m-%dT%H:%M:%S.000Z%1f%2f").fromTime(time1)); } +TEST(TrieLookupTable, AddItems) { + TrieLookupTable trie; + EXPECT_TRUE(trie.add("foo", "a")); + EXPECT_TRUE(trie.add("bar", "b")); + EXPECT_EQ("a", trie.find("foo")); + EXPECT_EQ("b", trie.find("bar")); + + // overwrite_existing = false + EXPECT_FALSE(trie.add("foo", "c", false)); + EXPECT_EQ("a", trie.find("foo")); + + // overwrite_existing = true + EXPECT_TRUE(trie.add("foo", "c")); + EXPECT_EQ("c", trie.find("foo")); +} + +TEST(TrieLookupTable, LongestPrefix) { + TrieLookupTable trie; + EXPECT_TRUE(trie.add("foo", "a")); + EXPECT_TRUE(trie.add("bar", "b")); + EXPECT_TRUE(trie.add("baro", "c")); + + EXPECT_EQ("a", trie.find("foo")); + EXPECT_EQ("a", trie.findLongestPrefix("foo")); + EXPECT_EQ("a", trie.findLongestPrefix("foosball")); + + EXPECT_EQ("b", trie.find("bar")); + EXPECT_EQ("b", trie.findLongestPrefix("bar")); + EXPECT_EQ("b", trie.findLongestPrefix("baritone")); + EXPECT_EQ("c", trie.findLongestPrefix("barometer")); + + EXPECT_EQ(nullptr, trie.find("toto")); + EXPECT_EQ(nullptr, trie.findLongestPrefix("toto")); + EXPECT_EQ(nullptr, trie.find(" ")); + EXPECT_EQ(nullptr, trie.findLongestPrefix(" ")); +} + } // namespace Envoy diff --git a/test/extensions/filters/network/redis_proxy/BUILD b/test/extensions/filters/network/redis_proxy/BUILD index 2ae0acae9a7b..eb6d003e5927 100644 --- a/test/extensions/filters/network/redis_proxy/BUILD +++ b/test/extensions/filters/network/redis_proxy/BUILD @@ -75,6 +75,7 @@ envoy_cc_mock( "//source/extensions/filters/network/common/redis:codec_lib", "//source/extensions/filters/network/redis_proxy:command_splitter_interface", "//source/extensions/filters/network/redis_proxy:conn_pool_interface", + "//source/extensions/filters/network/redis_proxy:router_interface", ], ) @@ -106,6 +107,18 @@ envoy_extension_cc_test_binary( ], ) +envoy_extension_cc_test( + name = "router_impl_test", + srcs = ["router_impl_test.cc"], + extension_name = "envoy.filters.network.redis_proxy", + deps = [ + ":redis_mocks", + "//source/extensions/filters/network/redis_proxy:router_lib", + "//test/extensions/filters/network/common/redis:redis_mocks", + "//test/test_common:utility_lib", + ], +) + envoy_extension_cc_test( name = "redis_proxy_integration_test", size = "small", diff --git a/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc b/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc index aa1964df7f7a..555088f07573 100644 --- a/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc @@ -32,17 +32,8 @@ class NoOpSplitCallbacks : public CommandSplitter::SplitCallbacks { void onResponse(Common::Redis::RespValuePtr&&) override {} }; -class NullInstanceImpl : public ConnPool::Instance { - Common::Redis::Client::PoolRequest* makeRequest(const std::string&, - const Common::Redis::RespValue&, - Common::Redis::Client::PoolCallbacks&) override { - return nullptr; - } - Common::Redis::Client::PoolRequest* - makeRequestToHost(const std::string&, const Common::Redis::RespValue&, - Common::Redis::Client::PoolCallbacks&) override { - return nullptr; - } +class NullRouterImpl : public Router { + ConnPool::InstanceSharedPtr upstreamPool(std::string&) override { return nullptr; } }; class CommandLookUpSpeedTest { @@ -73,11 +64,11 @@ class CommandLookUpSpeedTest { } } - ConnPool::Instance* conn_pool_{new NullInstanceImpl()}; + Router* router_{new NullRouterImpl()}; Stats::IsolatedStoreImpl store_; Event::SimulatedTimeSystem time_system_; - CommandSplitter::InstanceImpl splitter_{ConnPool::InstancePtr{conn_pool_}, store_, "redis.foo.", - time_system_, false}; + CommandSplitter::InstanceImpl splitter_{RouterPtr{router_}, store_, "redis.foo.", time_system_, + false}; NoOpSplitCallbacks callbacks_; CommandSplitter::SplitRequestPtr handle_; }; diff --git a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc index ae7a9839147b..207f6871aed8 100644 --- a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc @@ -38,6 +38,16 @@ namespace NetworkFilters { namespace RedisProxy { namespace CommandSplitter { +class PassthruRouter : public Router { +public: + PassthruRouter(ConnPool::InstanceSharedPtr conn_pool) : conn_pool_(conn_pool) {} + + ConnPool::InstanceSharedPtr upstreamPool(std::string&) override { return conn_pool_; } + +private: + ConnPool::InstanceSharedPtr conn_pool_; +}; + class RedisCommandSplitterImplTest : public testing::Test { public: void makeBulkStringArray(Common::Redis::RespValue& value, @@ -55,8 +65,8 @@ class RedisCommandSplitterImplTest : public testing::Test { ConnPool::MockInstance* conn_pool_{new ConnPool::MockInstance()}; NiceMock store_; Event::SimulatedTimeSystem time_system_; - InstanceImpl splitter_{ConnPool::InstancePtr{conn_pool_}, store_, "redis.foo.", time_system_, - false}; + InstanceImpl splitter_{std::make_unique(ConnPool::InstanceSharedPtr{conn_pool_}), + store_, "redis.foo.", time_system_, false}; MockSplitCallbacks callbacks_; SplitRequestPtr handle_; }; @@ -226,6 +236,7 @@ TEST_P(RedisSingleServerRequestTest, NoUpstream) { Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {GetParam(), "hello"}); EXPECT_CALL(*conn_pool_, makeRequest("hello", Ref(*request), _)).WillOnce(Return(nullptr)); + Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); response.asString() = Response::get().NoUpstreamHost; @@ -328,6 +339,7 @@ TEST_F(RedisSingleServerRequestTest, EvalNoUpstream) { Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {"eval", "return {ARGV[1]}", "1", "key", "arg"}); EXPECT_CALL(*conn_pool_, makeRequest("key", Ref(*request), _)).WillOnce(Return(nullptr)); + Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); response.asString() = Response::get().NoUpstreamHost; @@ -1429,8 +1441,8 @@ class RedisSingleServerRequestWithLatencyMicrosTest : public RedisSingleServerRe } ConnPool::MockInstance* conn_pool_{new ConnPool::MockInstance()}; - InstanceImpl splitter_{ConnPool::InstancePtr{conn_pool_}, store_, "redis.foo.", time_system_, - true}; + InstanceImpl splitter_{std::make_unique(ConnPool::InstanceSharedPtr{conn_pool_}), + store_, "redis.foo.", time_system_, true}; }; TEST_P(RedisSingleServerRequestWithLatencyMicrosTest, Success) { diff --git a/test/extensions/filters/network/redis_proxy/config_test.cc b/test/extensions/filters/network/redis_proxy/config_test.cc index 351fc97a78c8..ffca740ec492 100644 --- a/test/extensions/filters/network/redis_proxy/config_test.cc +++ b/test/extensions/filters/network/redis_proxy/config_test.cc @@ -23,6 +23,21 @@ TEST(RedisProxyFilterConfigFactoryTest, ValidateFail) { ProtoValidationException); } +TEST(RedisProxyFilterConfigFactoryTest, NoUpstreamDefined) { + envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings settings; + settings.mutable_op_timeout()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(20)); + + envoy::config::filter::network::redis_proxy::v2::RedisProxy config; + config.set_stat_prefix("foo"); + config.mutable_settings()->CopyFrom(settings); + + NiceMock context; + + EXPECT_THROW_WITH_MESSAGE( + RedisProxyFilterConfigFactory().createFilterFactoryFromProto(config, context), EnvoyException, + "cannot configure a redis-proxy without any upstream"); +} + TEST(RedisProxyFilterConfigFactoryTest, RedisProxyNoSettings) { const std::string yaml = R"EOF( cluster: fake_cluster diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index 01d93c9a541f..197cd910b001 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -43,6 +43,7 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client if (!cluster_exists) { EXPECT_CALL(cm_, get("fake_cluster")).WillOnce(Return(nullptr)); } + conn_pool_ = std::make_unique( cluster_name_, cm_, *this, tls_, Common::Redis::Client::createConnPoolSettings(20, hashtagging, true)); @@ -78,7 +79,7 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client const std::string cluster_name_{"fake_cluster"}; NiceMock cm_; NiceMock tls_; - InstancePtr conn_pool_; + InstanceSharedPtr conn_pool_; Upstream::ClusterUpdateCallbacks* update_callbacks_{}; Common::Redis::Client::MockClient* client_{}; Network::Address::InstanceConstSharedPtr test_address_; diff --git a/test/extensions/filters/network/redis_proxy/mocks.cc b/test/extensions/filters/network/redis_proxy/mocks.cc index 7e0ce1eff0bd..3bbb28baba80 100644 --- a/test/extensions/filters/network/redis_proxy/mocks.cc +++ b/test/extensions/filters/network/redis_proxy/mocks.cc @@ -15,6 +15,9 @@ namespace Extensions { namespace NetworkFilters { namespace RedisProxy { +MockRouter::MockRouter() {} +MockRouter::~MockRouter() {} + namespace ConnPool { MockInstance::MockInstance() {} diff --git a/test/extensions/filters/network/redis_proxy/mocks.h b/test/extensions/filters/network/redis_proxy/mocks.h index ecd104af4cd1..381ef0a19bf5 100644 --- a/test/extensions/filters/network/redis_proxy/mocks.h +++ b/test/extensions/filters/network/redis_proxy/mocks.h @@ -8,6 +8,7 @@ #include "extensions/filters/network/common/redis/codec_impl.h" #include "extensions/filters/network/redis_proxy/command_splitter.h" #include "extensions/filters/network/redis_proxy/conn_pool.h" +#include "extensions/filters/network/redis_proxy/router.h" #include "test/test_common/printers.h" @@ -18,6 +19,14 @@ namespace Extensions { namespace NetworkFilters { namespace RedisProxy { +class MockRouter : public Router { +public: + MockRouter(); + ~MockRouter(); + + MOCK_METHOD1(upstreamPool, ConnPool::InstanceSharedPtr(std::string& key)); +}; + namespace ConnPool { class MockInstance : public Instance { diff --git a/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc b/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc index 333a9687dc50..4cb73b89186b 100644 --- a/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc +++ b/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc @@ -60,7 +60,7 @@ TEST_F(RedisProxyFilterConfigTest, Normal) { envoy::config::filter::network::redis_proxy::v2::RedisProxy proto_config = parseProtoFromJson(json_string); ProxyFilterConfig config(proto_config, store_, drain_decision_, runtime_); - EXPECT_EQ("fake_cluster", config.cluster_name_); + EXPECT_EQ("redis.foo.", config.stat_prefix_); } TEST_F(RedisProxyFilterConfigTest, BadRedisProxyConfig) { diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index d23c2b264951..07997b76c0f1 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -39,10 +39,10 @@ const std::string CONFIG = R"EOF( socket_address: address: 127.0.0.1 port_value: 0 - - endpoint: + - endpoint: address: socket_address: - address: 127.0.0.1 + address: 127.0.0.1 port_value: 0 listeners: name: listener_0 @@ -56,7 +56,7 @@ const std::string CONFIG = R"EOF( config: stat_prefix: redis_stats cluster: cluster_0 - settings: + settings: op_timeout: 5s )EOF"; @@ -65,6 +65,88 @@ const std::string CONFIG_WITH_REDIRECTION = CONFIG + R"EOF( enable_redirection: true )EOF"; +const std::string CONFIG_WITH_ROUTES = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: cluster_0 + type: STATIC + lb_policy: RANDOM + load_assignment: + cluster_name: cluster_0 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + - name: cluster_1 + type: STATIC + lb_policy: RANDOM + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1 + - name: cluster_2 + type: STATIC + lb_policy: RANDOM + load_assignment: + cluster_name: cluster_2 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 2 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 2 + listeners: + name: listener_0 + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + filters: + name: envoy.redis_proxy + config: + stat_prefix: redis_stats + settings: + op_timeout: 5s + prefix_routes: + catch_all_cluster: cluster_0 + routes: + - prefix: "foo:" + cluster: cluster_1 + - prefix: "baz:" + cluster: cluster_2 +)EOF"; + // This function encodes commands as an array of bulkstrings as transmitted by Redis clients to // Redis servers, according to the Redis protocol. std::string makeBulkStringArray(std::vector&& command_strings) { @@ -115,7 +197,19 @@ class RedisProxyIntegrationTest : public testing::TestWithParamwrite(request); FakeRawConnectionPtr fake_upstream_connection; - EXPECT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + EXPECT_TRUE(upstream->waitForRawConnection(fake_upstream_connection)); EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); // The original request should be the same as the data received by the server. EXPECT_EQ(request, proxy_to_server); @@ -445,5 +549,23 @@ TEST_P(RedisProxyWithRedirectionIntegrationTest, IgnoreRedirectionForAsking) { asking_response.str()); } +// This test verifies that it's possible to route keys to 3 different upstream pools. + +TEST_P(RedisProxyWithRoutesIntegrationTest, SimpleRequestAndResponseRoutedByPrefix) { + initialize(); + + // roundtrip to cluster_0 (catch_all route) + simpleRoundtripToUpstream(fake_upstreams_[0], makeBulkStringArray({"get", "toto"}), + "$3\r\nbar\r\n"); + + // roundtrip to cluster_1 (prefix "foo:" route) + simpleRoundtripToUpstream(fake_upstreams_[2], makeBulkStringArray({"get", "foo:123"}), + "$3\r\nbar\r\n"); + + // roundtrip to cluster_2 (prefix "baz:" route) + simpleRoundtripToUpstream(fake_upstreams_[4], makeBulkStringArray({"get", "baz:123"}), + "$3\r\nbar\r\n"); +} + } // namespace } // namespace Envoy diff --git a/test/extensions/filters/network/redis_proxy/router_impl_test.cc b/test/extensions/filters/network/redis_proxy/router_impl_test.cc new file mode 100644 index 000000000000..486fcb009994 --- /dev/null +++ b/test/extensions/filters/network/redis_proxy/router_impl_test.cc @@ -0,0 +1,183 @@ +#include + +#include "extensions/filters/network/redis_proxy/conn_pool_impl.h" +#include "extensions/filters/network/redis_proxy/router_impl.h" + +#include "test/extensions/filters/network/common/redis/mocks.h" +#include "test/extensions/filters/network/redis_proxy/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Eq; +using testing::InSequence; +using testing::Ref; +using testing::Return; +using testing::StrEq; + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace RedisProxy { + +envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes createPrefixRoutes() { + envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes prefix_routes; + auto* routes = prefix_routes.mutable_routes(); + + { + auto* route = routes->Add(); + route->set_prefix("ab"); + route->set_cluster("fake_clusterA"); + } + + { + auto* route = routes->Add(); + route->set_prefix("a"); + route->set_cluster("fake_clusterB"); + } + + return prefix_routes; +} + +TEST(PrefixRoutesTest, MissingCatchAll) { + Upstreams upstreams; + upstreams.emplace("fake_clusterA", std::make_shared()); + upstreams.emplace("fake_clusterB", std::make_shared()); + + PrefixRoutes router(createPrefixRoutes(), std::move(upstreams)); + + std::string key("c:bar"); + EXPECT_EQ(nullptr, router.upstreamPool(key)); +} + +TEST(PrefixRoutesTest, RoutedToCatchAll) { + auto upstream_c = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", std::make_shared()); + upstreams.emplace("fake_clusterB", std::make_shared()); + upstreams.emplace("fake_clusterC", upstream_c); + + auto prefix_routes = createPrefixRoutes(); + prefix_routes.set_catch_all_cluster("fake_clusterC"); + + PrefixRoutes router(prefix_routes, std::move(upstreams)); + + std::string key("c:bar"); + EXPECT_EQ(upstream_c, router.upstreamPool(key)); +} + +TEST(PrefixRoutesTest, RoutedToLongestPrefix) { + auto upstream_a = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", upstream_a); + upstreams.emplace("fake_clusterB", std::make_shared()); + + PrefixRoutes router(createPrefixRoutes(), std::move(upstreams)); + + std::string key("ab:bar"); + EXPECT_EQ(upstream_a, router.upstreamPool(key)); +} + +TEST(PrefixRoutesTest, CaseUnsensitivePrefix) { + auto upstream_a = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", upstream_a); + upstreams.emplace("fake_clusterB", std::make_shared()); + + auto prefix_routes = createPrefixRoutes(); + prefix_routes.set_case_insensitive(true); + + PrefixRoutes router(prefix_routes, std::move(upstreams)); + + std::string key("AB:bar"); + EXPECT_EQ(upstream_a, router.upstreamPool(key)); +} + +TEST(PrefixRoutesTest, RemovePrefix) { + auto upstream_a = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", upstream_a); + upstreams.emplace("fake_clusterB", std::make_shared()); + + auto prefix_routes = createPrefixRoutes(); + + { + auto* route = prefix_routes.mutable_routes()->Add(); + route->set_prefix("abc"); + route->set_cluster("fake_clusterA"); + route->set_remove_prefix(true); + } + + PrefixRoutes router(prefix_routes, std::move(upstreams)); + + std::string key("abc:bar"); + EXPECT_EQ(upstream_a, router.upstreamPool(key)); + EXPECT_EQ(":bar", key); +} + +TEST(PrefixRoutesTest, RoutedToShortestPrefix) { + auto upstream_b = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", std::make_shared()); + upstreams.emplace("fake_clusterB", upstream_b); + + PrefixRoutes router(createPrefixRoutes(), std::move(upstreams)); + + std::string key("a:bar"); + EXPECT_EQ(upstream_b, router.upstreamPool(key)); + EXPECT_EQ("a:bar", key); +} + +TEST(PrefixRoutesTest, DifferentPrefixesSameUpstream) { + auto upstream_b = std::make_shared(); + + Upstreams upstreams; + upstreams.emplace("fake_clusterA", std::make_shared()); + upstreams.emplace("fake_clusterB", upstream_b); + + auto prefix_routes = createPrefixRoutes(); + + { + auto* route = prefix_routes.mutable_routes()->Add(); + route->set_prefix("also_route_to_b"); + route->set_cluster("fake_clusterB"); + } + + PrefixRoutes router(prefix_routes, std::move(upstreams)); + + std::string key1("a:bar"); + EXPECT_EQ(upstream_b, router.upstreamPool(key1)); + + std::string key2("also_route_to_b:bar"); + EXPECT_EQ(upstream_b, router.upstreamPool(key2)); +} + +TEST(PrefixRoutesTest, DuplicatePrefix) { + Upstreams upstreams; + upstreams.emplace("fake_clusterA", std::make_shared()); + upstreams.emplace("fake_clusterB", std::make_shared()); + upstreams.emplace("this_will_throw", std::make_shared()); + + auto prefix_routes = createPrefixRoutes(); + + { + auto* route = prefix_routes.mutable_routes()->Add(); + route->set_prefix("ab"); + route->set_cluster("this_will_throw"); + } + + EXPECT_THROW_WITH_MESSAGE(PrefixRoutes router(prefix_routes, std::move(upstreams)), + EnvoyException, "prefix `ab` already exists.") +} + +} // namespace RedisProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy