Skip to content

Commit 1ed87ec

Browse files
internal/xds: move the LDS and RDS watchers to dependency manager (#8651)
This PR moves the LDS and RDS watchers to dependency manager without changing the current functionality or behaviour. This is a part of implementation of gRFC [A74](https://github.com/grpc/proposal/blob/master/A74-xds-config-tears.md). RELEASE NOTES: None --------- Co-authored-by: Easwar Swaminathan <[email protected]>
1 parent f191b45 commit 1ed87ec

File tree

5 files changed

+1324
-17
lines changed

5 files changed

+1324
-17
lines changed

internal/grpctest/tlogger.go

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ type tLogger struct {
6666
v int
6767
initialized bool
6868

69-
mu sync.Mutex // guards t, start, and errors
70-
t *testing.T
71-
start time.Time
72-
errors map[*regexp.Regexp]int
69+
mu sync.Mutex
70+
t *testing.T
71+
start time.Time
72+
logs map[logType]map[*regexp.Regexp]int
7373
}
7474

7575
func init() {
@@ -87,7 +87,11 @@ func init() {
8787
}
8888
}
8989
// Initialize tLogr with the determined verbosity level.
90-
tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel}
90+
logsMap := map[logType]map[*regexp.Regexp]int{
91+
errorLog: {},
92+
warningLog: {},
93+
}
94+
tLogr = &tLogger{logs: logsMap, v: vLevel}
9195
}
9296

9397
// getCallingPrefix returns the <file:line> at the given depth from the stack.
@@ -115,11 +119,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
115119
switch ltype {
116120
case errorLog:
117121
// fmt.Sprintln is used rather than fmt.Sprint because tl.Log uses fmt.Sprintln behavior.
118-
if tl.expected(fmt.Sprintln(args...)) {
122+
if tl.expected(fmt.Sprintln(args...), errorLog) {
119123
tl.t.Log(args...)
120124
} else {
121125
tl.t.Error(args...)
122126
}
127+
case warningLog:
128+
tl.expected(fmt.Sprintln(args...), warningLog)
129+
tl.t.Log(args...)
123130
case fatalLog:
124131
panic(fmt.Sprint(args...))
125132
default:
@@ -130,11 +137,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
130137
format = "%v " + format + "%s"
131138
switch ltype {
132139
case errorLog:
133-
if tl.expected(fmt.Sprintf(format, args...)) {
140+
if tl.expected(fmt.Sprintf(format, args...), errorLog) {
134141
tl.t.Logf(format, args...)
135142
} else {
136143
tl.t.Errorf(format, args...)
137144
}
145+
case warningLog:
146+
tl.expected(fmt.Sprintln(args...), warningLog)
147+
tl.t.Log(args...)
138148
case fatalLog:
139149
panic(fmt.Sprintf(format, args...))
140150
default:
@@ -154,7 +164,8 @@ func (tl *tLogger) update(t *testing.T) {
154164
}
155165
tl.t = t
156166
tl.start = time.Now()
157-
tl.errors = map[*regexp.Regexp]int{}
167+
tl.logs[errorLog] = map[*regexp.Regexp]int{}
168+
tl.logs[warningLog] = map[*regexp.Regexp]int{}
158169
}
159170

160171
// ExpectError declares an error to be expected. For the next test, the first
@@ -163,40 +174,56 @@ func (tl *tLogger) update(t *testing.T) {
163174
// Update(). Note that if an expected error is not encountered, this will cause
164175
// the test to fail.
165176
func ExpectError(expr string) {
166-
ExpectErrorN(expr, 1)
177+
expectLogsN(expr, 1, errorLog)
167178
}
168179

169180
// ExpectErrorN declares an error to be expected n times.
170181
func ExpectErrorN(expr string, n int) {
182+
expectLogsN(expr, n, errorLog)
183+
}
184+
185+
// ExpectWarning declares a warning to be expected.
186+
func ExpectWarning(expr string) {
187+
expectLogsN(expr, 1, warningLog)
188+
}
189+
190+
func expectLogsN(expr string, n int, logType logType) {
171191
tLogr.mu.Lock()
172192
defer tLogr.mu.Unlock()
173193
re, err := regexp.Compile(expr)
174194
if err != nil {
175195
tLogr.t.Error(err)
176196
return
177197
}
178-
tLogr.errors[re] += n
198+
tLogr.logs[logType][re] += n
179199
}
180200

181201
// endTest checks if expected errors were not encountered.
182202
func (tl *tLogger) endTest(t *testing.T) {
183203
tl.mu.Lock()
184204
defer tl.mu.Unlock()
185-
for re, count := range tl.errors {
205+
for re, count := range tl.logs[errorLog] {
186206
if count > 0 {
187207
t.Errorf("Expected error '%v' not encountered", re.String())
188208
}
189209
}
190-
tl.errors = map[*regexp.Regexp]int{}
210+
for re, count := range tl.logs[warningLog] {
211+
if count > 0 {
212+
t.Errorf("Expected warning '%v' not encountered", re.String())
213+
}
214+
}
215+
tl.logs[errorLog] = map[*regexp.Regexp]int{}
216+
tl.logs[warningLog] = map[*regexp.Regexp]int{}
191217
}
192218

193-
// expected determines if the error string is protected or not.
194-
func (tl *tLogger) expected(s string) bool {
195-
for re, count := range tl.errors {
219+
// expected determines if the log string of the particular type is protected or
220+
// not.
221+
func (tl *tLogger) expected(s string, logType logType) bool {
222+
for re, count := range tl.logs[logType] {
196223
if re.FindStringIndex(s) != nil {
197-
tl.errors[re]--
224+
tl.logs[logType][re]--
198225
if count <= 1 {
199-
delete(tl.errors, re)
226+
delete(tl.logs[logType], re)
200227
}
201228
return true
202229
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package xdsresource
19+
20+
import "google.golang.org/grpc/resolver"
21+
22+
// XDSConfig holds the complete gRPC client-side xDS configuration containing
23+
// all necessary resources.
24+
type XDSConfig struct {
25+
// Listener holds the listener configuration. It is guaranteed to be
26+
// non-nil.
27+
Listener *ListenerUpdate
28+
29+
// RouteConfig holds the route configuration. It will be populated even if
30+
// the route configuration was inlined into the Listener resource. It is
31+
// guaranteed to be non-nil.
32+
RouteConfig *RouteConfigUpdate
33+
34+
// VirtualHost is selected from the route configuration whose domain field
35+
// offers the best match against the provided dataplane authority. It is
36+
// guaranteed to be non-nil.
37+
VirtualHost *VirtualHost
38+
39+
// Clusters is a map from cluster name to its configuration.
40+
Clusters map[string]*ClusterResult
41+
}
42+
43+
// ClusterResult contains a cluster's configuration when a valid resource is
44+
// received from the management server. It contains an error when:
45+
// - an invalid resource is received from the management server and
46+
// a valid resource was not already present or
47+
// - the cluster resource does not exist on the management server
48+
type ClusterResult struct {
49+
Config ClusterConfig
50+
Err error
51+
}
52+
53+
// ClusterConfig contains configuration for a single cluster.
54+
type ClusterConfig struct {
55+
// Cluster configuration for the cluster. This field is always set to a
56+
// non-nil value.
57+
Cluster *ClusterUpdate
58+
// EndpointConfig contains endpoint configuration for a leaf cluster. This
59+
// field is only set for EDS and LOGICAL_DNS clusters.
60+
EndpointConfig *EndpointConfig
61+
// AggregateConfig contains configuration for an aggregate cluster. This
62+
// field is only set for AGGREGATE clusters.
63+
AggregateConfig *AggregateConfig
64+
}
65+
66+
// AggregateConfig holds the configuration for an aggregate cluster.
67+
type AggregateConfig struct {
68+
// LeafClusters contains a prioritized list of names of the leaf clusters
69+
// for the cluster.
70+
LeafClusters []string
71+
}
72+
73+
// EndpointConfig contains configuration corresponding to the endpoints in a
74+
// cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on the
75+
// cluster type.
76+
type EndpointConfig struct {
77+
// Endpoint configurartion for the EDS clusters.
78+
EDSUpdate *EndpointsUpdate
79+
// Endpoint configuration for the LOGICAL_DNS clusters.
80+
DNSEndpoints *DNSUpdate
81+
// ResolutionNote stores error encountered while obtaining endpoints data
82+
// for the cluster. It will contain a nil value when a valid endpoint data is
83+
// received. It contains an error when:
84+
// - an invalid resource is received from the management server or
85+
// - the endpoint resource does not exist on the management server
86+
ResolutionNote error
87+
}
88+
89+
// DNSUpdate represents the result of a DNS resolution, containing a list of
90+
// discovered endpoints.
91+
type DNSUpdate struct {
92+
// Endpoints is the complete list of endpoints returned by the DNS resolver.
93+
Endpoints []resolver.Endpoint
94+
}
95+
96+
// xdsConfigkey is the type used as the key to store XDSConfig in the Attributes
97+
// field of resolver.State.
98+
type xdsConfigkey struct{}
99+
100+
// SetXDSConfig returns a copy of state in which the Attributes field is updated
101+
// with the XDSConfig.
102+
func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State {
103+
state.Attributes = state.Attributes.WithValue(xdsConfigkey{}, config)
104+
return state
105+
}
106+
107+
// XDSConfigFromResolverState returns XDSConfig stored as an attribute in the
108+
// resolver state.
109+
func XDSConfigFromResolverState(state resolver.State) *XDSConfig {
110+
if v := state.Attributes.Value(xdsConfigkey{}); v != nil {
111+
return v.(*XDSConfig)
112+
}
113+
return nil
114+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package xdsdepmgr
20+
21+
import "google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
22+
23+
type listenerWatcher struct {
24+
resourceName string
25+
cancel func()
26+
depMgr *DependencyManager
27+
}
28+
29+
func newListenerWatcher(resourceName string, depMgr *DependencyManager) *listenerWatcher {
30+
lw := &listenerWatcher{resourceName: resourceName, depMgr: depMgr}
31+
lw.cancel = xdsresource.WatchListener(depMgr.xdsClient, resourceName, lw)
32+
return lw
33+
}
34+
35+
func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
36+
l.depMgr.onListenerResourceUpdate(update, onDone)
37+
}
38+
39+
func (l *listenerWatcher) ResourceError(err error, onDone func()) {
40+
l.depMgr.onListenerResourceError(err, onDone)
41+
}
42+
43+
func (l *listenerWatcher) AmbientError(err error, onDone func()) {
44+
l.depMgr.onListenerResourceAmbientError(err, onDone)
45+
}
46+
47+
func (l *listenerWatcher) stop() {
48+
l.cancel()
49+
if l.depMgr.logger.V(2) {
50+
l.depMgr.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
51+
}
52+
}
53+
54+
type routeConfigWatcher struct {
55+
resourceName string
56+
cancel func()
57+
depMgr *DependencyManager
58+
}
59+
60+
func newRouteConfigWatcher(resourceName string, depMgr *DependencyManager) *routeConfigWatcher {
61+
rw := &routeConfigWatcher{resourceName: resourceName, depMgr: depMgr}
62+
rw.cancel = xdsresource.WatchRouteConfig(depMgr.xdsClient, resourceName, rw)
63+
return rw
64+
}
65+
66+
func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) {
67+
r.depMgr.onRouteConfigResourceUpdate(r.resourceName, u, onDone)
68+
}
69+
70+
func (r *routeConfigWatcher) ResourceError(err error, onDone func()) {
71+
r.depMgr.onRouteConfigResourceError(r.resourceName, err, onDone)
72+
}
73+
74+
func (r *routeConfigWatcher) AmbientError(err error, onDone func()) {
75+
r.depMgr.onRouteConfigResourceAmbientError(r.resourceName, err, onDone)
76+
}
77+
78+
func (r *routeConfigWatcher) stop() {
79+
r.cancel()
80+
if r.depMgr.logger.V(2) {
81+
r.depMgr.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
82+
}
83+
}

0 commit comments

Comments
 (0)