Skip to content

xds: fix support for load reporting in LOGICAL_DNS clusters #8170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
Cluster: cluster.ClusterName,
DNSHostname: cluster.DNSHostName,
MaxConcurrentRequests: cluster.MaxRequests,
LoadReportingServer: cluster.LRSServerConfig,
}
}
odJSON := cluster.OutlierDetection
Expand Down
91 changes: 91 additions & 0 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,94 @@ func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) {
}
return host, uint32(port)
}

// Tests that LRS works correctly in a LOGICAL_DNS cluster.
func (s) TestLRSLogicalDNS(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add docstring for the test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Create an xDS management server that serves ADS and LRS requests.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)

// Create an xDS resolver with the above bootstrap configuration.
var resolverBuilder resolver.Builder
var err error
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}

// Start a server backend exposing the test service.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
host, port := hostAndPortFromAddress(t, server.Address)

// Configure the xDS management server with default resources. Override the
// default cluster to include an LRS server config pointing to self.
const serviceName = "my-test-xds-service"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: testutils.ParsePort(t, server.Address),
SecLevel: e2e.SecurityLevelNone,
})
resources.Clusters = []*v3clusterpb.Cluster{
e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: "cluster-" + serviceName,
Type: e2e.ClusterTypeLogicalDNS,
DNSHostName: host,
DNSPort: port,
}),
}
resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
}
resources.Endpoints = nil
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Ensure that an LRS stream is created.
if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err)
}

// Handle the initial LRS request from the xDS client.
if _, err = mgmtServer.LRSServer.LRSRequestChan.Receive(ctx); err != nil {
t.Fatalf("Failure waiting for initial LRS request: %v", err)
}

resp := fakeserver.Response{
Resp: &v3lrspb.LoadStatsResponse{
SendAllClusters: true,
LoadReportingInterval: durationpb.New(10 * time.Millisecond),
},
}
mgmtServer.LRSServer.LRSResponseChan <- &resp

// Wait for load to be reported for locality of server 1.
if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, ""); err != nil {
t.Fatalf("Server did not receive load due to error: %v", err)
}
}
1 change: 1 addition & 0 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoin
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
LoadReportingServer: mechanism.LoadReportingServer,
}, retEndpoints
}

Expand Down