diff --git a/operator/src/main/java/dev/responsive/controller/client/ControllerClient.java b/operator/src/main/java/dev/responsive/controller/client/ControllerClient.java index 3552957bf..015bf0d5b 100644 --- a/operator/src/main/java/dev/responsive/controller/client/ControllerClient.java +++ b/operator/src/main/java/dev/responsive/controller/client/ControllerClient.java @@ -16,6 +16,7 @@ package dev.responsive.controller.client; +import java.util.Optional; import responsive.controller.v1.controller.proto.ControllerOuterClass.ApplicationState; import responsive.controller.v1.controller.proto.ControllerOuterClass.CurrentStateRequest; import responsive.controller.v1.controller.proto.ControllerOuterClass.EmptyRequest; @@ -41,5 +42,5 @@ public interface ControllerClient { * @param request an empty request containing only the application id * @return the target application state that the operator should resolve */ - ApplicationState getTargetState(final EmptyRequest request); + Optional getTargetState(final EmptyRequest request); } diff --git a/operator/src/main/java/dev/responsive/controller/client/grpc/ControllerGrpcClient.java b/operator/src/main/java/dev/responsive/controller/client/grpc/ControllerGrpcClient.java index b39cc765f..3f546ec06 100644 --- a/operator/src/main/java/dev/responsive/controller/client/grpc/ControllerGrpcClient.java +++ b/operator/src/main/java/dev/responsive/controller/client/grpc/ControllerGrpcClient.java @@ -23,8 +23,11 @@ import io.grpc.Grpc; import io.grpc.ManagedChannel; import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.TlsChannelCredentials; import io.grpc.stub.MetadataUtils; +import java.util.Optional; import java.util.concurrent.TimeUnit; import responsive.controller.v1.controller.proto.ControllerGrpc; import responsive.controller.v1.controller.proto.ControllerOuterClass; @@ -68,14 +71,18 @@ public void currentState(final ControllerOuterClass.CurrentStateRequest request) } @Override - public ControllerOuterClass.ApplicationState getTargetState( + public Optional getTargetState( final ControllerOuterClass.EmptyRequest request) { - final var rsp = stub.withDeadlineAfter(5, TimeUnit.SECONDS) - .getTargetState(request); - if (!rsp.getError().equals("")) { - throw new RuntimeException(rsp.getError()); + try { + final var rsp = stub.withDeadlineAfter(5, TimeUnit.SECONDS) + .getTargetState(request); + return Optional.of(rsp.getState()); + } catch (final StatusRuntimeException e) { + if (e.getStatus().getCode().equals(Status.NOT_FOUND.getCode())) { + return Optional.empty(); + } + throw e; } - return rsp.getState(); } private void throwOnError(final ControllerOuterClass.SimpleResponse rsp) { diff --git a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPlugin.java b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPlugin.java index dd465ecd0..1edfe4e75 100644 --- a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPlugin.java +++ b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPlugin.java @@ -31,10 +31,12 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import responsive.controller.v1.controller.proto.ControllerOuterClass; +import responsive.controller.v1.controller.proto.ControllerOuterClass.ApplicationState; public class DemoPolicyPlugin implements PolicyPlugin { private static final Logger LOG = LoggerFactory.getLogger(DemoPolicyPlugin.class); @@ -85,22 +87,18 @@ public void reconcile( currentStateFromApplication(managedApp)) ); - final var maybeTargetState = - ctx.getSecondaryResource(TargetStateWithTimestamp.class); + final Optional maybeTargetState + = responsiveCtx.getControllerClient().getTargetState( + ControllerProtoFactories.emptyRequest(policy)); if (maybeTargetState.isEmpty()) { - LOG.warn("No target state present in ctx. This should not happen"); + LOG.info("controller has no current target state"); return; } final var targetState = maybeTargetState.get(); LOG.info("target state for app {} {}", appName, targetState); - if (targetState.getTargetState().isEmpty()) { - LOG.info( - "we were not able to get a target state from controller, so don't try to reconcile one"); - return; - } - final var targetReplicas = targetState.getTargetState().get().getDemoState().getReplicas(); + final var targetReplicas = targetState.getDemoState().getReplicas(); if (targetReplicas != managedApp.getReplicas()) { LOG.info( "Scaling {}/{} from {} to {}", diff --git a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java index c60d1ecf6..7c40326a0 100644 --- a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java +++ b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java @@ -17,7 +17,6 @@ package dev.responsive.k8s.operator.reconciler; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import dev.responsive.controller.client.ControllerClient; import dev.responsive.k8s.controller.ControllerProtoFactories; import dev.responsive.k8s.crd.ResponsivePolicy; @@ -26,12 +25,13 @@ import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.MaxReconciliationInterval; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,12 @@ /** * Core reconciliation handler for operator */ -@ControllerConfiguration +@ControllerConfiguration( + maxReconciliationInterval = @MaxReconciliationInterval( + interval = 10, + timeUnit = TimeUnit.SECONDS + ) +) public class ResponsivePolicyReconciler implements Reconciler, EventSourceInitializer { private static final Logger LOG = LoggerFactory.getLogger(ResponsivePolicyReconciler.class); @@ -65,28 +70,7 @@ public ResponsivePolicyReconciler(final ControllerClient controllerClient) { @Override public Map prepareEventSources(EventSourceContext ctx) { - final var poller = new PerResourcePollingEventSource<>( - policy -> { - try { - return ImmutableSet.of(new TargetStateWithTimestamp( - // TODO(rohan): this is a hack to force an event at each poll interval. - // we should either: 1. make the controller robust to not rely on polling - // 2. poll in some less hacky way (while still using events) - responsiveCtx.getControllerClient() - .getTargetState(ControllerProtoFactories.emptyRequest(policy)))); - } catch (final Throwable t) { - LOG.error("Error fetching target state", t); - // We return an empty target state to force reconciliation to run, since right now the - // controller is stateless and relies on periodic updates after it restarts - return ImmutableSet.of(new TargetStateWithTimestamp()); - } - }, - ctx.getPrimaryCache(), - 10000L, - TargetStateWithTimestamp.class - ); final var builder = ImmutableMap.builder(); - builder.putAll(EventSourceInitializer.nameEventSources(poller)); // add the plugin event sources // TODO(rohan): how do we make sure that these sources dont cross streams (should be fine // if they are all resource-scoped events) diff --git a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/TargetStateWithTimestamp.java b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/TargetStateWithTimestamp.java deleted file mode 100644 index 11ca03520..000000000 --- a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/TargetStateWithTimestamp.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2023 Responsive Computing, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.responsive.k8s.operator.reconciler; - -import java.time.Instant; -import java.util.Objects; -import java.util.Optional; -import responsive.controller.v1.controller.proto.ControllerOuterClass; - -class TargetStateWithTimestamp { - private final Instant timestamp; - private final Optional targetState; - - TargetStateWithTimestamp(final ControllerOuterClass.ApplicationState targetState) { - this(Instant.now(), Optional.of(targetState)); - } - - TargetStateWithTimestamp() { - this(Instant.now(), Optional.empty()); - } - - TargetStateWithTimestamp(final Instant timestamp, - final Optional targetState) { - this.timestamp = Objects.requireNonNull(timestamp); - this.targetState = Objects.requireNonNull(targetState); - } - - public Instant getTimestamp() { - return timestamp; - } - - public Optional getTargetState() { - return targetState; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final TargetStateWithTimestamp that = (TargetStateWithTimestamp) o; - return Objects.equals(timestamp, that.timestamp) - && Objects.equals(targetState, that.targetState); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, targetState); - } - - @Override - public String toString() { - return "TargetStateWithTimestamp{" - + "timestamp=" + timestamp - + ", targetState=" + targetState - + '}'; - } -} diff --git a/operator/src/test/java/dev/responsive/controller/client/grpc/ControllerGrpcClientTest.java b/operator/src/test/java/dev/responsive/controller/client/grpc/ControllerGrpcClientTest.java index 74b80d7f9..7769db4a0 100644 --- a/operator/src/test/java/dev/responsive/controller/client/grpc/ControllerGrpcClientTest.java +++ b/operator/src/test/java/dev/responsive/controller/client/grpc/ControllerGrpcClientTest.java @@ -38,6 +38,7 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; +import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.TlsChannelCredentials; import io.grpc.examples.helloworld.GreeterGrpc; @@ -46,6 +47,7 @@ import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; +import java.util.Optional; import org.junit.Rule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -224,7 +226,7 @@ public void shouldSendTargetStatusRequest() { final var returnedState = client.getTargetState(req); // then: - assertThat(returnedState, is(state)); + assertThat(returnedState, is(Optional.of(state))); verify(stub).getTargetState(req); } @@ -232,13 +234,22 @@ public void shouldSendTargetStatusRequest() { public void shouldHandleTargetStatusRequestError() { // given: final var req = ControllerOuterClass.EmptyRequest.newBuilder().build(); - when(stub.getTargetState(any())).thenReturn( - ControllerOuterClass.GetTargetStateResponse.newBuilder() - .setError("oops") - .build() - ); + when(stub.getTargetState(any())).thenThrow(new StatusRuntimeException(Status.UNKNOWN)); + + // when: + assertThrows(StatusRuntimeException.class, () -> client.getTargetState(req)); + } + + @Test + public void shouldHandleTargetStatusRequestNotFoundError() { + // given: + final var req = ControllerOuterClass.EmptyRequest.newBuilder().build(); + when(stub.getTargetState(any())).thenThrow(new StatusRuntimeException(Status.NOT_FOUND)); // when: - assertThrows(RuntimeException.class, () -> client.getTargetState(req)); + final var returnedState = client.getTargetState(req); + + // then: + assertThat(returnedState, is(Optional.empty())); } } \ No newline at end of file diff --git a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPluginTest.java b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPluginTest.java index 1c7f810b5..9fc3e2062 100644 --- a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPluginTest.java +++ b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/DemoPolicyPluginTest.java @@ -165,9 +165,8 @@ public void setup() { lenient().when(ctx.getClient()).thenReturn(client); lenient().when(client.apps()).thenReturn(appsClient); - lenient().when(ctx.getSecondaryResource( - dev.responsive.k8s.operator.reconciler.TargetStateWithTimestamp.class)) - .thenReturn(Optional.of(new TargetStateWithTimestamp(targetState))); + lenient().when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy))) + .thenReturn(Optional.of(targetState)); } @Test @@ -320,8 +319,8 @@ public void shouldNotPatchDeploymentIfReplicasNotChanged() { public void shouldNotPatchDeploymentIfNoTargetStateSpecified() { // given: setupForDeployment(); - when(ctx.getSecondaryResource(TargetStateWithTimestamp.class)) - .thenReturn(Optional.of(new TargetStateWithTimestamp())); + when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy))) + .thenReturn(Optional.empty()); // when: plugin.reconcile(policy, ctx, responsiveCtx); @@ -458,8 +457,8 @@ public void shouldNotPatchStatefulSetIfReplicasNotChanged() { public void shouldNotPatchStatefulSetIfNoTargetStateSpecified() { // given: setupForStatefulSet(); - when(ctx.getSecondaryResource(TargetStateWithTimestamp.class)) - .thenReturn(Optional.of(new TargetStateWithTimestamp())); + when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy))) + .thenReturn(Optional.empty()); // when: plugin.reconcile(policy, ctx, responsiveCtx); diff --git a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconcilerTest.java b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconcilerTest.java index d8e0cead8..5c4329e76 100644 --- a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconcilerTest.java +++ b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconcilerTest.java @@ -18,30 +18,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import dev.responsive.controller.client.ControllerClient; import dev.responsive.k8s.controller.ControllerProtoFactories; import dev.responsive.k8s.crd.ResponsivePolicy; import dev.responsive.k8s.crd.ResponsivePolicySpec; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; -import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,14 +43,8 @@ class ResponsivePolicyReconcilerTest { @Mock private Context ctx; @Mock - private IndexerResourceCache cache; - @Mock private EventSourceContext eventCtx; @Mock - private ControllerConfiguration controllerConfig; - @Mock - private KubernetesClient client; - @Mock private ControllerClient controllerClient; @Mock private dev.responsive.k8s.operator.reconciler.PolicyPlugin plugin; @@ -78,11 +60,6 @@ class ResponsivePolicyReconcilerTest { @BeforeEach public void setup() { responsiveCtx = new dev.responsive.k8s.operator.reconciler.ResponsiveContext(controllerClient); - lenient().when(eventCtx.getControllerConfiguration()).thenReturn(controllerConfig); - lenient().when(controllerConfig.getEffectiveNamespaces()) - .thenReturn(ImmutableSet.of("responsive")); - lenient().when(eventCtx.getClient()).thenReturn(client); - lenient().when(eventCtx.getPrimaryCache()).thenReturn(cache); lenient().when(plugin.prepareEventSources(eventCtx, responsiveCtx)).thenReturn( ImmutableMap.of( "pes1", pluginEventSource1, @@ -105,44 +82,6 @@ public void setup() { ); } - @Test - public void shouldIncludeControllerPollingEventSource() { - // when: - final var sources = reconciler.prepareEventSources(eventCtx); - - // then: - assertThat(sources.values(), hasItem(instanceOf(PerResourcePollingEventSource.class))); - } - - @Test - @SuppressWarnings("unchecked") - public void shouldReturnEmptyTargetStateIfControllerPollFails() { - // given: - final var sources = reconciler.prepareEventSources(eventCtx); - final var maybeSource = sources.values().stream() - .filter(s -> s instanceof PerResourcePollingEventSource) - .findFirst(); - assertThat(maybeSource, not(Optional.empty())); - final var source = (PerResourcePollingEventSource) maybeSource.get(); - final var resource = mock(ResponsivePolicy.class); - when(resource.getMetadata()).thenReturn(new ObjectMeta()); - when(resource.getSpec()).thenReturn(new ResponsivePolicySpec( - "ping", - "pong", - PolicyStatus.POLICY_STATUS_MANAGED, - ResponsivePolicySpec.PolicyType.DEMO, - Optional.of(new ResponsivePolicySpec.DemoPolicy(123)) - )); - when(controllerClient.getTargetState(any())).thenThrow(new RuntimeException("oops")); - - // when: - final var ret = (Optional) source.getSecondaryResource(resource); - - // then: - assertThat(ret.isPresent(), is(true)); - assertThat(ret.get().getTargetState().isPresent(), is(false)); - } - @Test public void shouldIncludePluginEventSources() { // when: