Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import dev.responsive.k8s.crd.PolicyCooldownSpec;
import io.fabric8.generator.annotation.ValidationRule;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@ValidationRule(value = "self.maxReplicas >= self.minReplicas")
public class KafkaStreamsPolicySpec {

private final int maxReplicas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package dev.responsive.k8s.operator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import dev.responsive.controller.client.grpc.ControllerGrpcClient;
import dev.responsive.k8s.operator.reconciler.ResponsivePolicyReconciler;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import java.io.FileReader;
import java.io.Reader;
import java.util.Properties;
Expand All @@ -39,6 +42,9 @@ public class OperatorMain {
private static final String API_KEY_CONFIG = "responsive.platform.api.key";
private static final String SECRET_CONFIG = "responsive.platform.api.secret";

// copied from io.javaoperatorsdk.operator.api.config.ConfigurationService
private static final int DEFAULT_MAX_CONCURRENT_REQUEST = 512;

public static void main(String[] args) {
LOG.info("Starting main");

Expand Down Expand Up @@ -76,8 +82,20 @@ public static void main(String[] args) {
final String apiKey = config.getProperty(API_KEY_CONFIG, "");
final String secret = config.getProperty(SECRET_CONFIG, "");

final Operator operator = new Operator();
Serialization.jsonMapper().registerModule(new Jdk8Module());
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
// we override the k8s client here so we can supply our own ObjectMapper
final Operator operator = new Operator(o -> o
.withSSABasedCreateUpdateMatchForDependentResources(false)
.withKubernetesClient(
new KubernetesClientBuilder()
// copied from io.javaoperatorsdk.operator.api.config.ConfigurationService
.withConfig(new ConfigBuilder(Config.autoConfigure(null))
.withMaxConcurrentRequests(DEFAULT_MAX_CONCURRENT_REQUEST)
.build())
.withKubernetesSerialization(new KubernetesSerialization(mapper, true))
.build()
));
final ResponsivePolicyReconciler reconciler =
new ResponsivePolicyReconciler(environment, new ControllerGrpcClient(
target,
Expand All @@ -86,17 +104,7 @@ public static void main(String[] args) {
tlsOff
));

operator.register(reconciler, new ControllerConfiguration<>() {
@Override
public String getAssociatedReconcilerClassName() {
return ResponsivePolicyReconciler.class.getName();
}

@Override
public String getLabelSelector() {
return selector;
}
});
operator.register(reconciler, o -> o.withLabelSelector(selector));

operator.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
Expand All @@ -46,6 +47,7 @@
/**
* Core reconciliation handler for operator
*/
@ControllerConfiguration
public class ResponsivePolicyReconciler implements
Reconciler<ResponsivePolicy>, EventSourceInitializer<ResponsivePolicy> {
private static final Logger LOG = LoggerFactory.getLogger(ResponsivePolicyReconciler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import dev.responsive.controller.client.ControllerClient;
import dev.responsive.k8s.crd.ResponsivePolicy;
Expand All @@ -34,11 +35,11 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -67,12 +68,13 @@ public class OperatorE2EIntegrationTest {

@BeforeAll
public static void setUp() {
Serialization.jsonMapper().registerModule(new Jdk8Module());

K3S.start();
Config config = Config.fromKubeconfig(K3S.getKubeConfigYaml());
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
kubernetesClient = new KubernetesClientBuilder()
.withConfig(config)
.withKubernetesSerialization(new KubernetesSerialization(mapper, true))
.build();

Namespace namespace = new NamespaceBuilder()
Expand Down Expand Up @@ -116,20 +118,10 @@ public void testReconcileLabelSelector() throws InterruptedException {
.thenThrow(new StatusRuntimeException(Status.NOT_FOUND));

final var reconciler = new ResponsivePolicyReconciler("", controllerClient);
final var operator = new Operator(kubernetesClient);
final var operator = new Operator(o -> o.withKubernetesClient(kubernetesClient));
operator.register(
reconciler,
new ControllerConfiguration<>() {
@Override
public String getAssociatedReconcilerClassName() {
return ResponsivePolicyReconciler.class.getName();
}

@Override
public String getLabelSelector() {
return "environment=test";
}
}
o -> o.withLabelSelector("environment=test")
);

operator.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
Expand Down Expand Up @@ -124,6 +125,8 @@ class KafkaStreamsPolicyPluginTest {
private ArgumentCaptor<ControllerOuterClass.CurrentStateRequest> currentStateRequestCaptor;
@Mock
private ControllerClient controllerClient;
@Mock
private ConfigurationService configService;
private final PodResource pod1 = mockPodResource("p1");
private final PodResource pod2 = mockPodResource("p2");

Expand Down Expand Up @@ -194,6 +197,9 @@ public void setup() {
lenient().when(ctx.getSecondaryResource(
ActionsWithTimestamp.class))
.thenReturn(Optional.of(new ActionsWithTimestamp()));
lenient().when(controllerConfig.getConfigurationService()).thenReturn(configService);
lenient().when(configService.parseResourceVersionsForEventFilteringAndCaching())
.thenReturn(false);
}


Expand Down
5 changes: 3 additions & 2 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ dependencyResolutionManagement {
version("jackson", "2.15.2")
version("kafka", "3.7.0")
version("scylla", "4.15.0.0")
version("javaoperatorsdk", "4.3.0")
version("javaoperatorsdk", "4.9.6")
version("grpc", "1.52.1")
version("protobuf-java", "3.22.3")
version("slf4j", "1.7.5")
version("log4j2", "2.20.0")
version("mongoDB", "4.10.2")
version("fabric8", "6.13.4")

library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson")

Expand All @@ -81,7 +82,7 @@ dependencyResolutionManagement {
bundle("grpc", listOf("grpc-netty", "grpc-protobuf", "grpc-stub", "javax-annotation-api"))

library("protobuf-java-util", "com.google.protobuf", "protobuf-java-util").versionRef("protobuf-java")
library("crd-generator-atp", "io.fabric8", "crd-generator-apt").version("6.5.1")
library("crd-generator-atp", "io.fabric8", "crd-generator-apt").versionRef("fabric8")

library("guava", "com.google.guava:guava:32.1.3-jre")

Expand Down