Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ export KAFKA_TOPIC=json-events
export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
export METRICS_INTERVAL_MS=30000
export SHUTDOWN_TIMEOUT_MS=5000
export HEALTH_HTTP_ENABLED=true
export HEALTH_HTTP_HOST=0.0.0.0
export HEALTH_HTTP_PORT=8080
export HEALTH_HTTP_PATH=/health
```

---
Expand All @@ -656,6 +660,10 @@ Configure via environment variables:
export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
export METRICS_INTERVAL_MS=30000
export SHUTDOWN_TIMEOUT_MS=5000
export HEALTH_HTTP_ENABLED=true
export HEALTH_HTTP_HOST=0.0.0.0
export HEALTH_HTTP_PORT=8080
export HEALTH_HTTP_PATH=/health
```

---
Expand Down
5 changes: 5 additions & 0 deletions app/avro/src/main/java/org/kpipe/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.health.HttpHealthServer;
import org.kpipe.metrics.ConsumerMetricsReporter;
import org.kpipe.metrics.MetricsReporter;
import org.kpipe.metrics.ProcessorMetricsReporter;
Expand All @@ -38,6 +39,7 @@ public class App implements AutoCloseable {
private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
private final KPipeConsumer<byte[], byte[]> functionalConsumer;
private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final HttpHealthServer healthServer;
private final AtomicReference<Map<String, Long>> currentMetrics = new AtomicReference<>();
private final MessageProcessorRegistry processorRegistry;
private final MessageSinkRegistry sinkRegistry;
Expand Down Expand Up @@ -81,6 +83,7 @@ public App(final AppConfig config, final String schemaRegistryUrl) {
final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null);
final var sinkMetricsReporter = new SinkMetricsReporter(sinkRegistry, null);
runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter, sinkMetricsReporter);
healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null);
}

private static String resolveSchemaRegistryUrl() {
Expand Down Expand Up @@ -197,6 +200,7 @@ public MessageSinkRegistry getSinkRegistry() {
}

void start() {
if (healthServer != null) healthServer.start();
runner.start();
}

Expand All @@ -210,6 +214,7 @@ private Map<String, Long> getMetrics() {

@Override
public void close() {
if (healthServer != null) healthServer.close();
runner.close();
}
}
5 changes: 5 additions & 0 deletions app/json/src/main/java/org/kpipe/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.health.HttpHealthServer;
import org.kpipe.metrics.ConsumerMetricsReporter;
import org.kpipe.metrics.MetricsReporter;
import org.kpipe.metrics.ProcessorMetricsReporter;
Expand All @@ -34,6 +35,7 @@ public class App implements AutoCloseable {
private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
private final KPipeConsumer<byte[], byte[]> kpipeConsumer;
private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final HttpHealthServer healthServer;
private final AtomicReference<Map<String, Long>> currentMetrics = new AtomicReference<>();
private final MessageProcessorRegistry processorRegistry;
private final MessageSinkRegistry sinkRegistry;
Expand Down Expand Up @@ -70,6 +72,7 @@ public App(final AppConfig config) {
final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null);

runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter);
healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null);
}

/// Creates the consumer runner with appropriate lifecycle hooks.
Expand Down Expand Up @@ -168,6 +171,7 @@ public MessageSinkRegistry getSinkRegistry() {
}

void start() {
if (healthServer != null) healthServer.start();
runner.start();
}

Expand All @@ -181,6 +185,7 @@ private Map<String, Long> getMetrics() {

@Override
public void close() {
if (healthServer != null) healthServer.close();
runner.close();
}
}
5 changes: 5 additions & 0 deletions app/protobuf/src/main/java/org/kpipe/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.health.HttpHealthServer;
import org.kpipe.metrics.ConsumerMetricsReporter;
import org.kpipe.metrics.MetricsReporter;
import org.kpipe.metrics.ProcessorMetricsReporter;
Expand All @@ -34,6 +35,7 @@ public class App implements AutoCloseable {
private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
private final KPipeConsumer<byte[], byte[]> kpipeConsumer;
private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final HttpHealthServer healthServer;
private final AtomicReference<Map<String, Long>> currentMetrics = new AtomicReference<>();
private final MessageProcessorRegistry processorRegistry;
private final MessageSinkRegistry sinkRegistry;
Expand Down Expand Up @@ -68,6 +70,7 @@ public App(final AppConfig config) {

final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null);
runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter);
healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null);
}

/// Creates the consumer runner with appropriate lifecycle hooks.
Expand Down Expand Up @@ -163,6 +166,7 @@ public MessageSinkRegistry getSinkRegistry() {
}

void start() {
if (healthServer != null) healthServer.start();
runner.start();
}

Expand All @@ -176,6 +180,7 @@ private Map<String, Long> getMetrics() {

@Override
public void close() {
if (healthServer != null) healthServer.close();
runner.close();
}
}
3 changes: 3 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,14 @@ services:
dockerfile: Dockerfile
args:
MESSAGE_FORMAT: ${MESSAGE_FORMAT:-avro}
ports:
- "8080:8080"
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- KAFKA_CONSUMER_GROUP=kpipe-group
- KAFKA_TOPIC=${MESSAGE_FORMAT:-avro}-topic
- APP_NAME=kafka-consumer-app
- HEALTH_HTTP_PORT=8080
depends_on:
schema-registry:
condition: service_healthy
Expand Down
134 changes: 134 additions & 0 deletions lib/src/main/java/org/kpipe/health/HttpHealthServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package org.kpipe.health;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.kpipe.config.AppConfig;

/// Lightweight HTTP health check server using the JDK built-in HttpServer.
public final class HttpHealthServer implements AutoCloseable {

public static final String ENV_ENABLED = "HEALTH_HTTP_ENABLED";
public static final String ENV_PORT = "HEALTH_HTTP_PORT";
public static final String ENV_HOST = "HEALTH_HTTP_HOST";
public static final String ENV_PATH = "HEALTH_HTTP_PATH";

public static final int DEFAULT_PORT = 8080;
public static final String DEFAULT_HOST = "0.0.0.0";
public static final String DEFAULT_PATH = "/health";

private static final Logger LOGGER = System.getLogger(HttpHealthServer.class.getName());

private final HttpServer server;
private final Supplier<Boolean> healthSupplier;
private final String path;
private final String appName;
private final AtomicBoolean started = new AtomicBoolean(false);

private HttpHealthServer(
final String host,
final int port,
final String path,
final Supplier<Boolean> healthSupplier,
final String appName
) throws IOException {
this.healthSupplier = Objects.requireNonNull(healthSupplier, "Health supplier cannot be null");
this.path = normalizePath(path);
this.appName = appName != null ? appName : "kpipe-app";
this.server = HttpServer.create(new InetSocketAddress(host, port), 0);
this.server.createContext(this.path, this::handleHealth);
}

public static Optional<HttpHealthServer> fromEnv(
final Supplier<Boolean> healthSupplier,
final String appName
) {
final var enabled =
!"false".equalsIgnoreCase(AppConfig.getEnvOrDefault(ENV_ENABLED, "true"));
if (!enabled) {
return Optional.empty();
}

final var host = AppConfig.getEnvOrDefault(ENV_HOST, DEFAULT_HOST);
final var path = AppConfig.getEnvOrDefault(ENV_PATH, DEFAULT_PATH);
final var port = parsePort(AppConfig.getEnvOrDefault(ENV_PORT, Integer.toString(DEFAULT_PORT)));

try {
return Optional.of(new HttpHealthServer(host, port, path, healthSupplier, appName));
} catch (final IOException e) {
LOGGER.log(Level.ERROR, "Failed to start health HTTP server", e);
return Optional.empty();
}
}

public void start() {
if (started.compareAndSet(false, true)) {
server.start();
LOGGER.log(Level.INFO, "Health HTTP server started on %s%s".formatted(server.getAddress(), path));
}
}

@Override
public void close() {
if (started.compareAndSet(true, false)) {
server.stop(0);
LOGGER.log(Level.INFO, "Health HTTP server stopped");
}
}

private void handleHealth(final HttpExchange exchange) throws IOException {
if (!"GET".equalsIgnoreCase(exchange.getRequestMethod())) {
sendResponse(exchange, 405, "Method Not Allowed");
return;
}

final boolean healthy;
try {
healthy = Boolean.TRUE.equals(healthSupplier.get());
} catch (final Exception e) {
LOGGER.log(Level.WARNING, "Health check failed for %s".formatted(appName), e);
sendResponse(exchange, 503, "UNHEALTHY");
return;
}

if (healthy) {
sendResponse(exchange, 200, "OK");
} else {
sendResponse(exchange, 503, "UNHEALTHY");
}
}

private static void sendResponse(final HttpExchange exchange, final int status, final String body)
throws IOException {
final var payload = body.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(status, payload.length);
try (OutputStream out = exchange.getResponseBody()) {
out.write(payload);
}
}

private static String normalizePath(final String path) {
if (path == null || path.isBlank()) return DEFAULT_PATH;
return path.startsWith("/") ? path : "/" + path;
}

private static int parsePort(final String value) {
try {
final var port = Integer.parseInt(value);
if (port < 1 || port > 65535) return DEFAULT_PORT;
return port;
} catch (final NumberFormatException e) {
return DEFAULT_PORT;
}
}
}
Loading