diff --git a/README.md b/README.md index 189b13b..82df35c 100644 --- a/README.md +++ b/README.md @@ -632,6 +632,10 @@ export KAFKA_TOPIC=json-events export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp export METRICS_INTERVAL_MS=30000 export SHUTDOWN_TIMEOUT_MS=5000 +export HEALTH_HTTP_ENABLED=true +export HEALTH_HTTP_HOST=0.0.0.0 +export HEALTH_HTTP_PORT=8080 +export HEALTH_HTTP_PATH=/health ``` --- @@ -656,6 +660,10 @@ Configure via environment variables: export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp export METRICS_INTERVAL_MS=30000 export SHUTDOWN_TIMEOUT_MS=5000 + export HEALTH_HTTP_ENABLED=true + export HEALTH_HTTP_HOST=0.0.0.0 + export HEALTH_HTTP_PORT=8080 + export HEALTH_HTTP_PATH=/health ``` --- diff --git a/app/avro/src/main/java/org/kpipe/App.java b/app/avro/src/main/java/org/kpipe/App.java index 72f0619..45944af 100644 --- a/app/avro/src/main/java/org/kpipe/App.java +++ b/app/avro/src/main/java/org/kpipe/App.java @@ -17,6 +17,7 @@ import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; +import org.kpipe.health.HttpHealthServer; import org.kpipe.metrics.ConsumerMetricsReporter; import org.kpipe.metrics.MetricsReporter; import org.kpipe.metrics.ProcessorMetricsReporter; @@ -38,6 +39,7 @@ public class App implements AutoCloseable { private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); private final KPipeConsumer functionalConsumer; private final ConsumerRunner> runner; + private final HttpHealthServer healthServer; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -81,6 +83,7 @@ public App(final AppConfig config, final String schemaRegistryUrl) { final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null); final var sinkMetricsReporter = new SinkMetricsReporter(sinkRegistry, null); runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter, sinkMetricsReporter); + healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null); } private static String resolveSchemaRegistryUrl() { @@ -197,6 +200,7 @@ public MessageSinkRegistry getSinkRegistry() { } void start() { + if (healthServer != null) healthServer.start(); runner.start(); } @@ -210,6 +214,7 @@ private Map getMetrics() { @Override public void close() { + if (healthServer != null) healthServer.close(); runner.close(); } } diff --git a/app/json/src/main/java/org/kpipe/App.java b/app/json/src/main/java/org/kpipe/App.java index 23b3654..15c4874 100644 --- a/app/json/src/main/java/org/kpipe/App.java +++ b/app/json/src/main/java/org/kpipe/App.java @@ -17,6 +17,7 @@ import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; +import org.kpipe.health.HttpHealthServer; import org.kpipe.metrics.ConsumerMetricsReporter; import org.kpipe.metrics.MetricsReporter; import org.kpipe.metrics.ProcessorMetricsReporter; @@ -34,6 +35,7 @@ public class App implements AutoCloseable { private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); private final KPipeConsumer kpipeConsumer; private final ConsumerRunner> runner; + private final HttpHealthServer healthServer; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -70,6 +72,7 @@ public App(final AppConfig config) { final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null); runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter); + healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null); } /// Creates the consumer runner with appropriate lifecycle hooks. @@ -168,6 +171,7 @@ public MessageSinkRegistry getSinkRegistry() { } void start() { + if (healthServer != null) healthServer.start(); runner.start(); } @@ -181,6 +185,7 @@ private Map getMetrics() { @Override public void close() { + if (healthServer != null) healthServer.close(); runner.close(); } } diff --git a/app/protobuf/src/main/java/org/kpipe/App.java b/app/protobuf/src/main/java/org/kpipe/App.java index 87803cf..d7e6d2d 100644 --- a/app/protobuf/src/main/java/org/kpipe/App.java +++ b/app/protobuf/src/main/java/org/kpipe/App.java @@ -17,6 +17,7 @@ import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; +import org.kpipe.health.HttpHealthServer; import org.kpipe.metrics.ConsumerMetricsReporter; import org.kpipe.metrics.MetricsReporter; import org.kpipe.metrics.ProcessorMetricsReporter; @@ -34,6 +35,7 @@ public class App implements AutoCloseable { private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); private final KPipeConsumer kpipeConsumer; private final ConsumerRunner> runner; + private final HttpHealthServer healthServer; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -68,6 +70,7 @@ public App(final AppConfig config) { final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null); runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter); + healthServer = HttpHealthServer.fromEnv(runner::isHealthy, config.appName()).orElse(null); } /// Creates the consumer runner with appropriate lifecycle hooks. @@ -163,6 +166,7 @@ public MessageSinkRegistry getSinkRegistry() { } void start() { + if (healthServer != null) healthServer.start(); runner.start(); } @@ -176,6 +180,7 @@ private Map getMetrics() { @Override public void close() { + if (healthServer != null) healthServer.close(); runner.close(); } } diff --git a/docker-compose.yaml b/docker-compose.yaml index 154e2b6..b8acee8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -88,11 +88,14 @@ services: dockerfile: Dockerfile args: MESSAGE_FORMAT: ${MESSAGE_FORMAT:-avro} + ports: + - "8080:8080" environment: - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 - KAFKA_CONSUMER_GROUP=kpipe-group - KAFKA_TOPIC=${MESSAGE_FORMAT:-avro}-topic - APP_NAME=kafka-consumer-app + - HEALTH_HTTP_PORT=8080 depends_on: schema-registry: condition: service_healthy diff --git a/lib/src/main/java/org/kpipe/health/HttpHealthServer.java b/lib/src/main/java/org/kpipe/health/HttpHealthServer.java new file mode 100644 index 0000000..8a26240 --- /dev/null +++ b/lib/src/main/java/org/kpipe/health/HttpHealthServer.java @@ -0,0 +1,134 @@ +package org.kpipe.health; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import org.kpipe.config.AppConfig; + +/// Lightweight HTTP health check server using the JDK built-in HttpServer. +public final class HttpHealthServer implements AutoCloseable { + + public static final String ENV_ENABLED = "HEALTH_HTTP_ENABLED"; + public static final String ENV_PORT = "HEALTH_HTTP_PORT"; + public static final String ENV_HOST = "HEALTH_HTTP_HOST"; + public static final String ENV_PATH = "HEALTH_HTTP_PATH"; + + public static final int DEFAULT_PORT = 8080; + public static final String DEFAULT_HOST = "0.0.0.0"; + public static final String DEFAULT_PATH = "/health"; + + private static final Logger LOGGER = System.getLogger(HttpHealthServer.class.getName()); + + private final HttpServer server; + private final Supplier healthSupplier; + private final String path; + private final String appName; + private final AtomicBoolean started = new AtomicBoolean(false); + + private HttpHealthServer( + final String host, + final int port, + final String path, + final Supplier healthSupplier, + final String appName + ) throws IOException { + this.healthSupplier = Objects.requireNonNull(healthSupplier, "Health supplier cannot be null"); + this.path = normalizePath(path); + this.appName = appName != null ? appName : "kpipe-app"; + this.server = HttpServer.create(new InetSocketAddress(host, port), 0); + this.server.createContext(this.path, this::handleHealth); + } + + public static Optional fromEnv( + final Supplier healthSupplier, + final String appName + ) { + final var enabled = + !"false".equalsIgnoreCase(AppConfig.getEnvOrDefault(ENV_ENABLED, "true")); + if (!enabled) { + return Optional.empty(); + } + + final var host = AppConfig.getEnvOrDefault(ENV_HOST, DEFAULT_HOST); + final var path = AppConfig.getEnvOrDefault(ENV_PATH, DEFAULT_PATH); + final var port = parsePort(AppConfig.getEnvOrDefault(ENV_PORT, Integer.toString(DEFAULT_PORT))); + + try { + return Optional.of(new HttpHealthServer(host, port, path, healthSupplier, appName)); + } catch (final IOException e) { + LOGGER.log(Level.ERROR, "Failed to start health HTTP server", e); + return Optional.empty(); + } + } + + public void start() { + if (started.compareAndSet(false, true)) { + server.start(); + LOGGER.log(Level.INFO, "Health HTTP server started on %s%s".formatted(server.getAddress(), path)); + } + } + + @Override + public void close() { + if (started.compareAndSet(true, false)) { + server.stop(0); + LOGGER.log(Level.INFO, "Health HTTP server stopped"); + } + } + + private void handleHealth(final HttpExchange exchange) throws IOException { + if (!"GET".equalsIgnoreCase(exchange.getRequestMethod())) { + sendResponse(exchange, 405, "Method Not Allowed"); + return; + } + + final boolean healthy; + try { + healthy = Boolean.TRUE.equals(healthSupplier.get()); + } catch (final Exception e) { + LOGGER.log(Level.WARNING, "Health check failed for %s".formatted(appName), e); + sendResponse(exchange, 503, "UNHEALTHY"); + return; + } + + if (healthy) { + sendResponse(exchange, 200, "OK"); + } else { + sendResponse(exchange, 503, "UNHEALTHY"); + } + } + + private static void sendResponse(final HttpExchange exchange, final int status, final String body) + throws IOException { + final var payload = body.getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(status, payload.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(payload); + } + } + + private static String normalizePath(final String path) { + if (path == null || path.isBlank()) return DEFAULT_PATH; + return path.startsWith("/") ? path : "/" + path; + } + + private static int parsePort(final String value) { + try { + final var port = Integer.parseInt(value); + if (port < 1 || port > 65535) return DEFAULT_PORT; + return port; + } catch (final NumberFormatException e) { + return DEFAULT_PORT; + } + } +} diff --git a/lib/src/test/java/org/kpipe/health/HttpHealthServerTest.java b/lib/src/test/java/org/kpipe/health/HttpHealthServerTest.java new file mode 100644 index 0000000..00f586b --- /dev/null +++ b/lib/src/test/java/org/kpipe/health/HttpHealthServerTest.java @@ -0,0 +1,132 @@ +package org.kpipe.health; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; +import org.kpipe.config.AppConfig; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import com.sun.net.httpserver.HttpServer; + +class HttpHealthServerTest { + + @Test + void shouldNormalizePathAndDefaultAppName() throws Exception { + final var server = newServer("health", () -> true, null); + + assertEquals("/health", getField(server, "path")); + assertEquals("kpipe-app", getField(server, "appName")); + + server.close(); + } + + @Test + void shouldReturnOkWhenHealthy() throws Exception { + try (HttpHealthServer server = newServer("/health", () -> true, "test-app")) { + server.start(); + + final var response = sendRequest(server, "GET"); + assertEquals(200, response.statusCode()); + assertEquals("OK", response.body()); + } + } + + @Test + void shouldReturnUnhealthyWhenSupplierFalse() throws Exception { + try (HttpHealthServer server = newServer("/health", () -> false, "test-app")) { + server.start(); + + final var response = sendRequest(server, "GET"); + assertEquals(503, response.statusCode()); + assertEquals("UNHEALTHY", response.body()); + } + } + + @Test + void shouldReturnUnhealthyWhenSupplierThrows() throws Exception { + final Supplier supplier = () -> { + throw new RuntimeException("boom"); + }; + + try (HttpHealthServer server = newServer("/health", supplier, "test-app")) { + server.start(); + + final var response = sendRequest(server, "GET"); + assertEquals(503, response.statusCode()); + assertEquals("UNHEALTHY", response.body()); + } + } + + @Test + void shouldReturnMethodNotAllowedForNonGet() throws Exception { + try (HttpHealthServer server = newServer("/health", () -> true, "test-app")) { + server.start(); + + final var response = sendRequest(server, "POST"); + assertEquals(405, response.statusCode()); + assertEquals("Method Not Allowed", response.body()); + } + } + + @Test + void shouldReturnEmptyWhenDisabledFromEnv() { + try (MockedStatic mocked = Mockito.mockStatic(AppConfig.class)) { + mocked.when(() -> AppConfig.getEnvOrDefault(HttpHealthServer.ENV_ENABLED, "true")) + .thenReturn("false"); + + final Optional server = HttpHealthServer.fromEnv(() -> true, "test-app"); + assertTrue(server.isEmpty()); + } + } + + private static HttpHealthServer newServer( + final String path, + final Supplier supplier, + final String appName + ) throws Exception { + final Constructor ctor = + HttpHealthServer.class.getDeclaredConstructor( + String.class, + int.class, + String.class, + Supplier.class, + String.class + ); + ctor.setAccessible(true); + return ctor.newInstance("127.0.0.1", 0, path, supplier, appName); + } + + private static HttpResponse sendRequest( + final HttpHealthServer server, + final String method + ) throws Exception { + final HttpServer httpServer = getField(server, "server"); + final String path = getField(server, "path"); + final InetSocketAddress address = httpServer.getAddress(); + final URI uri = URI.create("http://127.0.0.1:" + address.getPort() + path); + + final HttpRequest request = HttpRequest.newBuilder(uri) + .method(method, HttpRequest.BodyPublishers.noBody()) + .build(); + + return HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); + } + + @SuppressWarnings("unchecked") + private static T getField(final HttpHealthServer server, final String name) throws Exception { + final Field field = HttpHealthServer.class.getDeclaredField(name); + field.setAccessible(true); + return (T) field.get(server); + } +}