Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js(-xds): Implement specified resolver and LB policy API changes #2925

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import StatusOr = grpc.experimental.StatusOr;
import { ChannelOptions } from '@grpc/grpc-js';

grpc_xds.register();
Expand Down Expand Up @@ -100,12 +101,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
return false;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote);
}
exitIdle(): void {
this.child.exitIdle();
Expand Down
48 changes: 26 additions & 22 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
Expand Down Expand Up @@ -205,7 +207,7 @@ function getLeafClusters(xdsConfig: XdsConfig, rootCluster: string, depth = 0):
if (!maybeClusterConfig) {
return [];
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
return [rootCluster];
}
if (maybeClusterConfig.value.children.type === 'aggregate') {
Expand Down Expand Up @@ -240,26 +242,27 @@ export class CdsLoadBalancer implements LoadBalancer {
}

updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const clusterName = lbConfig.getCluster();
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + clusterName);
return;
return false;
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details);
return;
return true;
}
const clusterConfig = maybeClusterConfig.value;

Expand All @@ -270,8 +273,8 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('xDS config parsing failed with error ' + (e as Error).message);
const errorMessage = `xDS config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
const priorityChildren: {[name: string]: PriorityChildRaw} = {};
for (const cluster of leafClusters) {
Expand All @@ -296,16 +299,16 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}, resolutionNote);
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
const errorMessage = `Cluster ${clusterName} resolution failed: ${clusterConfig.children.resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const newPriorityNames: string[] = [];
const newLocalityPriorities = new Map<string, number>();
Expand All @@ -317,7 +320,7 @@ export class CdsLoadBalancer implements LoadBalancer {
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
if (maybeRootClusterConfig?.ok) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
}
Expand Down Expand Up @@ -409,26 +412,26 @@ export class CdsLoadBalancer implements LoadBalancer {
typedChildConfig = parseLoadBalancingConfig(childConfig);
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const childOptions: ChannelOptions = {...options};
if (clusterConfig.cluster.securityUpdate) {
const securityUpdate = clusterConfig.cluster.securityUpdate;
const xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
const caCertProvider = xdsClient.getCertificateProvider(securityUpdate.caCertificateProviderInstance);
if (!caCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
if (securityUpdate.identityCertificateProviderInstance) {
const identityCertProvider = xdsClient.getCertificateProvider(securityUpdate.identityCertificateProviderInstance);
if (!identityCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
childOptions[IDENTITY_CERT_PROVIDER_KEY] = identityCertProvider;
}
Expand All @@ -440,8 +443,9 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Configured subject alternative name matcher: ' + sanMatcher);
childOptions[SAN_MATCHER_KEY] = this.latestSanMatcher;
}
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, childOptions);
this.childBalancer.updateAddressList(statusOrFromValue(childEndpointList), typedChildConfig, childOptions, resolutionNote);
}
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down
49 changes: 33 additions & 16 deletions packages/grpc-js-xds/src/load-balancer-priority.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import QueuePicker = experimental.QueuePicker;
import UnavailablePicker = experimental.UnavailablePicker;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';

const TRACER_NAME = 'priority';
Expand Down Expand Up @@ -155,9 +157,10 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig {

interface PriorityChildBalancer {
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void;
exitIdle(): void;
resetBackoff(): void;
Expand Down Expand Up @@ -240,11 +243,12 @@ export class PriorityLoadBalancer implements LoadBalancer {
}

updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void {
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes, resolutionNote);
}

exitIdle() {
Expand Down Expand Up @@ -332,6 +336,8 @@ export class PriorityLoadBalancer implements LoadBalancer {

private updatesPaused = false;

private latestResolutionNote: string = '';

constructor(private channelControlHelper: ChannelControlHelper) {}

private updateState(state: ConnectivityState, picker: Picker, errorMessage: string | null) {
Expand Down Expand Up @@ -401,9 +407,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
child = new this.PriorityChildImpl(this, childName, childUpdate.ignoreReresolutionRequests);
this.children.set(childName, child);
child.updateAddressList(
childUpdate.subchannelAddress,
statusOrFromValue(childUpdate.subchannelAddress),
childUpdate.lbConfig,
this.latestOptions
this.latestOptions,
this.latestResolutionNote
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
Expand Down Expand Up @@ -440,14 +447,21 @@ export class PriorityLoadBalancer implements LoadBalancer {
}

updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.latestUpdates.size === 0) {
this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
Expand All @@ -457,14 +471,14 @@ export class PriorityLoadBalancer implements LoadBalancer {
string,
LocalityEndpoint[]
>();
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
if (!isLocalityEndpoint(endpoint)) {
// Reject address that cannot be prioritized
return;
return false;
}
if (endpoint.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
return false;
}
const childName = endpoint.localityPath[0];
const childAddress: LocalityEndpoint = {
Expand Down Expand Up @@ -495,9 +509,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
const existingChild = this.children.get(childName);
if (existingChild !== undefined) {
existingChild.updateAddressList(
childAddresses,
statusOrFromValue(childAddresses),
childConfig.config,
options
options,
resolutionNote
);
}
}
Expand All @@ -509,7 +524,9 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
}
this.updatesPaused = false;
this.latestResolutionNote = resolutionNote;
this.choosePriority();
return true;
}
exitIdle(): void {
if (this.currentPriority !== null) {
Expand Down
34 changes: 27 additions & 7 deletions packages/grpc-js-xds/src/load-balancer-ring-hash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import subchannelAddressToString = experimental.subchannelAddressToString;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import EndpointMap = experimental.EndpointMap;
import StatusOr = experimental.StatusOr;
import { loadXxhashApi, xxhashApi } from './xxhash';
import { EXPERIMENTAL_RING_HASH } from './environment';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
Expand Down Expand Up @@ -401,26 +402,44 @@ class RingHashLoadBalancer implements LoadBalancer {
}

updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.ring.length === 0) {
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
if (endpointList.value.length === 0) {
for (const ringEntry of this.ring) {
ringEntry.leafBalancer.destroy();
}
this.ring = [];
this.leafMap.clear();
this.leafWeightMap.clear();
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
this.updatesPaused = true;
this.leafWeightMap.clear();
const dedupedEndpointList: Endpoint[] = [];
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options, resolutionNote)
);
}
const weight = this.leafWeightMap.get(endpoint);
Expand All @@ -429,7 +448,7 @@ class RingHashLoadBalancer implements LoadBalancer {
}
this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1));
}
const removedLeaves = this.leafMap.deleteMissing(endpointList);
const removedLeaves = this.leafMap.deleteMissing(endpointList.value);
for (const leaf of removedLeaves) {
leaf.destroy();
}
Expand All @@ -440,6 +459,7 @@ class RingHashLoadBalancer implements LoadBalancer {
this.calculateAndUpdateState();
this.maybeProactivelyConnect();
});
return true;
}
exitIdle(): void {
/* This operation does not make sense here. We don't want to make the whole
Expand Down
Loading