Skip to content

Delta xds #152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ public class CacheStatusInfo<T> implements StatusInfo<T> {
private final T nodeGroup;

private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DeltaWatch> deltaWatches = new ConcurrentHashMap<>();
private volatile long lastWatchRequestTime;
private volatile long lastDeltaWatchRequestTime;

public CacheStatusInfo(T nodeGroup) {
this.nodeGroup = nodeGroup;
Expand All @@ -31,6 +33,11 @@ public long lastWatchRequestTime() {
return lastWatchRequestTime;
}

@Override
public long lastDeltaWatchRequestTime() {
return lastDeltaWatchRequestTime;
}

/**
* {@inheritDoc}
*/
Expand All @@ -47,6 +54,11 @@ public int numWatches() {
return watches.size();
}

@Override
public int numDeltaWatches() {
return deltaWatches.size();
}

/**
* Removes the given watch from the tracked collection of watches.
*
Expand All @@ -56,6 +68,15 @@ public void removeWatch(long watchId) {
watches.remove(watchId);
}

/**
* Removes the given delta watch from the tracked collection of watches.
*
* @param watchId the ID for the delta watch that should be removed
*/
public void removeDeltaWatch(long watchId) {
deltaWatches.remove(watchId);
}

/**
* Sets the timestamp of the last discovery watch request.
*
Expand All @@ -65,6 +86,15 @@ public void setLastWatchRequestTime(long lastWatchRequestTime) {
this.lastWatchRequestTime = lastWatchRequestTime;
}

/**
* Sets the timestamp of the last discovery delta watch request.
*
* @param lastDeltaWatchRequestTime the latest delta watch request timestamp
*/
public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) {
this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime;
}

/**
* Adds the given watch to the tracked collection of watches.
*
Expand All @@ -75,13 +105,30 @@ public void setWatch(long watchId, Watch watch) {
watches.put(watchId, watch);
}

/**
* Adds the given watch to the tracked collection of watches.
*
* @param watchId the ID for the watch that should be added
* @param watch the watch that should be added
*/
public void setDeltaWatch(long watchId, DeltaWatch watch) {
deltaWatches.put(watchId, watch);
}

/**
* Returns the set of IDs for all watched currently being tracked.
*/
public Set<Long> watchIds() {
return ImmutableSet.copyOf(watches.keySet());
}

/**
* Returns the set of IDs for all watched currently being tracked.
*/
public Set<Long> deltaWatchIds() {
return ImmutableSet.copyOf(deltaWatches.keySet());
}

/**
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed.
Expand All @@ -91,4 +138,15 @@ public Set<Long> watchIds() {
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) {
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
}

/**
* Iterate over all tracked delta watches and execute the given function. If it returns {@code true},
* then the watch is removed from the tracked collection. If it returns {@code false}, then
* the watch is not removed.
*
* @param filter the function to execute on each delta watch
*/
public void deltaWatchesRemoveIf(BiFunction<Long, DeltaWatch, Boolean> filter) {
deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.envoyproxy.controlplane.cache;

import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -28,4 +29,25 @@ Watch createWatch(
Set<String> knownResourceNames,
Consumer<Response> responseConsumer,
boolean hasClusterChanged);

/**
* Returns a new configuration resource {@link Watch} for the given discovery request.
*
* @param request the discovery request (node, names, etc.) to use to generate the watch
* @param requesterVersion the last version applied by the requester
* @param resourceVersions resources that are already known to the requester
* @param pendingResources resources that the caller is waiting for
* @param isWildcard indicates if the stream is in wildcard mode
* @param responseConsumer the response handler, used to process outgoing response messages
* @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed.
* Supported in ADS mode.
*/
DeltaWatch createDeltaWatch(
DeltaXdsRequest request,
String requesterVersion,
Map<String, String> resourceVersions,
Set<String> pendingResources,
boolean isWildcard,
Consumer<DeltaResponse> responseConsumer,
boolean hasClusterChanged);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.envoyproxy.controlplane.cache;

import com.google.auto.value.AutoValue;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;

/**
* {@code Response} is a data class that contains the response for an assumed configuration type.
*/
@AutoValue
public abstract class DeltaResponse {

public static DeltaResponse create(DeltaXdsRequest request,
Map<String, SnapshotResource<?>> resources,
List<String> removedResources,
String version) {
return new AutoValue_DeltaResponse(request, resources, removedResources, version);
}

/**
* Returns the original request associated with the response.
*/
public abstract DeltaXdsRequest request();

/**
* Returns the resources to include in the response.
*/
public abstract Map<String, SnapshotResource<? extends Message>> resources();

/**
* Returns the removed resources to include in the response.
*/
public abstract List<String> removedResources();

/**
* Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version
* as an acknowledgement.
*/
public abstract String version();
}
121 changes: 121 additions & 0 deletions cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.envoyproxy.controlplane.cache;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

/**
* {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by
* the xDS server.
*/
public class DeltaWatch {
private static final AtomicIntegerFieldUpdater<DeltaWatch> isCancelledUpdater =
AtomicIntegerFieldUpdater.newUpdater(DeltaWatch.class, "isCancelled");
private final DeltaXdsRequest request;
private final Consumer<DeltaResponse> responseConsumer;
private final Map<String, String> resourceVersions;
private final Set<String> pendingResources;
private final boolean isWildcard;
private final String version;
private volatile int isCancelled = 0;
private Runnable stop;

/**
* Construct a watch.
*
* @param request the original request for the watch
* @param version indicates the stream current version
* @param isWildcard indicates if the stream is in wildcard mode
* @param responseConsumer handler for outgoing response messages
*/
public DeltaWatch(DeltaXdsRequest request,
Map<String, String> resourceVersions,
Set<String> pendingResources,
String version,
boolean isWildcard,
Consumer<DeltaResponse> responseConsumer) {
this.request = request;
this.resourceVersions = resourceVersions;
this.pendingResources = pendingResources;
this.version = version;
this.isWildcard = isWildcard;
this.responseConsumer = responseConsumer;
}

/**
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
* may be called multiple times, with each subsequent call being a no-op.
*/
public void cancel() {
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
if (stop != null) {
stop.run();
}
}
}

/**
* Returns boolean indicating whether or not the watch has been cancelled.
*/
public boolean isCancelled() {
return isCancelledUpdater.get(this) == 1;
}

/**
* Returns the original request for the watch.
*/
public DeltaXdsRequest request() {
return request;
}

/**
* Returns the tracked resources for the watch.
*/
public Map<String, String> trackedResources() {
return resourceVersions;
}

/**
* Returns the pending resources for the watch.
*/
public Set<String> pendingResources() {
return pendingResources;
}

/**
* Returns the stream current version.
*/
public String version() {
return version;
}

/**
* Indicates if the stream is in wildcard mode.
*/
public boolean isWildcard() {
return isWildcard;
}

/**
* Sends the given response to the watch's response handler.
*
* @param response the response to be handled
* @throws WatchCancelledException if the watch has already been cancelled
*/
public void respond(DeltaResponse response) throws WatchCancelledException {
if (isCancelled()) {
throw new WatchCancelledException();
}

responseConsumer.accept(response);
}

/**
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
* ensures that this stop callback is only executed once.
*/
public void setStop(Runnable stop) {
this.stop = stop;
}
}
Loading