@@ -25,8 +25,7 @@ Cluster::Cluster(
          added_via_api, factory_context.dispatcher().timeSource()),
      dns_cache_manager_(cache_manager_factory.get()),
      dns_cache_(dns_cache_manager_->getCache(config.dns_cache_config())),
-      update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)), local_info_(local_info),
-      host_map_(std::make_shared<HostInfoMap>()) {
+      update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)), local_info_(local_info) {
  // Block certain TLS context parameters that don't make sense on a cluster-wide scale. We will
  // support these parameters dynamically in the future. This is not an exhaustive list of
  // parameters that don't make sense but should be the most obvious ones that a user might set
@@ -44,123 +43,116 @@ Cluster::Cluster(

void Cluster::startPreInit() {
  // If we are attaching to a pre-populated cache we need to initialize our hosts.
-  auto existing_hosts = dns_cache_->hosts();
-  if (!existing_hosts.empty()) {
-    std::shared_ptr<HostInfoMap> new_host_map;
-    std::unique_ptr<Upstream::HostVector> hosts_added;
-    for (const auto& existing_host : existing_hosts) {
-      addOrUpdateWorker(existing_host.first, existing_host.second, new_host_map, hosts_added);
-    }
-    swapAndUpdateMap(new_host_map, *hosts_added, {});
+  std::unique_ptr<Upstream::HostVector> hosts_added;
+  dns_cache_->iterateHostMap(
+      [&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
+        addOrUpdateHost(host, info, hosts_added);
+      });
+  if (hosts_added) {
+    updatePriorityState(*hosts_added, {});
  }
-
  onPreInitComplete();
}

-void Cluster::addOrUpdateWorker(
-    const std::string& host,
+void Cluster::addOrUpdateHost(
+    absl::string_view host,
    const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
-    std::shared_ptr<HostInfoMap>& new_host_map,
    std::unique_ptr<Upstream::HostVector>& hosts_added) {
-  // We should never get a host with no address from the cache.
-  ASSERT(host_info->address() != nullptr);
-
-  // NOTE: Right now we allow a DNS cache to be shared between multiple clusters. Though we have
-  // connection/request circuit breakers on the cluster, we don't have any way to control the
-  // maximum hosts on a cluster. We currently assume that host data shared via shared pointer is a
-  // marginal memory cost above that already used by connections and requests, so relying on
-  // connection/request circuit breakers is sufficient. We may have to revisit this in the future.
-
-  HostInfoMapSharedPtr current_map = getCurrentHostMap();
-  const auto host_map_it = current_map->find(host);
-  if (host_map_it != current_map->end()) {
-    // If we only have an address change, we can do that swap inline without any other updates.
-    // The appropriate R/W locking is in place to allow this. The details of this locking are:
-    // - Hosts are not thread local, they are global.
-    // - We take a read lock when reading the address and a write lock when changing it.
-    // - Address updates are very rare.
-    // - Address reads are only done when a connection is being made and a "real" host
-    //   description is created or the host is queried via the admin endpoint. Both of
-    //   these operations are relatively rare and the read lock is held for a short period
-    //   of time.
-    //
-    // TODO(mattklein123): Right now the dynamic forward proxy / DNS cache works similar to how
-    //                     logical DNS works, meaning that we only store a single address per
-    //                     resolution. It would not be difficult to also expose strict DNS
-    //                     semantics, meaning the cache would expose multiple addresses and the
-    //                     cluster would create multiple logical hosts based on those addresses.
-    //                     We will leave this as a follow up depending on need.
-    ASSERT(host_info == host_map_it->second.shared_host_info_);
-    ASSERT(host_map_it->second.shared_host_info_->address() !=
-           host_map_it->second.logical_host_->address());
-    ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
-    host_map_it->second.logical_host_->setNewAddress(host_info->address(), dummy_lb_endpoint_);
-    return;
-  }
+  Upstream::LogicalHostSharedPtr emplaced_host;
+  {
+    absl::WriterMutexLock lock{&host_map_lock_};
+    // We should never get a host with no address from the cache.
+    ASSERT(host_info->address() != nullptr);
+
+    // NOTE: Right now we allow a DNS cache to be shared between multiple clusters. Though we have
+    // connection/request circuit breakers on the cluster, we don't have any way to control the
+    // maximum hosts on a cluster. We currently assume that host data shared via shared pointer is a
+    // marginal memory cost above that already used by connections and requests, so relying on
+    // connection/request circuit breakers is sufficient. We may have to revisit this in the future.
+    const auto host_map_it = host_map_.find(host);
+    if (host_map_it != host_map_.end()) {
+      // If we only have an address change, we can do that swap inline without any other updates.
+      // The appropriate R/W locking is in place to allow this. The details of this locking are:
+      // - Hosts are not thread local, they are global.
+      // - We take a read lock when reading the address and a write lock when changing it.
+      // - Address updates are very rare.
+      // - Address reads are only done when a connection is being made and a "real" host
+      //   description is created or the host is queried via the admin endpoint. Both of
+      //   these operations are relatively rare and the read lock is held for a short period
+      //   of time.
+      //
+      // TODO(mattklein123): Right now the dynamic forward proxy / DNS cache works similar to how
+      //                     logical DNS works, meaning that we only store a single address per
+      //                     resolution. It would not be difficult to also expose strict DNS
+      //                     semantics, meaning the cache would expose multiple addresses and the
+      //                     cluster would create multiple logical hosts based on those addresses.
+      //                     We will leave this as a follow up depending on need.
+      ASSERT(host_info == host_map_it->second.shared_host_info_);
+      ASSERT(host_map_it->second.shared_host_info_->address() !=
+             host_map_it->second.logical_host_->address());
+      ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
+      host_map_it->second.logical_host_->setNewAddress(host_info->address(), dummy_lb_endpoint_);
+      return;
+    }

- ENVOY_LOG (debug, " adding new dfproxy cluster host '{}'" , host);
98
+ ENVOY_LOG (debug, " adding new dfproxy cluster host '{}'" , host);
102
99
103
- if (new_host_map == nullptr ) {
104
- new_host_map = std::make_shared<HostInfoMap>(*current_map);
100
+ emplaced_host = host_map_
101
+ .try_emplace (host, host_info,
102
+ std::make_shared<Upstream::LogicalHost>(
103
+ info (), std::string{host}, host_info->address (),
104
+ dummy_locality_lb_endpoint_, dummy_lb_endpoint_, nullptr ,
105
+ time_source_))
106
+ .first ->second .logical_host_ ;
105
107
}
106
- const auto emplaced =
107
- new_host_map->try_emplace (host, host_info,
108
- std::make_shared<Upstream::LogicalHost>(
109
- info (), host, host_info->address (), dummy_locality_lb_endpoint_,
110
- dummy_lb_endpoint_, nullptr , time_source_));
108
+
109
+ ASSERT (emplaced_host);
111
110
if (hosts_added == nullptr ) {
112
111
hosts_added = std::make_unique<Upstream::HostVector>();
113
112
}
114
- hosts_added->emplace_back (emplaced. first -> second . logical_host_ );
113
+ hosts_added->emplace_back (emplaced_host );
115
114
}
116
115
117
116
void Cluster::onDnsHostAddOrUpdate(
    const std::string& host,
    const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
-  std::shared_ptr<HostInfoMap> new_host_map;
+  ENVOY_LOG(debug, "Adding host info for {}", host);
+
  std::unique_ptr<Upstream::HostVector> hosts_added;
-  addOrUpdateWorker(host, host_info, new_host_map, hosts_added);
+  addOrUpdateHost(host, host_info, hosts_added);
  if (hosts_added != nullptr) {
-    ASSERT(!new_host_map->empty());
    ASSERT(!hosts_added->empty());
-    // Swap in the new map. This will be picked up when the per-worker LBs are recreated via
-    // the host set update.
-    swapAndUpdateMap(new_host_map, *hosts_added, {});
+    updatePriorityState(*hosts_added, {});
  }
}

-void Cluster::swapAndUpdateMap(const HostInfoMapSharedPtr& new_hosts_map,
-                               const Upstream::HostVector& hosts_added,
-                               const Upstream::HostVector& hosts_removed) {
-  {
-    absl::WriterMutexLock lock(&host_map_lock_);
-    host_map_ = new_hosts_map;
-  }
-
+void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
+                                  const Upstream::HostVector& hosts_removed) {
  Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
  priority_state_manager.initializePriorityFor(dummy_locality_lb_endpoint_);
-  for (const auto& host : (*new_hosts_map)) {
-    priority_state_manager.registerHostForPriority(host.second.logical_host_,
-                                                   dummy_locality_lb_endpoint_);
+  {
+    absl::ReaderMutexLock lock{&host_map_lock_};
+    for (const auto& host : host_map_) {
+      priority_state_manager.registerHostForPriority(host.second.logical_host_,
+                                                     dummy_locality_lb_endpoint_);
+    }
  }
  priority_state_manager.updateClusterPrioritySet(
      0, std::move(priority_state_manager.priorityState()[0].first), hosts_added, hosts_removed,
      absl::nullopt, absl::nullopt);
}

void Cluster::onDnsHostRemove(const std::string& host) {
-  HostInfoMapSharedPtr current_map = getCurrentHostMap();
-  const auto host_map_it = current_map->find(host);
-  ASSERT(host_map_it != current_map->end());
-  const auto new_host_map = std::make_shared<HostInfoMap>(*current_map);
  Upstream::HostVector hosts_removed;
-  hosts_removed.emplace_back(host_map_it->second.logical_host_);
-  new_host_map->erase(host);
-  ENVOY_LOG(debug, "removing dfproxy cluster host '{}'", host);
-
-  // Swap in the new map. This will be picked up when the per-worker LBs are recreated via
-  // the host set update.
-  swapAndUpdateMap(new_host_map, {}, hosts_removed);
+  {
+    absl::WriterMutexLock lock{&host_map_lock_};
+    const auto host_map_it = host_map_.find(host);
+    ASSERT(host_map_it != host_map_.end());
+    hosts_removed.emplace_back(host_map_it->second.logical_host_);
+    host_map_.erase(host);
+    ENVOY_LOG(debug, "removing dfproxy cluster host '{}'", host);
+  }
+  updatePriorityState({}, hosts_removed);
}

Upstream::HostConstSharedPtr
@@ -179,13 +171,15 @@ Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
  if (host.empty()) {
    return nullptr;
  }
-
-  const auto host_it = host_map_->find(host);
-  if (host_it == host_map_->end()) {
-    return nullptr;
-  } else {
-    host_it->second.shared_host_info_->touch();
-    return host_it->second.logical_host_;
+  {
+    absl::ReaderMutexLock lock{&cluster_.host_map_lock_};
+    const auto host_it = cluster_.host_map_.find(host);
+    if (host_it == cluster_.host_map_.end()) {
+      return nullptr;
+    } else {
+      host_it->second.shared_host_info_->touch();
+      return host_it->second.logical_host_;
+    }
  }
}

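The core of this change is visible in the hunks above: the copy-on-write `HostInfoMap` (rebuilt and swapped in `swapAndUpdateMap`) is replaced by a single `host_map_` guarded by `host_map_lock_`, write-locked in `addOrUpdateHost`/`onDnsHostRemove` and read-locked in `updatePriorityState` and the load balancer's `chooseHost`. As a reading aid only, the sketch below shows that guarded-map pattern in isolation; it is a minimal standalone example assuming Abseil, and the `GuardedHostMap`/`FakeHostInfo` names are hypothetical, not Envoy types from this diff.

```cpp
// Minimal standalone sketch of the locking pattern adopted above: a single map protected by an
// absl::Mutex, write-locked on (rare) DNS cache updates and read-locked on the (hot) lookup path.
// All names here are illustrative; they are not Envoy APIs.
#include <memory>
#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"

struct FakeHostInfo {
  std::string address;
};
using FakeHostInfoSharedPtr = std::shared_ptr<FakeHostInfo>;

class GuardedHostMap {
public:
  // Writer path: mirrors addOrUpdateHost()/onDnsHostRemove(), called only when the DNS cache changes.
  void addOrUpdate(absl::string_view host, FakeHostInfoSharedPtr info) {
    absl::WriterMutexLock lock{&host_map_lock_};
    host_map_.insert_or_assign(std::string(host), std::move(info));
  }
  void remove(absl::string_view host) {
    absl::WriterMutexLock lock{&host_map_lock_};
    host_map_.erase(host);
  }

  // Reader path: mirrors LoadBalancer::chooseHost(), which holds the lock only for the lookup.
  FakeHostInfoSharedPtr find(absl::string_view host) const {
    absl::ReaderMutexLock lock{&host_map_lock_};
    const auto it = host_map_.find(host);
    return it == host_map_.end() ? nullptr : it->second;
  }

private:
  mutable absl::Mutex host_map_lock_;
  absl::flat_hash_map<std::string, FakeHostInfoSharedPtr> host_map_ ABSL_GUARDED_BY(host_map_lock_);
};
```

Compared with the previous approach, nothing is copied per update and workers no longer pick up a freshly swapped shared map; the trade-off is a short reader critical section on the request path, which the retained comments in the diff argue is acceptable because address reads are rare and brief.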