diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index ca90f11..22e3576 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -21,6 +21,7 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,6 +50,9 @@ public abstract class KafkaStreamsApp extends PlatformService { public static final String CLEANUP_LOCAL_STATE = "cleanup.local.state"; public static final String PRE_CREATE_TOPICS = "precreate.topics"; public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config"; + public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class); protected KafkaStreams app; @@ -182,11 +186,15 @@ public Logger getLogger() { } public List getInputTopics(Map properties) { - return new ArrayList<>(); + Config jobConfig = (Config) properties.get(getJobConfigKey()); + return jobConfig != null ? + Arrays.asList(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)) : new ArrayList<>(); } public List getOutputTopics(Map properties) { - return new ArrayList<>(); + Config jobConfig = (Config) properties.get(getJobConfigKey()); + return jobConfig != null ? + Arrays.asList(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)) : new ArrayList<>(); } private Map getJobStreamsConfig(Config jobConfig) {