diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index ac0fe9675c..c61cc837c1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -1,5 +1,7 @@ package io.javaoperatorsdk.operator.api.reconciler; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; import java.util.function.UnaryOperator; import org.slf4j.Logger; @@ -25,6 +27,8 @@ public class PrimaryUpdateAndCacheUtils { public static final int DEFAULT_MAX_RETRY = 10; + public static final int DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS = 10000; + public static final int DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS = 50; private PrimaryUpdateAndCacheUtils() {} @@ -90,8 +94,10 @@ public static

P ssaPatchStatusAndCacheResource( } /** - * Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, - * int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}. + * Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, int, + * long,long)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY} and + * default cache maximum polling time and period as defined, respectively by {@link + * #DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS} and {@link #DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS}. * * @param resourceToUpdate original resource to update * @param context of reconciliation @@ -106,7 +112,13 @@ public static

P updateAndCacheResource( UnaryOperator

modificationFunction, UnaryOperator

updateMethod) { return updateAndCacheResource( - resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY); + resourceToUpdate, + context, + modificationFunction, + updateMethod, + DEFAULT_MAX_RETRY, + DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS, + DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS); } /** @@ -124,16 +136,20 @@ public static

P updateAndCacheResource( * @param modificationFunction modifications to make on primary * @param updateMethod the update method implementation * @param maxRetry maximum number of retries before giving up + * @param cachePollTimeoutMillis maximum amount of milliseconds to wait for the updated resource + * to appear in cache + * @param cachePollPeriodMillis cache polling period, in milliseconds * @param

primary type * @return the updated resource */ - @SuppressWarnings("unchecked") public static

P updateAndCacheResource( P resourceToUpdate, Context

context, UnaryOperator

modificationFunction, UnaryOperator

updateMethod, - int maxRetry) { + int maxRetry, + long cachePollTimeoutMillis, + long cachePollPeriodMillis) { if (log.isDebugEnabled()) { log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate)); @@ -180,14 +196,37 @@ public static

P updateAndCacheResource( resourceToUpdate.getMetadata().getNamespace(), e.getCode()); resourceToUpdate = - (P) - context - .getClient() - .resources(resourceToUpdate.getClass()) - .inNamespace(resourceToUpdate.getMetadata().getNamespace()) - .withName(resourceToUpdate.getMetadata().getName()) - .get(); + pollLocalCache( + context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis); + } + } + } + + private static

P pollLocalCache( + Context

context, P staleResource, long timeoutMillis, long pollDelayMillis) { + try { + var resourceId = ResourceID.fromResource(staleResource); + var startTime = LocalTime.now(); + final var timeoutTime = startTime.plus(timeoutMillis, ChronoUnit.MILLIS); + while (timeoutTime.isAfter(LocalTime.now())) { + log.debug("Polling cache for resource: {}", resourceId); + var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow(); + if (!cachedResource + .getMetadata() + .getResourceVersion() + .equals(staleResource.getMetadata().getResourceVersion())) { + return context + .getControllerConfiguration() + .getConfigurationService() + .getResourceCloner() + .clone(cachedResource); + } + Thread.sleep(pollDelayMillis); } + throw new OperatorException("Timeout of resource polling from cache for resource"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OperatorException(e); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 9ec5b3694c..af75a5abc4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -126,9 +126,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi knownResourceVersions.add(newResource.getMetadata().getResourceVersion()); } var resourceId = ResourceID.fromResource(newResource); - var cachedResource = - getResourceFromCache(resourceId) - .orElse(managedInformerEventSource.get(resourceId).orElse(null)); + var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); boolean moveAhead = false; if (previousResourceVersion == null && cachedResource == null) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java index 438941db9c..80a254b50f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java @@ -1,16 +1,22 @@ package io.javaoperatorsdk.operator.api.reconciler; +import java.util.Optional; import java.util.function.UnaryOperator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.utils.KubernetesSerialization; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.api.config.Cloner; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -29,6 +35,7 @@ class PrimaryUpdateAndCacheUtilsTest { Context context = mock(Context.class); KubernetesClient client = mock(KubernetesClient.class); Resource resource = mock(Resource.class); + IndexedResourceCache primaryCache = mock(IndexedResourceCache.class); @BeforeEach void setup() { @@ -41,6 +48,20 @@ void setup() { when(mixedOp.inNamespace(any())).thenReturn(mixedOp); when(mixedOp.withName(any())).thenReturn(resource); when(resource.get()).thenReturn(TestUtils.testCustomResource1()); + when(context.getPrimaryCache()).thenReturn(primaryCache); + + var controllerConfiguration = mock(ControllerConfiguration.class); + when(context.getControllerConfiguration()).thenReturn(controllerConfiguration); + var configService = mock(ConfigurationService.class); + when(controllerConfiguration.getConfigurationService()).thenReturn(configService); + when(configService.getResourceCloner()) + .thenReturn( + new Cloner() { + @Override + public R clone(R object) { + return new KubernetesSerialization().clone(object); + } + }); } @Test @@ -76,6 +97,10 @@ void retriesConflicts() { when(updateOperation.apply(any())) .thenThrow(new KubernetesClientException("", 409, null)) .thenReturn(TestUtils.testCustomResource1()); + var freshResource = TestUtils.testCustomResource1(); + + freshResource.getMetadata().setResourceVersion("2"); + when(primaryCache.get(any())).thenReturn(Optional.of(freshResource)); var updated = PrimaryUpdateAndCacheUtils.updateAndCacheResource( @@ -89,7 +114,7 @@ void retriesConflicts() { updateOperation); assertThat(updated).isNotNull(); - verify(resource, times(1)).get(); + verify(primaryCache, times(1)).get(any()); } @Test @@ -97,7 +122,13 @@ void throwsIfRetryExhausted() { var updateOperation = mock(UnaryOperator.class); when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null)); + var stubbing = when(primaryCache.get(any())); + for (int i = 0; i < DEFAULT_MAX_RETRY; i++) { + var resource = TestUtils.testCustomResource1(); + resource.getMetadata().setResourceVersion("" + i); + stubbing = stubbing.thenReturn(Optional.of(resource)); + } assertThrows( OperatorException.class, () -> @@ -106,6 +137,28 @@ void throwsIfRetryExhausted() { context, UnaryOperator.identity(), updateOperation)); - verify(resource, times(DEFAULT_MAX_RETRY)).get(); + verify(primaryCache, times(DEFAULT_MAX_RETRY)).get(any()); + } + + @Test + void cachePollTimeouts() { + var updateOperation = mock(UnaryOperator.class); + + when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null)); + when(primaryCache.get(any())).thenReturn(Optional.of(TestUtils.testCustomResource1())); + + var ex = + assertThrows( + OperatorException.class, + () -> + PrimaryUpdateAndCacheUtils.updateAndCacheResource( + TestUtils.testCustomResource1(), + context, + UnaryOperator.identity(), + updateOperation, + 2, + 50L, + 10L)); + assertThat(ex.getMessage()).contains("Timeout"); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheCustomResource.java similarity index 69% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheCustomResource.java index 8ab742a975..e87d8e8714 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheCustomResource.java @@ -9,6 +9,5 @@ @Group("sample.javaoperatorsdk") @Version("v1") @ShortNames("spwl") -public class StatusPatchCacheWithLockCustomResource - extends CustomResource - implements Namespaced {} +public class StatusPatchCacheCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheIT.java similarity index 79% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheIT.java index c5752f4aae..9d0b923056 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheIT.java @@ -11,19 +11,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -public class StatusPatchCacheWithLockIT { +public class StatusPatchCacheIT { public static final String TEST_1 = "test1"; @RegisterExtension LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder() - .withReconciler(StatusPatchCacheWithLockReconciler.class) + .withReconciler(StatusPatchCacheReconciler.class) .build(); @Test void testStatusAlwaysUpToDate() { - var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class); + var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class); extension.create(testResource()); @@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() { }); } - StatusPatchCacheWithLockCustomResource testResource() { - var res = new StatusPatchCacheWithLockCustomResource(); + StatusPatchCacheCustomResource testResource() { + var res = new StatusPatchCacheCustomResource(); res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); - res.setSpec(new StatusPatchCacheWithLockSpec()); + res.setSpec(new StatusPatchCacheSpec()); return res; } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheReconciler.java similarity index 73% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheReconciler.java index 364f8e9ff5..69215d6d01 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheReconciler.java @@ -12,16 +12,14 @@ import io.javaoperatorsdk.operator.processing.event.source.EventSource; @ControllerConfiguration -public class StatusPatchCacheWithLockReconciler - implements Reconciler { +public class StatusPatchCacheReconciler implements Reconciler { public volatile int latestValue = 0; public volatile boolean errorPresent = false; @Override - public UpdateControl reconcile( - StatusPatchCacheWithLockCustomResource resource, - Context context) { + public UpdateControl reconcile( + StatusPatchCacheCustomResource resource, Context context) { if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { errorPresent = true; @@ -50,22 +48,20 @@ public UpdateControl reconcile( } @Override - public List> prepareEventSources( - EventSourceContext context) { + public List> prepareEventSources( + EventSourceContext context) { // periodic event triggering for testing purposes return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); } - private StatusPatchCacheWithLockCustomResource createFreshCopy( - StatusPatchCacheWithLockCustomResource resource) { - var res = new StatusPatchCacheWithLockCustomResource(); + private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) { + var res = new StatusPatchCacheCustomResource(); res.setMetadata( new ObjectMetaBuilder() .withName(resource.getMetadata().getName()) .withNamespace(resource.getMetadata().getNamespace()) .build()); - res.setStatus(new StatusPatchCacheWithLockStatus()); - + res.setStatus(new StatusPatchCacheStatus()); return res; } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheSpec.java similarity index 82% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheSpec.java index ebbabd49a0..0885b6a858 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheSpec.java @@ -1,6 +1,6 @@ package io.javaoperatorsdk.operator.baseapi.statuscache; -public class StatusPatchCacheWithLockSpec { +public class StatusPatchCacheSpec { private int counter = 0; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheStatus.java similarity index 62% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheStatus.java index 5f2d8f5a6f..5918b2e3b8 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheStatus.java @@ -1,6 +1,6 @@ package io.javaoperatorsdk.operator.baseapi.statuscache; -public class StatusPatchCacheWithLockStatus { +public class StatusPatchCacheStatus { private Integer value = 0; @@ -8,7 +8,7 @@ public Integer getValue() { return value; } - public StatusPatchCacheWithLockStatus setValue(Integer value) { + public StatusPatchCacheStatus setValue(Integer value) { this.value = value; return this; }