Skip to content

Commit

Permalink
comment filter process
Browse files Browse the repository at this point in the history
  • Loading branch information
billowqiu committed Sep 15, 2024
1 parent f0a482e commit f4413e1
Show file tree
Hide file tree
Showing 28 changed files with 238 additions and 58 deletions.
4 changes: 4 additions & 0 deletions envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ using RouteConfigUpdatedCallbackSharedPtr = std::shared_ptr<RouteConfigUpdatedCa
* Stream decoder filter callbacks add additional callbacks that allow a decoding filter to restart
* decoding if they decide to hold data (e.g. for buffering or rate limiting).
*/
// 目前有两个实现分别是: AsyncStreamImpl(async_client_impl.h) 和 ActiveStreamDecoderFilter (filter_manager.h)
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
public:
/**
Expand Down Expand Up @@ -743,6 +744,7 @@ using StreamDecoderFilterSharedPtr = std::shared_ptr<StreamDecoderFilter>;
* Stream encoder filter callbacks add additional callbacks that allow a encoding filter to restart
* encoding if they decide to hold data (e.g. for buffering or rate limiting).
*/
// 目前就一个实现 ActiveStreamEncoderFilter
class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
public:
/**
Expand Down Expand Up @@ -990,6 +992,7 @@ class HttpMatchingData {
* These callbacks are provided by the connection manager to the factory so that the factory can
* build the filter chain in an application specific way.
*/
// 这里一个典型的实现就是 FilterManager
class FilterChainFactoryCallbacks {
public:
virtual ~FilterChainFactoryCallbacks() = default;
Expand Down Expand Up @@ -1059,6 +1062,7 @@ class FilterChainFactoryCallbacks {
* function will install a single filter, but it's technically possibly to install more than one
* if desired.
*/
// 这些创建 filter 的回调函数对象,在初始化的时候就会被设置好,实际使用是在一个 http stream 新构建出来之后才会真正来调用,以便构造出来实际的 filter 来处理这些stream
using FilterFactoryCb = std::function<void(FilterChainFactoryCallbacks& callbacks)>;

/**
Expand Down
154 changes: 113 additions & 41 deletions examples/ext_authz/config/grpc-service/v3.yaml
Original file line number Diff line number Diff line change
@@ -1,54 +1,121 @@
static_resources:
listeners:
- address:
socket_address:
address: 0.0.0.0
port_value: 8000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: AUTO
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: upstream
domains:
- "*"
routes:
- match:
prefix: "/"
route:
cluster: upstream-service
http_filters:
- name: envoy.filters.http.ext_authz
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
grpc_service:
envoy_grpc:
cluster_name: ext_authz-grpc-service
timeout: 0.250s
transport_api_version: V3
- name: envoy.filters.http.router
- name: staticreply
address:
socket_address:
address: 127.0.0.1
port_value: 8099
filter_chains:
- filters:
- name: envoy.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
codec_type: auto
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
- match:
prefix: "/ok"
direct_response:
status: 200
body:
inline_string: "example body from behind Envoy!\n"
- match:
prefix: "/fail"
direct_response:
status: 503

http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
- name: main
address:
socket_address:
address: 0.0.0.0
port_value: 8000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: AUTO
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: upstream
domains:
- "*"
routes:
- match:
prefix: "/httpbin"
route:
cluster: httpbin
- match:
prefix: "/"
route:
cluster: upstream-service

http_filters:
# - name: envoy.filters.http.ext_authz
# typed_config:
# "@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
# grpc_service:
# envoy_grpc:
# cluster_name: ext_authz-grpc-service
# timeout: 0.250s
# transport_api_version: V3

- name: envoy.filters.http.router

clusters:
- name: upstream-service
type: STRICT_DNS
connect_timeout: 0.25s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: upstream-service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: upstream-service
port_value: 8080
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 8099
- name: httpbin
connect_timeout: 5000s
type: strict_dns
lb_policy: round_robin
load_assignment:
cluster_name: httpbin
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: httpbin.org
port_value: 80
# - name: upstream-service
# type: STRICT_DNS
# lb_policy: ROUND_ROBIN
# load_assignment:
# cluster_name: upstream-service
# endpoints:
# - lb_endpoints:
# - endpoint:
# address:
# socket_address:
# address: upstream-service
# port_value: 8080

- name: ext_authz-grpc-service
type: STRICT_DNS
type: STATIC
lb_policy: ROUND_ROBIN
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
Expand All @@ -62,5 +129,10 @@ static_resources:
- endpoint:
address:
socket_address:
address: ext_authz-grpc-service
address: 127.0.0.1
port_value: 9001
- endpoint:
address:
socket_address:
address: 9.134.189.148
port_value: 9001
18 changes: 15 additions & 3 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)),
time_source_(time_source),
enable_internal_redirects_with_body_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.internal_redirects_with_body")) {}
"envoy.reloadable_features.internal_redirects_with_body")) {
ENVOY_LOG(debug, "construct http_connection_manager filter {}", static_cast<void*>(this));
}

const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
static const auto headers = createHeaderMap<ResponseHeaderMapImpl>(
Expand Down Expand Up @@ -322,6 +324,7 @@ void ConnectionManagerImpl::handleCodecError(absl::string_view error) {

void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
ASSERT(!codec_);
// 第三个参数this会一直带到具体的 codec 对象中,用于回调
codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);

switch (codec_->protocol()) {
Expand Down Expand Up @@ -351,6 +354,7 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool
do {
redispatch = false;

// 回调 codec
const Status status = codec_->dispatch(data);

if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
Expand Down Expand Up @@ -614,7 +618,9 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
filter_manager_(*this, connection_manager_.read_callbacks_->connection().dispatcher(),
connection_manager_.read_callbacks_->connection(), stream_id_,
std::move(account), connection_manager_.config_.proxy100Continue(),
buffer_limit, connection_manager_.config_.filterFactory(),
buffer_limit,
// 保存 HttpConnectionManagerConfig 的引用 FilterChainFactory& filterFactory() override { return *this; }
connection_manager_.config_.filterFactory(),
connection_manager_.config_.localReply(),
connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
Expand Down Expand Up @@ -1004,6 +1010,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& he
ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);

ASSERT(!cached_route_);
// 刷路由缓存
refreshCachedRoute();

if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
Expand All @@ -1015,6 +1022,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& he

filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

// 构建 filter 链,调用完之后针对此 stream 的 filter 链就构建好了
ENVOY_STREAM_LOG(debug, "filter_manager createFilterChain", *this);
const bool upgrade_rejected = filter_manager_.createFilterChain() == false;

// TODO if there are no filters when starting a filter iteration, the connection manager
Expand Down Expand Up @@ -1063,7 +1072,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& he
if (connection_manager_.config_.tracingConfig()) {
traceRequest();
}

// 回调各个插件
filter_manager_.decodeHeaders(*request_headers_, end_stream);

// Reset it here for both global and overridden cases.
Expand Down Expand Up @@ -1281,8 +1290,11 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::Route
snapScopedRouteConfig();
}
if (snapped_route_config_ != nullptr) {
// 查找路由,根据配置的 route 规则查找当前请求匹配的规则,会调用到 RouteMatcher::route
route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
stream_id_);
ENVOY_STREAM_LOG(debug, "refreshCachedRoute route cluster name {}", *this,
route==nullptr? "empty":route->routeEntry()->clusterName());
}
}

Expand Down
1 change: 1 addition & 0 deletions source/common/http/conn_manager_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ ServerConnectionPtr ConnectionManagerUtility::autoCreateCodec(
uint32_t max_request_headers_kb, uint32_t max_request_headers_count,
envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
headers_with_underscores_action) {
// 协议探测
if (determineNextProtocol(connection, data) == Utility::AlpnNames::get().Http2) {
Http2::CodecStats& stats = Http2::CodecStats::atomicGet(http2_codec_stats, scope);
return std::make_unique<Http2::ServerConnectionImpl>(
Expand Down
16 changes: 15 additions & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ void FilterManager::addStreamDecoderFilterWorker(StreamDecoderFilterSharedPtr fi
// - B
// - C
// The decoder filter chain will iterate through filters A, B, C.
// 按照定义的顺序从上到下依次调用
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
}

Expand Down Expand Up @@ -519,7 +520,11 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();

// 开始回调各个 filter
for (; entry != decoder_filters_.end(); entry++) {
ENVOY_STREAM_LOG(trace,
"decodeHeaders filter iteration filter={}",
*this, static_cast<const void*>((*entry).get()));
(*entry)->maybeEvaluateMatchTreeWithNewData(
[&](auto& matching_data) { matching_data.onRequestHeaders(headers); });

Expand Down Expand Up @@ -620,6 +625,9 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
commonDecodePrefix(filter, filter_iteration_start_state);

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_STREAM_LOG(trace,
"decodeData filter iteration filter={}",
*this, static_cast<const void*>((*entry).get()));
if ((*entry)->skipFilter()) {
continue;
}
Expand Down Expand Up @@ -766,6 +774,9 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_STREAM_LOG(trace,
"addDecodedMetadata filter iteration filter={}",
*this, static_cast<const void*>((*entry).get()));
(*entry)->maybeEvaluateMatchTreeWithNewData(
[&](auto& matching_data) { matching_data.onRequestTrailers(trailers); });

Expand Down Expand Up @@ -807,6 +818,9 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa
commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_STREAM_LOG(trace,
"decodeMetadata filter iteration filter={}",
*this, static_cast<const void*>((*entry).get()));
if ((*entry)->skipFilter()) {
continue;
}
Expand Down Expand Up @@ -1425,7 +1439,7 @@ bool FilterManager::createFilterChain() {
// will send a local reply indicating that the upgrade failed.
}
}

// filter_chain_factory_ 是 HttpConnectionManagerConfig,相当于调用到 HttpConnectionManagerConfig::createFilterChain
filter_chain_factory_.createFilterChain(*this);
return !upgrade_rejected;
}
Expand Down
7 changes: 7 additions & 0 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
// called here may change the content type, so we must check it before the call.
FilterHeadersStatus decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
ENVOY_STREAM_LOG(trace,
"call decodeHeaders in real filter={}",
*this, static_cast<const void*>(handle_.get()));
FilterHeadersStatus status = handle_->decodeHeaders(headers, end_stream);
return status;
}
Expand Down Expand Up @@ -687,6 +690,8 @@ class FilterManager : public ScopeTrackedObject,
}

// Http::FilterChainFactoryCallbacks
// envoy 里面搞了太多继承用法,幸好是注释还是有一些,比如下面这些方法都是实现了FilterChainFactoryCallbacks这个接口,用于构造 filter chain
// 这些方法一般都是在 filter 的 config 的工厂方法里面调用,这些 filter 也是在那里被 make 出来的
Event::Dispatcher& dispatcher() override { return dispatcher_; }
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, false);
Expand Down Expand Up @@ -723,6 +728,7 @@ class FilterManager : public ScopeTrackedObject,
addStreamEncoderFilterWorker(filter, nullptr, false);
}
void addStreamFilter(StreamFilterSharedPtr filter) override {
// 添加 filter 到 chain,包括 decoder 和 encode 两个方向的
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
StreamDecoderFilter* decoder_filter = filter.get();
Expand Down Expand Up @@ -1009,6 +1015,7 @@ class FilterManager : public ScopeTrackedObject,
Buffer::BufferMemoryAccountSharedPtr account_;
const bool proxy_100_continue_;

// decoder 和 encoder 的 filter 链
std::list<ActiveStreamDecoderFilterPtr> decoder_filters_;
std::list<ActiveStreamEncoderFilterPtr> encoder_filters_;
std::list<StreamFilterBase*> filters_;
Expand Down
Loading

0 comments on commit f4413e1

Please sign in to comment.