diff --git a/operator/src/main/java/dev/responsive/k8s/crd/kafkastreams/KafkaStreamsPolicySpec.java b/operator/src/main/java/dev/responsive/k8s/crd/kafkastreams/KafkaStreamsPolicySpec.java index 8114a1610..d7bc1488b 100644 --- a/operator/src/main/java/dev/responsive/k8s/crd/kafkastreams/KafkaStreamsPolicySpec.java +++ b/operator/src/main/java/dev/responsive/k8s/crd/kafkastreams/KafkaStreamsPolicySpec.java @@ -3,10 +3,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import dev.responsive.k8s.crd.PolicyCooldownSpec; +import io.fabric8.generator.annotation.ValidationRule; import java.util.List; import java.util.Objects; import java.util.Optional; +@ValidationRule(value = "self.maxReplicas >= self.minReplicas") public class KafkaStreamsPolicySpec { private final int maxReplicas; diff --git a/operator/src/main/java/dev/responsive/k8s/operator/OperatorMain.java b/operator/src/main/java/dev/responsive/k8s/operator/OperatorMain.java index 80966ca1b..9dcdddb1f 100644 --- a/operator/src/main/java/dev/responsive/k8s/operator/OperatorMain.java +++ b/operator/src/main/java/dev/responsive/k8s/operator/OperatorMain.java @@ -16,12 +16,15 @@ package dev.responsive.k8s.operator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import dev.responsive.controller.client.grpc.ControllerGrpcClient; import dev.responsive.k8s.operator.reconciler.ResponsivePolicyReconciler; -import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.utils.KubernetesSerialization; import io.javaoperatorsdk.operator.Operator; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import java.io.FileReader; import java.io.Reader; import java.util.Properties; @@ -39,6 +42,9 @@ public class OperatorMain { private static final String API_KEY_CONFIG = "responsive.platform.api.key"; private static final String SECRET_CONFIG = "responsive.platform.api.secret"; + // copied from io.javaoperatorsdk.operator.api.config.ConfigurationService + private static final int DEFAULT_MAX_CONCURRENT_REQUEST = 512; + public static void main(String[] args) { LOG.info("Starting main"); @@ -76,8 +82,20 @@ public static void main(String[] args) { final String apiKey = config.getProperty(API_KEY_CONFIG, ""); final String secret = config.getProperty(SECRET_CONFIG, ""); - final Operator operator = new Operator(); - Serialization.jsonMapper().registerModule(new Jdk8Module()); + final ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new Jdk8Module()); + // we override the k8s client here so we can supply our own ObjectMapper + final Operator operator = new Operator(o -> o + .withSSABasedCreateUpdateMatchForDependentResources(false) + .withKubernetesClient( + new KubernetesClientBuilder() + // copied from io.javaoperatorsdk.operator.api.config.ConfigurationService + .withConfig(new ConfigBuilder(Config.autoConfigure(null)) + .withMaxConcurrentRequests(DEFAULT_MAX_CONCURRENT_REQUEST) + .build()) + .withKubernetesSerialization(new KubernetesSerialization(mapper, true)) + .build() + )); final ResponsivePolicyReconciler reconciler = new ResponsivePolicyReconciler(environment, new ControllerGrpcClient( target, @@ -86,17 +104,7 @@ public static void main(String[] args) { tlsOff )); - operator.register(reconciler, new ControllerConfiguration<>() { - @Override - public String getAssociatedReconcilerClassName() { - return ResponsivePolicyReconciler.class.getName(); - } - - @Override - public String getLabelSelector() { - return selector; - } - }); + operator.register(reconciler, o -> o.withLabelSelector(selector)); operator.start(); } diff --git a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java index 263d38392..7ed7158b4 100644 --- a/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java +++ b/operator/src/main/java/dev/responsive/k8s/operator/reconciler/ResponsivePolicyReconciler.java @@ -27,6 +27,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; @@ -46,6 +47,7 @@ /** * Core reconciliation handler for operator */ +@ControllerConfiguration public class ResponsivePolicyReconciler implements Reconciler, EventSourceInitializer { private static final Logger LOG = LoggerFactory.getLogger(ResponsivePolicyReconciler.class); diff --git a/operator/src/test/java/dev/responsive/k8s/operator/OperatorE2EIntegrationTest.java b/operator/src/test/java/dev/responsive/k8s/operator/OperatorE2EIntegrationTest.java index 582f9364d..5ed40fee3 100644 --- a/operator/src/test/java/dev/responsive/k8s/operator/OperatorE2EIntegrationTest.java +++ b/operator/src/test/java/dev/responsive/k8s/operator/OperatorE2EIntegrationTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import dev.responsive.controller.client.ControllerClient; import dev.responsive.k8s.crd.ResponsivePolicy; @@ -34,11 +35,11 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.utils.KubernetesSerialization; import io.fabric8.kubernetes.client.utils.Serialization; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.javaoperatorsdk.operator.Operator; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -67,12 +68,13 @@ public class OperatorE2EIntegrationTest { @BeforeAll public static void setUp() { - Serialization.jsonMapper().registerModule(new Jdk8Module()); - K3S.start(); Config config = Config.fromKubeconfig(K3S.getKubeConfigYaml()); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new Jdk8Module()); kubernetesClient = new KubernetesClientBuilder() .withConfig(config) + .withKubernetesSerialization(new KubernetesSerialization(mapper, true)) .build(); Namespace namespace = new NamespaceBuilder() @@ -116,20 +118,10 @@ public void testReconcileLabelSelector() throws InterruptedException { .thenThrow(new StatusRuntimeException(Status.NOT_FOUND)); final var reconciler = new ResponsivePolicyReconciler("", controllerClient); - final var operator = new Operator(kubernetesClient); + final var operator = new Operator(o -> o.withKubernetesClient(kubernetesClient)); operator.register( reconciler, - new ControllerConfiguration<>() { - @Override - public String getAssociatedReconcilerClassName() { - return ResponsivePolicyReconciler.class.getName(); - } - - @Override - public String getLabelSelector() { - return "environment=test"; - } - } + o -> o.withLabelSelector("environment=test") ); operator.start(); diff --git a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java index e47c39260..71393120c 100644 --- a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java +++ b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java @@ -50,6 +50,7 @@ import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.RollableScalableResource; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; @@ -124,6 +125,8 @@ class KafkaStreamsPolicyPluginTest { private ArgumentCaptor currentStateRequestCaptor; @Mock private ControllerClient controllerClient; + @Mock + private ConfigurationService configService; private final PodResource pod1 = mockPodResource("p1"); private final PodResource pod2 = mockPodResource("p2"); @@ -194,6 +197,9 @@ public void setup() { lenient().when(ctx.getSecondaryResource( ActionsWithTimestamp.class)) .thenReturn(Optional.of(new ActionsWithTimestamp())); + lenient().when(controllerConfig.getConfigurationService()).thenReturn(configService); + lenient().when(configService.parseResourceVersionsForEventFilteringAndCaching()) + .thenReturn(false); } diff --git a/settings.gradle.kts b/settings.gradle.kts index 6163067de..76448ad00 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,12 +49,13 @@ dependencyResolutionManagement { version("jackson", "2.15.2") version("kafka", "3.7.0") version("scylla", "4.15.0.0") - version("javaoperatorsdk", "4.3.0") + version("javaoperatorsdk", "4.9.6") version("grpc", "1.52.1") version("protobuf-java", "3.22.3") version("slf4j", "1.7.5") version("log4j2", "2.20.0") version("mongoDB", "4.10.2") + version("fabric8", "6.13.4") library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson") @@ -81,7 +82,7 @@ dependencyResolutionManagement { bundle("grpc", listOf("grpc-netty", "grpc-protobuf", "grpc-stub", "javax-annotation-api")) library("protobuf-java-util", "com.google.protobuf", "protobuf-java-util").versionRef("protobuf-java") - library("crd-generator-atp", "io.fabric8", "crd-generator-apt").version("6.5.1") + library("crd-generator-atp", "io.fabric8", "crd-generator-apt").versionRef("fabric8") library("guava", "com.google.guava:guava:32.1.3-jre")