From 024d5d8fdf6b1d27c7245738106774c9f106adf8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Mar 2025 16:14:54 -0700 Subject: [PATCH] grpc-js(-xds): Implement specified resolver and LB policy API changes --- .../grpc-js-xds/interop/xds-interop-client.ts | 7 +- packages/grpc-js-xds/src/load-balancer-cds.ts | 48 +- .../grpc-js-xds/src/load-balancer-priority.ts | 49 +- .../src/load-balancer-ring-hash.ts | 34 +- .../src/load-balancer-weighted-target.ts | 40 +- .../src/load-balancer-xds-cluster-impl.ts | 24 +- .../src/load-balancer-xds-cluster-manager.ts | 14 +- .../src/load-balancer-xds-wrr-locality.ts | 16 +- packages/grpc-js-xds/src/resolver-xds.ts | 19 +- .../grpc-js-xds/src/xds-dependency-manager.ts | 63 +- .../test/test-custom-lb-policies.ts | 7 +- packages/grpc-js/src/call-interface.ts | 29 + packages/grpc-js/src/experimental.ts | 8 +- .../src/load-balancer-child-handler.ts | 10 +- .../src/load-balancer-outlier-detection.ts | 37 +- .../grpc-js/src/load-balancer-pick-first.ts | 45 +- .../grpc-js/src/load-balancer-round-robin.ts | 35 +- packages/grpc-js/src/load-balancer.ts | 11 +- packages/grpc-js/src/resolver-dns.ts | 101 ++- packages/grpc-js/src/resolver-ip.ts | 18 +- packages/grpc-js/src/resolver-uds.ts | 10 +- packages/grpc-js/src/resolver.ts | 45 +- .../grpc-js/src/resolving-load-balancer.ts | 133 ++- packages/grpc-js/src/server.ts | 46 +- packages/grpc-js/test/test-pick-first.ts | 194 +++-- packages/grpc-js/test/test-resolver.ts | 778 +++++++++--------- 26 files changed, 1031 insertions(+), 790 deletions(-) diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index 2e142555f..e5145f31f 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult; import PickResultType = grpc.experimental.PickResultType; import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper; import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig; +import StatusOr = grpc.experimental.StatusOr; import { ChannelOptions } from '@grpc/grpc-js'; grpc_xds.register(); @@ -100,12 +101,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { }); this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { - return; + return false; } this.latestConfig = lbConfig; - this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options); + return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote); } exitIdle(): void { this.child.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index c1d92d69d..43da3b8d5 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -26,6 +26,8 @@ import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; +import StatusOr = experimental.StatusOr; +import statusOrFromValue = experimental.statusOrFromValue; import { XdsConfig } from './xds-dependency-manager'; import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority'; import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; @@ -205,7 +207,7 @@ function getLeafClusters(xdsConfig: XdsConfig, rootCluster: string, depth = 0): if (!maybeClusterConfig) { return []; } - if (!maybeClusterConfig.success) { + if (!maybeClusterConfig.ok) { return [rootCluster]; } if (maybeClusterConfig.value.children.type === 'aggregate') { @@ -240,13 +242,14 @@ export class CdsLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { if (!(lbConfig instanceof CdsLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); - return; + return false; } trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig; @@ -254,12 +257,12 @@ export class CdsLoadBalancer implements LoadBalancer { const maybeClusterConfig = xdsConfig.clusters.get(clusterName); if (!maybeClusterConfig) { trace('Received update with no config for cluster ' + clusterName); - return; + return false; } - if (!maybeClusterConfig.success) { + if (!maybeClusterConfig.ok) { this.childBalancer.destroy(); this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details); - return; + return true; } const clusterConfig = maybeClusterConfig.value; @@ -270,8 +273,8 @@ export class CdsLoadBalancer implements LoadBalancer { } catch (e) { trace('xDS config parsing failed with error ' + (e as Error).message); const errorMessage = `xDS config parsing failed with error ${(e as Error).message}`; - this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage); + return true; } const priorityChildren: {[name: string]: PriorityChildRaw} = {}; for (const cluster of leafClusters) { @@ -296,16 +299,16 @@ export class CdsLoadBalancer implements LoadBalancer { } catch (e) { trace('LB policy config parsing failed with error ' + (e as Error).message); const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`; - this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage); + return true; } - this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}); + this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}, resolutionNote); } else { if (!clusterConfig.children.endpoints) { trace('Received update with no resolved endpoints for cluster ' + clusterName); const errorMessage = `Cluster ${clusterName} resolution failed: ${clusterConfig.children.resolutionNote}`; this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + return false; } const newPriorityNames: string[] = []; const newLocalityPriorities = new Map(); @@ -317,7 +320,7 @@ export class CdsLoadBalancer implements LoadBalancer { if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) { if (typeof options[ROOT_CLUSTER_KEY] === 'string') { const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]); - if (maybeRootClusterConfig?.success) { + if (maybeRootClusterConfig?.ok) { endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig; } } @@ -409,9 +412,9 @@ export class CdsLoadBalancer implements LoadBalancer { typedChildConfig = parseLoadBalancingConfig(childConfig); } catch (e) { trace('LB policy config parsing failed with error ' + (e as Error).message); - const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`; + const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}. Resolution note: ${resolutionNote}`; this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + return false; } const childOptions: ChannelOptions = {...options}; if (clusterConfig.cluster.securityUpdate) { @@ -419,16 +422,16 @@ export class CdsLoadBalancer implements LoadBalancer { const xdsClient = options[XDS_CLIENT_KEY] as XdsClient; const caCertProvider = xdsClient.getCertificateProvider(securityUpdate.caCertificateProviderInstance); if (!caCertProvider) { - const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap`; + const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`; this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + return false; } if (securityUpdate.identityCertificateProviderInstance) { const identityCertProvider = xdsClient.getCertificateProvider(securityUpdate.identityCertificateProviderInstance); if (!identityCertProvider) { - const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap`; + const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`; this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); - return; + return false; } childOptions[IDENTITY_CERT_PROVIDER_KEY] = identityCertProvider; } @@ -440,8 +443,9 @@ export class CdsLoadBalancer implements LoadBalancer { trace('Configured subject alternative name matcher: ' + sanMatcher); childOptions[SAN_MATCHER_KEY] = this.latestSanMatcher; } - this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, childOptions); + this.childBalancer.updateAddressList(statusOrFromValue(childEndpointList), typedChildConfig, childOptions, resolutionNote); } + return true; } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-priority.ts b/packages/grpc-js-xds/src/load-balancer-priority.ts index 9d6ed5c64..adba3ea6b 100644 --- a/packages/grpc-js-xds/src/load-balancer-priority.ts +++ b/packages/grpc-js-xds/src/load-balancer-priority.ts @@ -27,6 +27,8 @@ import QueuePicker = experimental.QueuePicker; import UnavailablePicker = experimental.UnavailablePicker; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import selectLbConfigFromList = experimental.selectLbConfigFromList; +import StatusOr = experimental.StatusOr; +import statusOrFromValue = experimental.statusOrFromValue; import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; const TRACER_NAME = 'priority'; @@ -155,9 +157,10 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig { interface PriorityChildBalancer { updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + attributes: { [key: string]: unknown }, + resolutionNote: string ): void; exitIdle(): void; resetBackoff(): void; @@ -240,11 +243,12 @@ export class PriorityLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + attributes: { [key: string]: unknown }, + resolutionNote: string ): void { - this.childBalancer.updateAddressList(endpointList, lbConfig, attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig, attributes, resolutionNote); } exitIdle() { @@ -332,6 +336,8 @@ export class PriorityLoadBalancer implements LoadBalancer { private updatesPaused = false; + private latestResolutionNote: string = ''; + constructor(private channelControlHelper: ChannelControlHelper) {} private updateState(state: ConnectivityState, picker: Picker, errorMessage: string | null) { @@ -401,9 +407,10 @@ export class PriorityLoadBalancer implements LoadBalancer { child = new this.PriorityChildImpl(this, childName, childUpdate.ignoreReresolutionRequests); this.children.set(childName, child); child.updateAddressList( - childUpdate.subchannelAddress, + statusOrFromValue(childUpdate.subchannelAddress), childUpdate.lbConfig, - this.latestOptions + this.latestOptions, + this.latestResolutionNote ); } else { /* We're going to try to use this child, so reactivate it if it has been @@ -440,14 +447,21 @@ export class PriorityLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { if (!(lbConfig instanceof PriorityLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - return; + return false; + } + if (!endpointList.ok) { + if (this.latestUpdates.size === 0) { + this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details); + } + return true; } /* For each address, the first element of its localityPath array determines * which child it belongs to. So we bucket those addresses by that first @@ -457,14 +471,14 @@ export class PriorityLoadBalancer implements LoadBalancer { string, LocalityEndpoint[] >(); - for (const endpoint of endpointList) { + for (const endpoint of endpointList.value) { if (!isLocalityEndpoint(endpoint)) { // Reject address that cannot be prioritized - return; + return false; } if (endpoint.localityPath.length < 1) { // Reject address that cannot be prioritized - return; + return false; } const childName = endpoint.localityPath[0]; const childAddress: LocalityEndpoint = { @@ -495,9 +509,10 @@ export class PriorityLoadBalancer implements LoadBalancer { const existingChild = this.children.get(childName); if (existingChild !== undefined) { existingChild.updateAddressList( - childAddresses, + statusOrFromValue(childAddresses), childConfig.config, - options + options, + resolutionNote ); } } @@ -509,7 +524,9 @@ export class PriorityLoadBalancer implements LoadBalancer { } } this.updatesPaused = false; + this.latestResolutionNote = resolutionNote; this.choosePriority(); + return true; } exitIdle(): void { if (this.currentPriority !== null) { diff --git a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts index af3ebac86..4b01e2a31 100644 --- a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts +++ b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts @@ -31,6 +31,7 @@ import UnavailablePicker = experimental.UnavailablePicker; import subchannelAddressToString = experimental.subchannelAddressToString; import registerLoadBalancerType = experimental.registerLoadBalancerType; import EndpointMap = experimental.EndpointMap; +import StatusOr = experimental.StatusOr; import { loadXxhashApi, xxhashApi } from './xxhash'; import { EXPERIMENTAL_RING_HASH } from './environment'; import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util'; @@ -401,26 +402,44 @@ class RingHashLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { if (!(lbConfig instanceof RingHashLoadBalancingConfig)) { trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - return; + return false; + } + if (!endpointList.ok) { + if (this.ring.length === 0) { + this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details); + } + return true; + } + if (endpointList.value.length === 0) { + for (const ringEntry of this.ring) { + ringEntry.leafBalancer.destroy(); + } + this.ring = []; + this.leafMap.clear(); + this.leafWeightMap.clear(); + const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`; + this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); + return false; } trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); this.updatesPaused = true; this.leafWeightMap.clear(); const dedupedEndpointList: Endpoint[] = []; - for (const endpoint of endpointList) { + for (const endpoint of endpointList.value) { const leafBalancer = this.leafMap.get(endpoint); if (leafBalancer) { leafBalancer.updateEndpoint(endpoint, options); } else { this.leafMap.set( endpoint, - new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options) + new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options, resolutionNote) ); } const weight = this.leafWeightMap.get(endpoint); @@ -429,7 +448,7 @@ class RingHashLoadBalancer implements LoadBalancer { } this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1)); } - const removedLeaves = this.leafMap.deleteMissing(endpointList); + const removedLeaves = this.leafMap.deleteMissing(endpointList.value); for (const leaf of removedLeaves) { leaf.destroy(); } @@ -440,6 +459,7 @@ class RingHashLoadBalancer implements LoadBalancer { this.calculateAndUpdateState(); this.maybeProactivelyConnect(); }); + return true; } exitIdle(): void { /* This operation does not make sense here. We don't want to make the whole diff --git a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts index 2e73a7d97..ed66b8bf8 100644 --- a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts @@ -15,7 +15,7 @@ * */ -import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from "@grpc/grpc-js"; +import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions, connectivityState, status } from "@grpc/grpc-js"; import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority"; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; @@ -30,6 +30,8 @@ import UnavailablePicker = experimental.UnavailablePicker; import Endpoint = experimental.Endpoint; import endpointToString = experimental.endpointToString; import selectLbConfigFromList = experimental.selectLbConfigFromList; +import StatusOr = experimental.StatusOr; +import statusOrFromValue = experimental.statusOrFromValue; const TRACER_NAME = 'weighted_target'; @@ -154,7 +156,7 @@ class WeightedTargetPicker implements Picker { } interface WeightedChild { - updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void; + updateAddressList(endpointList: StatusOr, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void; exitIdle(): void; resetBackoff(): void; destroy(): void; @@ -193,9 +195,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.parent.maybeUpdateState(); } - updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void { this.weight = lbConfig.weight; - this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options); + this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options, resolutionNote); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -325,26 +327,41 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, picker, errorMessage); } - updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(addressList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - return; + return false; + } + if (!addressList.ok) { + if (this.targets.size === 0) { + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(addressList.error), addressList.error.details); + } + return true; + } + if (addressList.value.length === 0) { + for (const target of this.targets.values()) { + target.destroy(); + } + this.targets.clear(); + this.targetList = []; + const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`; + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage); + return false; } - /* For each address, the first element of its localityPath array determines * which child it belongs to. So we bucket those addresses by that first * element, and pass along the rest of the localityPath for that child * to use. */ const childEndpointMap = new Map(); - for (const address of addressList) { + for (const address of addressList.value) { if (!isLocalityEndpoint(address)) { // Reject address that cannot be associated with targets - return; + return false; } if (address.localityPath.length < 1) { // Reject address that cannot be associated with targets - return; + return false; } const childName = address.localityPath[0]; const childAddress: LocalityEndpoint = { @@ -371,7 +388,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { } const targetEndpoints = childEndpointMap.get(targetName) ?? []; trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')); - target.updateAddressList(targetEndpoints, targetConfig, options); + target.updateAddressList(statusOrFromValue(targetEndpoints), targetConfig, options, resolutionNote); } // Deactivate targets that are not in the new config @@ -384,6 +401,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.updatesPaused = false; this.updateState(); + return true; } exitIdle(): void { for (const targetName of this.targetList) { diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts index a7a981090..d00c02cae 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts @@ -37,6 +37,7 @@ import selectLbConfigFromList = experimental.selectLbConfigFromList; import SubchannelInterface = experimental.SubchannelInterface; import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper; import UnavailablePicker = experimental.UnavailablePicker; +import StatusOr = experimental.StatusOr; import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; import { ClusterConfig, XdsConfig } from "./xds-dependency-manager"; import { CdsUpdate } from "./xds-resource-type/cluster-resource-type"; @@ -206,7 +207,7 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string class XdsClusterImplBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private lastestEndpointList: Endpoint[] | null = null; + private lastestEndpointList: StatusOr | null = null; private latestConfig: XdsClusterImplLoadBalancingConfig | null = null; private clusterDropStats: XdsClusterDropStats | null = null; private xdsClient: XdsClient | null = null; @@ -215,12 +216,12 @@ class XdsClusterImplBalancer implements LoadBalancer { constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, { createSubchannel: (subchannelAddress, subchannelArgs) => { - if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) { - throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated'); + if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.lastestEndpointList.ok || !this.latestClusterConfig) { + throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated or with resolver error'); } const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); let locality: Locality__Output | null = null; - for (const endpoint of this.lastestEndpointList) { + for (const endpoint of this.lastestEndpointList.value) { if (endpointHasAddress(endpoint, subchannelAddress)) { locality = (endpoint as LocalityEndpoint).locality; } @@ -251,28 +252,28 @@ class XdsClusterImplBalancer implements LoadBalancer { } })); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - return; + return false; } trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig; const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster()); if (!maybeClusterConfig) { trace('Received update with no config for cluster ' + lbConfig.getCluster()); - return; + return false; } - if (!maybeClusterConfig.success) { + if (!maybeClusterConfig.ok) { this.latestClusterConfig = null; this.childBalancer.destroy(); this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details); - return; + return false; } const clusterConfig = maybeClusterConfig.value; if (clusterConfig.children.type === 'aggregate') { trace('Received update for aggregate cluster ' + lbConfig.getCluster()); - return; + return false; } if (!clusterConfig.children.endpoints) { this.childBalancer.destroy(); @@ -291,7 +292,8 @@ class XdsClusterImplBalancer implements LoadBalancer { ); } - this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options); + this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options, resolutionNote); + return true; } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index 9dc4ee948..59d15dde2 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -30,6 +30,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import ChannelControlHelper = experimental.ChannelControlHelper; import selectLbConfigFromList = experimental.selectLbConfigFromList; import registerLoadBalancerType = experimental.registerLoadBalancerType; +import StatusOr = experimental.StatusOr; const TRACER_NAME = 'xds_cluster_manager'; @@ -111,7 +112,7 @@ class XdsClusterManagerPicker implements Picker { } interface XdsClusterManagerChild { - updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void; + updateAddressList(endpointList: StatusOr, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void; exitIdle(): void; resetBackoff(): void; destroy(): void; @@ -145,8 +146,8 @@ class XdsClusterManager implements LoadBalancer { } this.parent.maybeUpdateState(); } - updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { - this.childBalancer.updateAddressList(endpointList, childConfig, options); + updateAddressList(endpointList: StatusOr, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void { + this.childBalancer.updateAddressList(endpointList, childConfig, options, resolutionNote); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -213,11 +214,11 @@ class XdsClusterManager implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap), errorMessage); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - return; + return false;; } trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); const configChildren = lbConfig.getChildren(); @@ -240,10 +241,11 @@ class XdsClusterManager implements LoadBalancer { child = new this.XdsClusterManagerChildImpl(this, name); this.children.set(name, child); } - child.updateAddressList(endpointList, childConfig, options); + child.updateAddressList(endpointList, childConfig, options, resolutionNote); } this.updatesPaused = false; this.updateState(); + return true; } exitIdle(): void { for (const child of this.children.values()) { diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index be353d29d..01e3b09b5 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -29,6 +29,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import Endpoint = experimental.Endpoint; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; import registerLoadBalancerType = experimental.registerLoadBalancerType; +import StatusOr = experimental.StatusOr; import { Any__Output } from "./generated/google/protobuf/Any"; import { WrrLocality__Output } from "./generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality"; import { TypedExtensionConfig__Output } from "./generated/envoy/config/core/v3/TypedExtensionConfig"; @@ -76,15 +77,19 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); - return; + return false; + } + if (!endpointList.ok) { + this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig({weighted_target: { targets: [] }}), options, resolutionNote); + return true; } const targets: {[localityName: string]: WeightedTargetRaw} = {}; - for (const address of endpointList) { + for (const address of endpointList.value) { if (!isLocalityEndpoint(address)) { - return; + return false; } const localityName = localityToName(address.locality); if (!(localityName in targets)) { @@ -99,7 +104,8 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { targets: targets } }; - this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options); + this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options, resolutionNote); + return true; } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 980ac982c..80c385188 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -41,6 +41,9 @@ import { loadXxhashApi } from './xxhash'; import { formatTemplateString } from './xds-bootstrap'; import { getPredicateForMatcher } from './route'; import { XdsConfig, XdsConfigWatcher, XdsDependencyManager } from './xds-dependency-manager'; +import statusOrFromValue = experimental.statusOrFromValue; +import statusOrFromError = experimental.statusOrFromError; +import CHANNEL_ARGS_CONFIG_SELECTOR_KEY = experimental.CHANNEL_ARGS_CONFIG_SELECTOR_KEY; const TRACER_NAME = 'xds_resolver'; @@ -143,7 +146,7 @@ class XdsResolver implements Resolver { trace('Resolution error for target ' + uriToString(this.target) + ': ' + context + ' does not exist'); /* Return an empty endpoint list and service config, to explicitly * invalidate any previously returned service config */ - this.listener.onSuccessfulResolution([], null, null, null, {}); + this.listener(statusOrFromValue([]), {}, null, ''); } } } @@ -407,20 +410,20 @@ class XdsResolver implements Resolver { methodConfig: [], loadBalancingConfig: [lbPolicyConfig] } - this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, { + this.listener(statusOrFromValue([]), { [XDS_CLIENT_KEY]: this.xdsClient, - [XDS_CONFIG_KEY]: xdsConfig - }); + [XDS_CONFIG_KEY]: xdsConfig, + [CHANNEL_ARGS_CONFIG_SELECTOR_KEY]: configSelector + }, statusOrFromValue(serviceConfig), ''); } private reportResolutionError(reason: string) { - this.listener.onError({ + this.listener(statusOrFromError({ code: status.UNAVAILABLE, details: `xDS name resolution failed for target ${uriToString( this.target - )}: ${reason}`, - metadata: new Metadata(), - }); + )}: ${reason}` + }), {}, null, ''); } private startResolution(): void { diff --git a/packages/grpc-js-xds/src/xds-dependency-manager.ts b/packages/grpc-js-xds/src/xds-dependency-manager.ts index 40aa9f6e3..1263d18ac 100644 --- a/packages/grpc-js-xds/src/xds-dependency-manager.ts +++ b/packages/grpc-js-xds/src/xds-dependency-manager.ts @@ -25,6 +25,7 @@ import { DropCategory } from "./load-balancer-xds-cluster-impl"; import Endpoint = experimental.Endpoint; import Resolver = experimental.Resolver; import createResolver = experimental.createResolver; +import StatusOr = experimental.StatusOr; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from "./resources"; import { RouteConfigurationResourceType } from "./xds-resource-type/route-config-resource-type"; import { ListenerResourceType } from "./xds-resource-type/listener-resource-type"; @@ -75,14 +76,6 @@ export interface ClusterConfig { children: EndpointConfig | AggregateConfig; } -export type StatusOr = { - success: true; - value: T -} | { - success: false; - error: StatusObject; -} - export interface ClusterResult { clusterConfig?: ClusterConfig; status?: StatusObject; @@ -159,7 +152,7 @@ function isClusterTreeFullyUpdated(tree: ClusterGraph, roots: string[]): Cluster reason: 'Cluster entry ' + next + ' not updated' }; } - if (tree[next].latestUpdate.success) { + if (tree[next].latestUpdate.ok) { if (tree[next].latestUpdate.value.type !== 'AGGREGATE') { if (!(tree[next].latestUpdate.value.latestUpdate)) { return { @@ -470,7 +463,7 @@ export class XdsDependencyManager { this.trace('Not sending update: Cluster entry ' + clusterName + ' not updated (not caught by isClusterTreeFullyUpdated)'); return; } - if (entry.latestUpdate.success) { + if (entry.latestUpdate.ok) { let clusterChildren: EndpointConfig | AggregateConfig; if (entry.latestUpdate.value.type === 'AGGREGATE') { clusterChildren = { @@ -485,7 +478,7 @@ export class XdsDependencyManager { }; } update.clusters.set(clusterName, { - success: true, + ok: true, value: { cluster: entry.latestUpdate.value.cdsUpdate, children: clusterChildren @@ -493,7 +486,7 @@ export class XdsDependencyManager { }); } else { update.clusters.set(clusterName, { - success: false, + ok: false, error: entry.latestUpdate.error }); } @@ -510,7 +503,7 @@ export class XdsDependencyManager { onResourceChanged: (update: CdsUpdate) => { switch (update.type) { case 'AGGREGATE': - if (entry.latestUpdate?.success) { + if (entry.latestUpdate?.ok) { switch (entry.latestUpdate.value.type) { case 'AGGREGATE': break; @@ -525,7 +518,7 @@ export class XdsDependencyManager { } entry.children = update.aggregateChildren; entry.latestUpdate = { - success: true, + ok: true, value: { type: 'AGGREGATE', cdsUpdate: update @@ -539,7 +532,7 @@ export class XdsDependencyManager { break; case 'EDS': const edsServiceName = update.edsServiceName ?? clusterName; - if (entry.latestUpdate?.success) { + if (entry.latestUpdate?.ok) { switch (entry.latestUpdate.value.type) { case 'AGGREGATE': entry.children = []; @@ -566,14 +559,14 @@ export class XdsDependencyManager { } const edsWatcher = new Watcher({ onResourceChanged: (endpoint: ClusterLoadAssignment__Output) => { - if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') { entry.latestUpdate.value.latestUpdate = getEdsResource(endpoint); entry.latestUpdate.value.resolutionNote = undefined; this.maybeSendUpdate(); } }, onError: error => { - if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') { if (!entry.latestUpdate.value.latestUpdate) { entry.latestUpdate.value.resolutionNote = `Control plane error: ${error.details}`; this.maybeSendUpdate(); @@ -581,7 +574,7 @@ export class XdsDependencyManager { } }, onResourceDoesNotExist: () => { - if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') { entry.latestUpdate.value.resolutionNote = 'Resource does not exist'; entry.latestUpdate.value.latestUpdate = undefined; this.maybeSendUpdate(); @@ -589,7 +582,7 @@ export class XdsDependencyManager { } }); entry.latestUpdate = { - success: true, + ok: true, value: { type: 'EDS', cdsUpdate: update, @@ -602,7 +595,7 @@ export class XdsDependencyManager { this.maybeSendUpdate(); break; case 'LOGICAL_DNS': { - if (entry.latestUpdate?.success) { + if (entry.latestUpdate?.ok) { switch (entry.latestUpdate.value.type) { case 'AGGREGATE': entry.children = []; @@ -621,24 +614,24 @@ export class XdsDependencyManager { } } this.trace('Creating DNS resolver for hostname ' + update.dnsHostname!); - const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, { - onSuccessfulResolution: endpointList => { - if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') { - entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList); + const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, endpointList => { + if (endpointList.ok) { + if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') { + entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList.value); this.maybeSendUpdate(); } - }, - onError: error => { - if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') { + } else { + if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') { if (!entry.latestUpdate.value.latestUpdate) { - entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${error.details}`; + entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${endpointList.error.details}`; this.maybeSendUpdate(); } } } + return true; }, {'grpc.service_config_disable_resolution': 1}); entry.latestUpdate = { - success: true, + ok: true, value: { type: 'LOGICAL_DNS', cdsUpdate: update, @@ -653,16 +646,16 @@ export class XdsDependencyManager { } }, onError: error => { - if (!entry.latestUpdate?.success) { + if (!entry.latestUpdate?.ok) { entry.latestUpdate = { - success: false, + ok: false, error: error }; this.maybeSendUpdate(); } }, onResourceDoesNotExist: () => { - if (entry.latestUpdate?.success) { + if (entry.latestUpdate?.ok) { switch (entry.latestUpdate.value.type) { case 'EDS': this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): CDS resource does not exist'); @@ -676,7 +669,7 @@ export class XdsDependencyManager { } } entry.latestUpdate = { - success: false, + ok: false, error: { code: status.UNAVAILABLE, details: `Cluster resource ${clusterName} does not exist`, @@ -718,7 +711,7 @@ export class XdsDependencyManager { return; } const entry = this.clusterForest[clusterName]; - if (entry.latestUpdate?.success) { + if (entry.latestUpdate?.ok) { switch (entry.latestUpdate.value.type) { case 'EDS': this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): Cluster ' + clusterName + ' removed'); @@ -806,7 +799,7 @@ export class XdsDependencyManager { updateResolution() { for (const clusterEntry of Object.values(this.clusterForest)) { - if (clusterEntry.latestUpdate?.success && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') { + if (clusterEntry.latestUpdate?.ok && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') { clusterEntry.latestUpdate.value.resolver.updateResolution(); } } diff --git a/packages/grpc-js-xds/test/test-custom-lb-policies.ts b/packages/grpc-js-xds/test/test-custom-lb-policies.ts index bec7986b2..111a31b48 100644 --- a/packages/grpc-js-xds/test/test-custom-lb-policies.ts +++ b/packages/grpc-js-xds/test/test-custom-lb-policies.ts @@ -38,6 +38,7 @@ import PickResultType = experimental.PickResultType; import createChildChannelControlHelper = experimental.createChildChannelControlHelper; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; import registerLoadBalancerType = experimental.registerLoadBalancerType; +import StatusOr = experimental.StatusOr; import { PickFirst } from "../src/generated/envoy/extensions/load_balancing_policies/pick_first/v3/PickFirst"; const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer'; @@ -95,12 +96,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { }); this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + updateAddressList(endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { - return; + return false; } this.latestConfig = lbConfig; - this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options); + return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote); } exitIdle(): void { this.child.exitIdle(); diff --git a/packages/grpc-js/src/call-interface.ts b/packages/grpc-js/src/call-interface.ts index 3cf70124e..637228c7e 100644 --- a/packages/grpc-js/src/call-interface.ts +++ b/packages/grpc-js/src/call-interface.ts @@ -41,6 +41,35 @@ export type PartialStatusObject = Pick & { metadata?: Metadata | null | undefined; }; +export interface StatusOrOk { + ok: true; + value: T; +} + +export interface StatusOrError { + ok: false; + error: StatusObject; +} + +export type StatusOr = StatusOrOk | StatusOrError; + +export function statusOrFromValue(value: T): StatusOr { + return { + ok: true, + value: value + }; +} + +export function statusOrFromError(error: PartialStatusObject): StatusOr { + return { + ok: false, + error: { + ...error, + metadata: error.metadata ?? new Metadata() + } + }; +} + export const enum WriteFlags { BufferHint = 1, NoCompress = 2, diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index a381e1d9b..b8f7766bb 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -5,6 +5,7 @@ export { registerResolver, ConfigSelector, createResolver, + CHANNEL_ARGS_CONFIG_SELECTOR_KEY, } from './resolver'; export { GrpcUri, uriToString, splitHostPort, HostPort } from './uri-parser'; export { Duration, durationToMs, parseDuration } from './duration'; @@ -37,7 +38,12 @@ export { PickArgs, PickResultType, } from './picker'; -export { Call as CallStream } from './call-interface'; +export { + Call as CallStream, + StatusOr, + statusOrFromValue, + statusOrFromError +} from './call-interface'; export { Filter, BaseFilter, FilterFactory } from './filter'; export { FilterStackFactory } from './filter-stack'; export { registerAdminService } from './admin'; diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index 1fcb30b31..025780880 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -27,6 +27,7 @@ import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; import type { ChannelRef, SubchannelRef } from './channelz'; import { SubchannelInterface } from './subchannel-interface'; +import { StatusOr } from './call-interface'; const TYPE_NAME = 'child_load_balancer_helper'; @@ -102,10 +103,11 @@ export class ChildLoadBalancerHandler { * @param attributes */ updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { let childToUpdate: LoadBalancer; if ( this.currentChild === null || @@ -133,7 +135,7 @@ export class ChildLoadBalancerHandler { } } this.latestConfig = lbConfig; - childToUpdate.updateAddressList(endpointList, lbConfig, options); + return childToUpdate.updateAddressList(endpointList, lbConfig, options, resolutionNote); } exitIdle(): void { if (this.currentChild) { diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index a83e40bcf..9a0840da2 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -43,6 +43,7 @@ import { } from './subchannel-interface'; import * as logging from './logging'; import { LoadBalancingConfig } from './service-config'; +import { StatusOr } from './call-interface'; const TRACER_NAME = 'outlier_detection'; @@ -757,28 +758,31 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) { - return; + return false; } - trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)) - for (const endpoint of endpointList) { - if (!this.entryMap.has(endpoint)) { - trace('Adding map entry for ' + endpointToString(endpoint)); - this.entryMap.set(endpoint, { - counter: new CallCounter(), - currentEjectionTimestamp: null, - ejectionTimeMultiplier: 0, - subchannelWrappers: [], - }); + trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + if (endpointList.ok) { + for (const endpoint of endpointList.value) { + if (!this.entryMap.has(endpoint)) { + trace('Adding map entry for ' + endpointToString(endpoint)); + this.entryMap.set(endpoint, { + counter: new CallCounter(), + currentEjectionTimestamp: null, + ejectionTimeMultiplier: 0, + subchannelWrappers: [], + }); + } } + this.entryMap.deleteMissing(endpointList.value); } - this.entryMap.deleteMissing(endpointList); const childPolicy = lbConfig.getChildPolicy(); - this.childBalancer.updateAddressList(endpointList, childPolicy, options); + this.childBalancer.updateAddressList(endpointList, childPolicy, options, resolutionNote); if ( lbConfig.getSuccessRateEjectionConfig() || @@ -808,6 +812,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } this.latestConfig = lbConfig; + return true; } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 28410ed77..f0df79d26 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -43,6 +43,7 @@ import { import { isTcpSubchannelAddress } from './subchannel-address'; import { isIPv6 } from 'net'; import { ChannelOptions } from './channel-options'; +import { StatusOr, statusOrFromValue } from './call-interface'; const TRACER_NAME = 'pick_first'; @@ -236,6 +237,8 @@ export class PickFirstLoadBalancer implements LoadBalancer { private latestOptions: ChannelOptions = {}; + private latestResolutionNote: string = ''; + /** * Load balancer that attempts to connect to each backend in the address list * in order, and picks the first one that connects, using it for every @@ -277,7 +280,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { ); } } else if (this.latestAddressList?.length === 0) { - const errorMessage = `No connection established. Last error: ${this.lastError}`; + const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`; this.updateState( ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ @@ -289,7 +292,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.updateState(ConnectivityState.IDLE, new QueuePicker(this), null); } else { if (this.stickyTransientFailureMode) { - const errorMessage = `No connection established. Last error: ${this.lastError}`; + const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`; this.updateState( ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ @@ -505,13 +508,25 @@ export class PickFirstLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + maybeEndpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { - return; + return false; + } + if (!maybeEndpointList.ok) { + if (this.children.length === 0 && this.currentPick === null) { + this.channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker(maybeEndpointList.error), + maybeEndpointList.error.details + ); + } + return true; } + let endpointList = maybeEndpointList.value; this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME]; /* Previously, an update would be discarded if it was identical to the * previous update, to minimize churn. Now the DNS resolver is @@ -523,13 +538,17 @@ export class PickFirstLoadBalancer implements LoadBalancer { ...endpointList.map(endpoint => endpoint.addresses) ); trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])'); - if (rawAddressList.length === 0) { - this.lastError = 'No addresses resolved'; - } const addressList = interleaveAddressFamilies(rawAddressList); this.latestAddressList = addressList; this.latestOptions = options; this.connectToAddressList(addressList, options); + this.latestResolutionNote = resolutionNote; + if (rawAddressList.length > 0) { + return true; + } else { + this.lastError = 'No addresses resolved'; + return false; + } } exitIdle() { @@ -570,7 +589,8 @@ export class LeafLoadBalancer { constructor( private endpoint: Endpoint, channelControlHelper: ChannelControlHelper, - private options: ChannelOptions + private options: ChannelOptions, + private resolutionNote: string ) { const childChannelControlHelper = createChildChannelControlHelper( channelControlHelper, @@ -590,9 +610,10 @@ export class LeafLoadBalancer { startConnecting() { this.pickFirstBalancer.updateAddressList( - [this.endpoint], + statusOrFromValue([this.endpoint]), LEAF_CONFIG, - { ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true } + { ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }, + this.resolutionNote ); } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 1b1d97fc8..15a071c8f 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -39,6 +39,7 @@ import { } from './subchannel-address'; import { LeafLoadBalancer } from './load-balancer-pick-first'; import { ChannelOptions } from './channel-options'; +import { StatusOr } from './call-interface'; const TRACER_NAME = 'round_robin'; @@ -205,14 +206,38 @@ export class RoundRobinLoadBalancer implements LoadBalancer { for (const child of this.children) { child.destroy(); } + this.children = []; } updateAddressList( - endpointList: Endpoint[], + maybeEndpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - options: ChannelOptions - ): void { + options: ChannelOptions, + resolutionNote: string + ): boolean { + if (!(lbConfig instanceof RoundRobinLoadBalancingConfig)) { + return false; + } + if (!maybeEndpointList.ok) { + if (this.children.length === 0) { + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker(maybeEndpointList.error), + maybeEndpointList.error.details + ); + } + return true; + } + const endpointList = maybeEndpointList.value; this.resetSubchannelList(); + if (endpointList.length === 0) { + const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({details: errorMessage}), + errorMessage + ); + } trace('Connect to endpoint list ' + endpointList.map(endpointToString)); this.updatesPaused = true; this.children = endpointList.map( @@ -220,7 +245,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer { new LeafLoadBalancer( endpoint, this.childChannelControlHelper, - options + options, + resolutionNote ) ); for (const child of this.children) { @@ -228,6 +254,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { } this.updatesPaused = false; this.calculateAndUpdateState(); + return true; } exitIdle(): void { diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 22f0c4f57..18f762e1a 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -24,6 +24,7 @@ import { SubchannelInterface } from './subchannel-interface'; import { LoadBalancingConfig } from './service-config'; import { log } from './logging'; import { LogVerbosity } from './constants'; +import { StatusOr } from './call-interface'; /** * A collection of functions associated with a channel that a load balancer @@ -102,12 +103,16 @@ export interface LoadBalancer { * @param endpointList The new list of addresses to connect to * @param lbConfig The load balancing config object from the service config, * if one was provided + * @param channelOptions Channel options from the channel, plus resolver + * attributes + * @param resolutionNote A not from the resolver to include in errors */ updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, - channelOptions: ChannelOptions - ): void; + channelOptions: ChannelOptions, + resolutionNote: string + ): boolean; /** * If the load balancer is currently in the IDLE state, start connecting. */ diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 9e7b8bbfb..5245fe126 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -23,7 +23,7 @@ import { import { promises as dns } from 'dns'; import { extractAndSelectServiceConfig, ServiceConfig } from './service-config'; import { Status } from './constants'; -import { StatusObject } from './call-interface'; +import { StatusObject, StatusOr, statusOrFromError, statusOrFromValue } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; @@ -62,9 +62,8 @@ class DnsResolver implements Resolver { private readonly minTimeBetweenResolutionsMs: number; private pendingLookupPromise: Promise | null = null; private pendingTxtPromise: Promise | null = null; - private latestLookupResult: Endpoint[] | null = null; - private latestServiceConfig: ServiceConfig | null = null; - private latestServiceConfigError: StatusObject | null = null; + private latestLookupResult: StatusOr | null = null; + private latestServiceConfigResult: StatusOr | null = null; private percentage: number; private defaultResolutionError: StatusObject; private backoff: BackoffTimeout; @@ -149,13 +148,12 @@ class DnsResolver implements Resolver { if (!this.returnedIpResult) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { - this.listener.onSuccessfulResolution( - this.ipResult!, + this.listener( + statusOrFromValue(this.ipResult!), + {}, null, - null, - null, - {} - ); + '' + ) }); this.returnedIpResult = true; } @@ -167,11 +165,15 @@ class DnsResolver implements Resolver { if (this.dnsHostname === null) { trace('Failed to parse DNS address ' + uriToString(this.target)); setImmediate(() => { - this.listener.onError({ - code: Status.UNAVAILABLE, - details: `Failed to parse DNS address ${uriToString(this.target)}`, - metadata: new Metadata(), - }); + this.listener( + statusOrFromError({ + code: Status.UNAVAILABLE, + details: `Failed to parse DNS address ${uriToString(this.target)}` + }), + {}, + null, + '' + ); }); this.stopNextResolutionTimer(); } else { @@ -194,11 +196,9 @@ class DnsResolver implements Resolver { return; } this.pendingLookupPromise = null; - this.backoff.reset(); - this.backoff.stop(); - this.latestLookupResult = addressList.map(address => ({ + this.latestLookupResult = statusOrFromValue(addressList.map(address => ({ addresses: [address], - })); + }))); const allAddressesString: string = '[' + addressList.map(addr => addr.host + ':' + addr.port).join(',') + @@ -209,21 +209,17 @@ class DnsResolver implements Resolver { ': ' + allAddressesString ); - if (this.latestLookupResult.length === 0) { - this.listener.onError(this.defaultResolutionError); - return; - } /* If the TXT lookup has not yet finished, both of the last two * arguments will be null, which is the equivalent of getting an * empty TXT response. When the TXT lookup does finish, its handler * can update the service config by using the same address list */ - this.listener.onSuccessfulResolution( + const healthStatus = this.listener( this.latestLookupResult, - this.latestServiceConfig, - this.latestServiceConfigError, - null, - {} + {}, + this.latestServiceConfigResult, + '' ); + this.handleHealthStatus(healthStatus); }, err => { if (this.pendingLookupPromise === null) { @@ -237,7 +233,12 @@ class DnsResolver implements Resolver { ); this.pendingLookupPromise = null; this.stopNextResolutionTimer(); - this.listener.onError(this.defaultResolutionError); + this.listener( + statusOrFromError(this.defaultResolutionError), + {}, + this.latestServiceConfigResult, + '' + ) } ); /* If there already is a still-pending TXT resolution, we can just use @@ -253,31 +254,35 @@ class DnsResolver implements Resolver { return; } this.pendingTxtPromise = null; + let serviceConfig: ServiceConfig | null; try { - this.latestServiceConfig = extractAndSelectServiceConfig( + serviceConfig = extractAndSelectServiceConfig( txtRecord, this.percentage ); + if (serviceConfig) { + this.latestServiceConfigResult = statusOrFromValue(serviceConfig); + } else { + this.latestServiceConfigResult = null; + } } catch (err) { - this.latestServiceConfigError = { + this.latestServiceConfigResult = statusOrFromError({ code: Status.UNAVAILABLE, details: `Parsing service config failed with error ${ (err as Error).message - }`, - metadata: new Metadata(), - }; + }` + }); } if (this.latestLookupResult !== null) { /* We rely here on the assumption that calling this function with * identical parameters will be essentialy idempotent, and calling * it with the same address list and a different service config * should result in a fast and seamless switchover. */ - this.listener.onSuccessfulResolution( + this.listener( this.latestLookupResult, - this.latestServiceConfig, - this.latestServiceConfigError, - null, - {} + {}, + this.latestServiceConfigResult, + '' ); } }, @@ -295,6 +300,21 @@ class DnsResolver implements Resolver { } } + /** + * The ResolverListener returns a boolean indicating whether the LB policy + * accepted the resolution result. A false result on an otherwise successful + * resolution should be treated as a resolution failure. + * @param healthStatus + */ + private handleHealthStatus(healthStatus: boolean) { + if (healthStatus) { + this.backoff.stop(); + this.backoff.reset(); + } else { + this.continueResolving = true; + } + } + private async lookup(hostname: string): Promise { if (GRPC_NODE_USE_ALTERNATIVE_RESOLVER) { trace('Using alternative DNS resolver.'); @@ -400,8 +420,7 @@ class DnsResolver implements Resolver { this.pendingLookupPromise = null; this.pendingTxtPromise = null; this.latestLookupResult = null; - this.latestServiceConfig = null; - this.latestServiceConfigError = null; + this.latestServiceConfigResult = null; this.returnedIpResult = false; } diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts index 8fed35bd1..80837f86d 100644 --- a/packages/grpc-js/src/resolver-ip.ts +++ b/packages/grpc-js/src/resolver-ip.ts @@ -15,7 +15,7 @@ */ import { isIPv4, isIPv6 } from 'net'; -import { StatusObject } from './call-interface'; +import { StatusObject, statusOrFromError, statusOrFromValue } from './call-interface'; import { ChannelOptions } from './channel-options'; import { LogVerbosity, Status } from './constants'; import { Metadata } from './metadata'; @@ -92,14 +92,18 @@ class IpResolver implements Resolver { this.hasReturnedResult = true; process.nextTick(() => { if (this.error) { - this.listener.onError(this.error); - } else { - this.listener.onSuccessfulResolution( - this.endpoints, - null, + this.listener( + statusOrFromError(this.error), + {}, null, + '' + ); + } else { + this.listener( + statusOrFromValue(this.endpoints), + {}, null, - {} + '' ); } }); diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 4d84de9d5..5ef8c0754 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -18,6 +18,7 @@ import { Resolver, ResolverListener, registerResolver } from './resolver'; import { Endpoint } from './subchannel-address'; import { GrpcUri } from './uri-parser'; import { ChannelOptions } from './channel-options'; +import { statusOrFromValue } from './call-interface'; class UdsResolver implements Resolver { private hasReturnedResult = false; @@ -39,12 +40,11 @@ class UdsResolver implements Resolver { if (!this.hasReturnedResult) { this.hasReturnedResult = true; process.nextTick( - this.listener.onSuccessfulResolution, - this.endpoints, + this.listener, + statusOrFromValue(this.endpoints), + {}, null, - null, - null, - {} + '' ); } } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 9cbcff591..28cc9876c 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -16,7 +16,7 @@ */ import { MethodConfig, ServiceConfig } from './service-config'; -import { StatusObject } from './call-interface'; +import { StatusOr } from './call-interface'; import { Endpoint } from './subchannel-address'; import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; @@ -24,6 +24,8 @@ import { Metadata } from './metadata'; import { Status } from './constants'; import { Filter, FilterFactory } from './filter'; +export const CHANNEL_ARGS_CONFIG_SELECTOR_KEY = 'grpc.internal.config_selector'; + export interface CallConfig { methodConfig: MethodConfig; onCommitted?: () => void; @@ -41,34 +43,27 @@ export interface ConfigSelector { unref(): void; } -/** - * A listener object passed to the resolver's constructor that provides name - * resolution updates back to the resolver's owner. - */ export interface ResolverListener { /** - * Called whenever the resolver has new name resolution results to report - * @param addressList The new list of backend addresses - * @param serviceConfig The new service configuration corresponding to the - * `addressList`. Will be `null` if no service configuration was - * retrieved or if the service configuration was invalid - * @param serviceConfigError If non-`null`, indicates that the retrieved - * service configuration was invalid + * Called whenever the resolver has new name resolution results or an error to + * report. + * @param endpointList The list of endpoints, or an error if resolution failed + * @param attributes Arbitrary key/value pairs to pass along to load balancing + * policies + * @param serviceConfig The service service config for the endpoint list, or an + * error if the retrieved service config is invalid, or null if there is no + * service config + * @param resolutionNote Provides additional context to RPC failure status + * messages generated by the load balancing policy. + * @returns Whether or not the load balancing policy accepted the result. */ - onSuccessfulResolution( - addressList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null, - configSelector: ConfigSelector | null, - attributes: { [key: string]: unknown } - ): void; - /** - * Called whenever a name resolution attempt fails. - * @param error Describes how resolution failed - */ - onError(error: StatusObject): void; + ( + endpointList: StatusOr, + attributes: { [key: string]: unknown }, + serviceConfig: StatusOr | null, + resolutionNote: string + ): boolean; } - /** * A resolver class that handles one or more of the name syntax schemes defined * in the [gRPC Name Resolution document](https://github.com/grpc/grpc/blob/master/doc/naming.md) diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 694613483..c117e9455 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -27,12 +27,11 @@ import { validateServiceConfig, } from './service-config'; import { ConnectivityState } from './connectivity-state'; -import { ConfigSelector, createResolver, Resolver } from './resolver'; -import { ServiceError } from './call'; +import { CHANNEL_ARGS_CONFIG_SELECTOR_KEY, ConfigSelector, createResolver, Resolver } from './resolver'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; import { BackoffOptions, BackoffTimeout } from './backoff-timeout'; import { Status } from './constants'; -import { StatusObject } from './call-interface'; +import { StatusObject, StatusOr } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; @@ -251,75 +250,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { ); this.innerResolver = createResolver( target, - { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: ServiceError | null, - configSelector: ConfigSelector | null, - attributes: { [key: string]: unknown } - ) => { - this.backoffTimeout.stop(); - this.backoffTimeout.reset(); - let workingServiceConfig: ServiceConfig | null = null; - /* This first group of conditionals implements the algorithm described - * in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md - * in the section called "Behavior on receiving a new gRPC Config". - */ - if (serviceConfig === null) { - // Step 4 and 5 - if (serviceConfigError === null) { - // Step 5 - this.previousServiceConfig = null; - workingServiceConfig = this.defaultServiceConfig; - } else { - // Step 4 - if (this.previousServiceConfig === null) { - // Step 4.ii - this.handleResolutionFailure(serviceConfigError); - } else { - // Step 4.i - workingServiceConfig = this.previousServiceConfig; - } - } - } else { - // Step 3 - workingServiceConfig = serviceConfig; - this.previousServiceConfig = serviceConfig; - } - const workingConfigList = - workingServiceConfig?.loadBalancingConfig ?? []; - const loadBalancingConfig = selectLbConfigFromList( - workingConfigList, - true - ); - if (loadBalancingConfig === null) { - // There were load balancing configs but none are supported. This counts as a resolution failure - this.handleResolutionFailure({ - code: Status.UNAVAILABLE, - details: - 'All load balancer options in service config are not compatible', - metadata: new Metadata(), - }); - configSelector?.unref(); - return; - } - this.childLoadBalancer.updateAddressList( - endpointList, - loadBalancingConfig, - {...this.channelOptions, ...attributes} - ); - const finalServiceConfig = - workingServiceConfig ?? this.defaultServiceConfig; - this.onSuccessfulResolution( - finalServiceConfig, - configSelector ?? getDefaultConfigSelector(finalServiceConfig) - ); - }, - onError: (error: StatusObject) => { - this.handleResolutionFailure(error); - }, - }, + this.handleResolverResult.bind(this), channelOptions ); const backoffOptions: BackoffOptions = { @@ -337,6 +268,62 @@ export class ResolvingLoadBalancer implements LoadBalancer { this.backoffTimeout.unref(); } + private handleResolverResult( + endpointList: StatusOr, + attributes: { [key: string]: unknown }, + serviceConfig: StatusOr | null, + resolutionNote: string + ): boolean { + this.backoffTimeout.stop(); + this.backoffTimeout.reset(); + let resultAccepted = true; + let workingServiceConfig: ServiceConfig | null = null; + if (serviceConfig === null) { + workingServiceConfig = this.defaultServiceConfig; + } else if (serviceConfig.ok) { + workingServiceConfig = serviceConfig.value; + } else { + if (this.previousServiceConfig !== null) { + workingServiceConfig = this.previousServiceConfig; + } else { + resultAccepted = false; + this.handleResolutionFailure(serviceConfig.error); + } + } + + if (workingServiceConfig !== null) { + const workingConfigList = + workingServiceConfig?.loadBalancingConfig ?? []; + const loadBalancingConfig = selectLbConfigFromList( + workingConfigList, + true + ); + if (loadBalancingConfig === null) { + resultAccepted = false; + this.handleResolutionFailure({ + code: Status.UNAVAILABLE, + details: + 'All load balancer options in service config are not compatible', + metadata: new Metadata(), + }); + } else { + resultAccepted = this.childLoadBalancer.updateAddressList( + endpointList, + loadBalancingConfig, + {...this.channelOptions, ...attributes}, + resolutionNote + ); + } + } + if (resultAccepted) { + this.onSuccessfulResolution( + workingServiceConfig!, + attributes[CHANNEL_ARGS_CONFIG_SELECTOR_KEY] as ConfigSelector ?? getDefaultConfigSelector(workingServiceConfig!) + ); + } + return resultAccepted; + } + private updateResolution() { this.innerResolver.updateResolution(); if (this.currentState === ConnectivityState.IDLE) { @@ -391,7 +378,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { } updateAddressList( - endpointList: Endpoint[], + endpointList: StatusOr, lbConfig: TypedLoadBalancingConfig | null ): never { throw new Error('updateAddressList not supported on ResolvingLoadBalancer'); diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 9528b125c..6e68c695a 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -782,27 +782,31 @@ export class Server { private resolvePort(port: GrpcUri): Promise { return new Promise((resolve, reject) => { - const resolverListener: ResolverListener = { - onSuccessfulResolution: ( - endpointList, - serviceConfig, - serviceConfigError - ) => { - // We only want one resolution result. Discard all future results - resolverListener.onSuccessfulResolution = () => {}; - const addressList = ([] as SubchannelAddress[]).concat( - ...endpointList.map(endpoint => endpoint.addresses) - ); - if (addressList.length === 0) { - reject(new Error(`No addresses resolved for port ${port}`)); - return; - } - resolve(addressList); - }, - onError: error => { - reject(new Error(error.details)); - }, - }; + let seenResolution = false; + const resolverListener: ResolverListener = ( + endpointList, + attributes, + serviceConfig, + resolutionNote + ) => { + if (seenResolution) { + return true; + } + seenResolution = true; + if (!endpointList.ok) { + reject(new Error(endpointList.error.details)); + return true; + } + const addressList = ([] as SubchannelAddress[]).concat( + ...endpointList.value.map(endpoint => endpoint.addresses) + ); + if (addressList.length === 0) { + reject(new Error(`No addresses resolved for port ${port}`)); + return true; + } + resolve(addressList); + return true; + } const resolver = createResolver(port, resolverListener, this.options); resolver.updateResolution(); }); diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index 1de2e8d37..754691b06 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -31,6 +31,8 @@ import { Metadata } from '../src/metadata'; import { Picker } from '../src/picker'; import { Endpoint, subchannelAddressToString } from '../src/subchannel-address'; import { MockSubchannel, TestClient, TestServer } from './common'; +import { statusOrFromError, statusOrFromValue } from '../src/call-interface'; +import { Status } from '../src/constants'; function updateStateCallBackForExpectedStateSequence( expectedStateSequence: ConnectivityState[], @@ -125,9 +127,10 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.READY); @@ -145,12 +148,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.READY); @@ -168,16 +172,17 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [ { host: 'localhost', port: 1 }, { host: 'localhost', port: 2 }, ], }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.READY); @@ -203,9 +208,10 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); }); it('Should stay CONNECTING if only some subchannels fail to connect', done => { @@ -220,12 +226,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -243,12 +250,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -269,12 +277,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -309,12 +318,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); }); it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => { @@ -337,12 +347,13 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.READY); @@ -369,22 +380,24 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { currentStartState = ConnectivityState.CONNECTING; pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); }); }); @@ -409,19 +422,21 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [ + statusOrFromValue([ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, - ], + ]), config, - {} + {}, + '' ); process.nextTick(() => { currentStartState = ConnectivityState.READY; pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 3 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]), config, - {} + {}, + '' ); }); }); @@ -446,9 +461,10 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -475,16 +491,18 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { currentStartState = ConnectivityState.IDLE; pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -512,16 +530,18 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { currentStartState = ConnectivityState.TRANSIENT_FAILURE; pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -549,15 +569,17 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -597,25 +619,28 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 3 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[2].transitionToState( @@ -660,21 +685,24 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 2 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]), config, - {} + {}, + '' ); }); }); @@ -704,9 +732,10 @@ describe('pick_first load balancing policy', () => { ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( - [{ addresses: [{ host: 'localhost', port: 1 }] }], + statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]), config, - {} + {}, + '' ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -726,7 +755,20 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([], config, {}); + pickFirst.updateAddressList(statusOrFromValue([]), config, {}, ''); + }); + it('Should report TRANSIENT_FAILURE with an endpoint list error', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList(statusOrFromError({code: Status.UNAVAILABLE, details: 'Resolver error'}), config, {}, ''); }); describe('Address list randomization', () => { const shuffleConfig = new PickFirstLoadBalancingConfig(true); @@ -760,20 +802,21 @@ describe('pick_first load balancing policy', () => { for (let i = 0; i < 10; i++) { endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } + const endpointList = statusOrFromValue(endpoints); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); /* Pick from 10 subchannels 5 times, with address randomization enabled, * and verify that at least two different subchannels are picked. The * probability choosing the same address every time is 1/10,000, which * I am considering an acceptable flake rate */ - pickFirst.updateAddressList(endpoints, shuffleConfig, {}); + pickFirst.updateAddressList(endpointList, shuffleConfig, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig, {}); + pickFirst.updateAddressList(endpointList, shuffleConfig, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig, {}); + pickFirst.updateAddressList(endpointList, shuffleConfig, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig, {}); + pickFirst.updateAddressList(endpointList, shuffleConfig, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig, {}); + pickFirst.updateAddressList(endpointList, shuffleConfig, {}, ''); process.nextTick(() => { assert(pickedSubchannels.size > 1); done(); @@ -816,16 +859,17 @@ describe('pick_first load balancing policy', () => { for (let i = 0; i < 10; i++) { endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } + const endpointList = statusOrFromValue(endpoints); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList(endpoints, config, {}); + pickFirst.updateAddressList(endpointList, config, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config, {}); + pickFirst.updateAddressList(endpointList, config, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config, {}); + pickFirst.updateAddressList(endpointList, config, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config, {}); + pickFirst.updateAddressList(endpointList, config, {}, ''); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config, {}); + pickFirst.updateAddressList(endpointList, config, {}, ''); process.nextTick(() => { assert(pickedSubchannels.size === 1); done(); diff --git a/packages/grpc-js/test/test-resolver.ts b/packages/grpc-js/test/test-resolver.ts index 4bae0c304..931a5e21c 100644 --- a/packages/grpc-js/test/test-resolver.ts +++ b/packages/grpc-js/test/test-resolver.ts @@ -23,7 +23,7 @@ import * as resolver_dns from '../src/resolver-dns'; import * as resolver_uds from '../src/resolver-uds'; import * as resolver_ip from '../src/resolver-ip'; import { ServiceConfig } from '../src/service-config'; -import { StatusObject } from '../src/call-interface'; +import { StatusOr } from '../src/call-interface'; import { Endpoint, SubchannelAddress, @@ -63,25 +63,27 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('localhost:50051')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) - ); - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) + ); + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -93,65 +95,71 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('localhost')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) - ); - assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) + ); + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); }); it('Should correctly represent an ipv4 address', done => { const target = resolverManager.mapUriDefaultScheme(parseUri('1.2.3.4')!)!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); }); it('Should correctly represent an ipv6 address', done => { const target = resolverManager.mapUriDefaultScheme(parseUri('::1')!)!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -160,22 +168,24 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('[::1]:50051')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -184,20 +194,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('example.com')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(endpointList.length > 0); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(endpointList.length > 0); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -208,23 +220,21 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('grpctest.kleinsch.com')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - if (serviceConfig !== null) { - assert( - serviceConfig.loadBalancingPolicy === 'round_robin', - 'Should have found round robin LB policy' - ); - done(); - } - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (serviceConfig !== null) { + assert(serviceConfig.ok); + assert( + serviceConfig.value.loadBalancingPolicy === 'round_robin', + 'Should have found round robin LB policy' + ); + done(); + } + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -234,21 +244,18 @@ describe('Name Resolver', () => { parseUri('grpctest.kleinsch.com')! )!; let count = 0; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - assert( - serviceConfig === null, - 'Should not have found service config' - ); - count++; - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + assert( + serviceConfig === null, + 'Should not have found service config' + ); + count++; + return true; }; const resolver = resolverManager.createResolver(target, listener, { 'grpc.service_config_disable_resolution': 1, @@ -271,25 +278,27 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('loopback4.unittest.grpc.io')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), - `None of [${endpointList.map(addr => - endpointToString(addr) - )}] matched '127.0.0.1:443'` - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), + `None of [${endpointList.map(addr => + endpointToString(addr) + )}] matched '127.0.0.1:443'` + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -300,20 +309,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('loopback6.unittest.grpc.io')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -325,27 +336,27 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('loopback46.unittest.grpc.io')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), - `None of [${endpointList.map(addr => - endpointToString(addr) - )}] matched '127.0.0.1:443'` - ); - /* TODO(murgatroid99): check for IPv6 result, once we can get that - * consistently */ - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), + `None of [${endpointList.map(addr => + endpointToString(addr) + )}] matched '127.0.0.1:443'` + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -356,20 +367,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('network-tools.com')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(endpointList.length > 0); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(endpointList.length > 0); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -386,23 +399,23 @@ describe('Name Resolver', () => { const target2 = resolverManager.mapUriDefaultScheme( parseUri('grpc-test4.sandbox.googleapis.com')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - assert(endpointList.length > 0); - completeCount += 1; - if (completeCount === 2) { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - done(); - } - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (completeCount >= 2) { + return true; + } + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(endpointList.length > 0); + completeCount += 1; + if (completeCount === 2) { + done(); + } + return true; }; const resolver1 = resolverManager.createResolver(target1, listener, {}); resolver1.updateResolution(); @@ -419,26 +432,25 @@ describe('Name Resolver', () => { let resultCount = 0; const resolver = resolverManager.createResolver( target, - { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) - ); - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 443 }) - ); - resultCount += 1; - if (resultCount === 1) { - process.nextTick(() => resolver.updateResolution()); - } - }, - onError: (error: StatusObject) => { - assert.ifError(error); - }, + ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) + ); + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 443 }) + ); + resultCount += 1; + if (resultCount === 1) { + process.nextTick(() => resolver.updateResolution()); + } + return true; }, { 'grpc.dns_min_time_between_resolutions_ms': 2000 } ); @@ -455,20 +467,18 @@ describe('Name Resolver', () => { let resultCount = 0; const resolver = resolverManager.createResolver( target, - { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - assert.fail('Resolution succeeded unexpectedly'); - }, - onError: (error: StatusObject) => { - resultCount += 1; - if (resultCount === 1) { - process.nextTick(() => resolver.updateResolution()); - } - }, + ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + assert(!maybeEndpointList.ok); + resultCount += 1; + if (resultCount === 1) { + process.nextTick(() => resolver.updateResolution()); + } + return true; }, {} ); @@ -484,20 +494,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('unix:socket')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(hasMatchingAddress(endpointList, { path: 'socket' })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(hasMatchingAddress(endpointList, { path: 'socket' })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -506,20 +518,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('unix:///tmp/socket')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -530,22 +544,24 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv4:127.0.0.1')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -554,22 +570,24 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv4:127.0.0.1:50051')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -578,25 +596,27 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv4:127.0.0.1:50051,127.0.0.1:50052')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) - ); - assert( - hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) + ); + assert( + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -605,20 +625,22 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv6:::1')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -627,22 +649,24 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv6:[::1]:50051')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); @@ -651,25 +675,27 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme( parseUri('ipv6:[::1]:50051,[::1]:50052')! )!; - const listener: resolverManager.ResolverListener = { - onSuccessfulResolution: ( - endpointList: Endpoint[], - serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null - ) => { - // Only handle the first resolution result - listener.onSuccessfulResolution = () => {}; - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) - ); - assert( - hasMatchingAddress(endpointList, { host: '::1', port: 50052 }) - ); - done(); - }, - onError: (error: StatusObject) => { - done(new Error(`Failed with status ${error.details}`)); - }, + let resultSeen = false; + const listener: resolverManager.ResolverListener = ( + maybeEndpointList: StatusOr, + attributes: { [key: string]: unknown}, + serviceConfig: StatusOr | null, + resolutionNote: string + ) => { + if (resultSeen) { + return true; + } + resultSeen = true; + assert(maybeEndpointList.ok); + const endpointList = maybeEndpointList.value; + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) + ); + assert( + hasMatchingAddress(endpointList, { host: '::1', port: 50052 }) + ); + done(); + return true; }; const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution();