Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package dev.responsive.controller.client;

import java.util.Optional;
import responsive.controller.v1.controller.proto.ControllerOuterClass.ApplicationState;
import responsive.controller.v1.controller.proto.ControllerOuterClass.CurrentStateRequest;
import responsive.controller.v1.controller.proto.ControllerOuterClass.EmptyRequest;
Expand All @@ -41,5 +42,5 @@ public interface ControllerClient {
* @param request an empty request containing only the application id
* @return the target application state that the operator should resolve
*/
ApplicationState getTargetState(final EmptyRequest request);
Optional<ApplicationState> getTargetState(final EmptyRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.MetadataUtils;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import responsive.controller.v1.controller.proto.ControllerGrpc;
import responsive.controller.v1.controller.proto.ControllerOuterClass;
Expand Down Expand Up @@ -68,14 +71,18 @@ public void currentState(final ControllerOuterClass.CurrentStateRequest request)
}

@Override
public ControllerOuterClass.ApplicationState getTargetState(
public Optional<ControllerOuterClass.ApplicationState> getTargetState(
final ControllerOuterClass.EmptyRequest request) {
final var rsp = stub.withDeadlineAfter(5, TimeUnit.SECONDS)
.getTargetState(request);
if (!rsp.getError().equals("")) {
throw new RuntimeException(rsp.getError());
try {
final var rsp = stub.withDeadlineAfter(5, TimeUnit.SECONDS)
.getTargetState(request);
return Optional.of(rsp.getState());
} catch (final StatusRuntimeException e) {
if (e.getStatus().getCode().equals(Status.NOT_FOUND.getCode())) {
return Optional.empty();
}
throw e;
}
return rsp.getState();
}

private void throwOnError(final ControllerOuterClass.SimpleResponse rsp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import responsive.controller.v1.controller.proto.ControllerOuterClass;
import responsive.controller.v1.controller.proto.ControllerOuterClass.ApplicationState;

public class DemoPolicyPlugin implements PolicyPlugin {
private static final Logger LOG = LoggerFactory.getLogger(DemoPolicyPlugin.class);
Expand Down Expand Up @@ -85,22 +87,18 @@ public void reconcile(
currentStateFromApplication(managedApp))
);

final var maybeTargetState =
ctx.getSecondaryResource(TargetStateWithTimestamp.class);
final Optional<ApplicationState> maybeTargetState
= responsiveCtx.getControllerClient().getTargetState(
ControllerProtoFactories.emptyRequest(policy));
if (maybeTargetState.isEmpty()) {
LOG.warn("No target state present in ctx. This should not happen");
LOG.info("controller has no current target state");
return;
}

final var targetState = maybeTargetState.get();
LOG.info("target state for app {} {}", appName, targetState);

if (targetState.getTargetState().isEmpty()) {
LOG.info(
"we were not able to get a target state from controller, so don't try to reconcile one");
return;
}
final var targetReplicas = targetState.getTargetState().get().getDemoState().getReplicas();
final var targetReplicas = targetState.getDemoState().getReplicas();
if (targetReplicas != managedApp.getReplicas()) {
LOG.info(
"Scaling {}/{} from {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package dev.responsive.k8s.operator.reconciler;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import dev.responsive.controller.client.ControllerClient;
import dev.responsive.k8s.controller.ControllerProtoFactories;
import dev.responsive.k8s.crd.ResponsivePolicy;
Expand All @@ -26,20 +25,26 @@
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.MaxReconciliationInterval;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Core reconciliation handler for operator
*/
@ControllerConfiguration
@ControllerConfiguration(
maxReconciliationInterval = @MaxReconciliationInterval(
interval = 10,
timeUnit = TimeUnit.SECONDS
)
)
public class ResponsivePolicyReconciler implements
Reconciler<ResponsivePolicy>, EventSourceInitializer<ResponsivePolicy> {
private static final Logger LOG = LoggerFactory.getLogger(ResponsivePolicyReconciler.class);
Expand All @@ -65,28 +70,7 @@ public ResponsivePolicyReconciler(final ControllerClient controllerClient) {

@Override
public Map<String, EventSource> prepareEventSources(EventSourceContext<ResponsivePolicy> ctx) {
final var poller = new PerResourcePollingEventSource<>(
policy -> {
try {
return ImmutableSet.of(new TargetStateWithTimestamp(
// TODO(rohan): this is a hack to force an event at each poll interval.
// we should either: 1. make the controller robust to not rely on polling
// 2. poll in some less hacky way (while still using events)
responsiveCtx.getControllerClient()
.getTargetState(ControllerProtoFactories.emptyRequest(policy))));
} catch (final Throwable t) {
LOG.error("Error fetching target state", t);
// We return an empty target state to force reconciliation to run, since right now the
// controller is stateless and relies on periodic updates after it restarts
return ImmutableSet.of(new TargetStateWithTimestamp());
}
},
ctx.getPrimaryCache(),
10000L,
TargetStateWithTimestamp.class
);
final var builder = ImmutableMap.<String, EventSource>builder();
builder.putAll(EventSourceInitializer.nameEventSources(poller));
// add the plugin event sources
// TODO(rohan): how do we make sure that these sources dont cross streams (should be fine
// if they are all resource-scoped events)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.TlsChannelCredentials;
import io.grpc.examples.helloworld.GreeterGrpc;
Expand All @@ -46,6 +47,7 @@
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
import java.util.Optional;
import org.junit.Rule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -224,21 +226,30 @@ public void shouldSendTargetStatusRequest() {
final var returnedState = client.getTargetState(req);

// then:
assertThat(returnedState, is(state));
assertThat(returnedState, is(Optional.of(state)));
verify(stub).getTargetState(req);
}

@Test
public void shouldHandleTargetStatusRequestError() {
// given:
final var req = ControllerOuterClass.EmptyRequest.newBuilder().build();
when(stub.getTargetState(any())).thenReturn(
ControllerOuterClass.GetTargetStateResponse.newBuilder()
.setError("oops")
.build()
);
when(stub.getTargetState(any())).thenThrow(new StatusRuntimeException(Status.UNKNOWN));

// when:
assertThrows(StatusRuntimeException.class, () -> client.getTargetState(req));
}

@Test
public void shouldHandleTargetStatusRequestNotFoundError() {
// given:
final var req = ControllerOuterClass.EmptyRequest.newBuilder().build();
when(stub.getTargetState(any())).thenThrow(new StatusRuntimeException(Status.NOT_FOUND));

// when:
assertThrows(RuntimeException.class, () -> client.getTargetState(req));
final var returnedState = client.getTargetState(req);

// then:
assertThat(returnedState, is(Optional.empty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ public void setup() {

lenient().when(ctx.getClient()).thenReturn(client);
lenient().when(client.apps()).thenReturn(appsClient);
lenient().when(ctx.getSecondaryResource(
dev.responsive.k8s.operator.reconciler.TargetStateWithTimestamp.class))
.thenReturn(Optional.of(new TargetStateWithTimestamp(targetState)));
lenient().when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy)))
.thenReturn(Optional.of(targetState));
}

@Test
Expand Down Expand Up @@ -320,8 +319,8 @@ public void shouldNotPatchDeploymentIfReplicasNotChanged() {
public void shouldNotPatchDeploymentIfNoTargetStateSpecified() {
// given:
setupForDeployment();
when(ctx.getSecondaryResource(TargetStateWithTimestamp.class))
.thenReturn(Optional.of(new TargetStateWithTimestamp()));
when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy)))
.thenReturn(Optional.empty());

// when:
plugin.reconcile(policy, ctx, responsiveCtx);
Expand Down Expand Up @@ -458,8 +457,8 @@ public void shouldNotPatchStatefulSetIfReplicasNotChanged() {
public void shouldNotPatchStatefulSetIfNoTargetStateSpecified() {
// given:
setupForStatefulSet();
when(ctx.getSecondaryResource(TargetStateWithTimestamp.class))
.thenReturn(Optional.of(new TargetStateWithTimestamp()));
when(controllerClient.getTargetState(ControllerProtoFactories.emptyRequest(policy)))
.thenReturn(Optional.empty());

// when:
plugin.reconcile(policy, ctx, responsiveCtx);
Expand Down
Loading