diff --git a/messaging-client-core/src/main/java/com/inmobi/messaging/consumer/AbstractMessageConsumer.java b/messaging-client-core/src/main/java/com/inmobi/messaging/consumer/AbstractMessageConsumer.java index 6c962cc2..7669e591 100644 --- a/messaging-client-core/src/main/java/com/inmobi/messaging/consumer/AbstractMessageConsumer.java +++ b/messaging-client-core/src/main/java/com/inmobi/messaging/consumer/AbstractMessageConsumer.java @@ -149,14 +149,12 @@ public void init(String topicName, String consumerName, Date startTimestamp, && startTime.after(new Date(System.currentTimeMillis()))) { throw new IllegalArgumentException("Future start time is not accepted"); } + init(config); + metrics = (BaseMessageConsumerStatsExposer) getMetricsImpl(); String emitterConfig = config .getString(MessageConsumerFactory.EMITTER_CONF_FILE_KEY); if (emitterConfig != null) { statsEmitter.init(emitterConfig); - } - init(config); - metrics = (BaseMessageConsumerStatsExposer) getMetricsImpl(); - if (emitterConfig != null) { statsEmitter.add(metrics); } } diff --git a/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/databus/DatabusConsumer.java b/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/databus/DatabusConsumer.java index 49281d8d..3f41bacc 100644 --- a/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/databus/DatabusConsumer.java +++ b/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/databus/DatabusConsumer.java @@ -201,7 +201,6 @@ protected void createPartitionReaders() throws IOException { PartitionReaderStatsExposer collectorMetrics = new CollectorReaderStatsExposer(topicName, consumerName, id.toString(), consumerNumber, fsuri); - addStatsExposer(collectorMetrics); Path streamsLocalDir = null; if (readFromLocalStream) { streamsLocalDir = DatabusUtil.getStreamDir(StreamType.LOCAL, @@ -214,6 +213,7 @@ protected void createPartitionReaders() throws IOException { new Path(streamDir, collector), streamsLocalDir, buffer, topicName, partitionTimestamp, waitTimeForFlush, waitTimeForFileCreate, collectorMetrics, stopTime)); + addStatsExposer(collectorMetrics); messageConsumedMap.put(id, false); numList = 0; } @@ -228,12 +228,12 @@ protected void createPartitionReaders() throws IOException { PartitionReaderStatsExposer clusterMetrics = new PartitionReaderStatsExposer(topicName, consumerName, id.toString(), consumerNumber, fsuri); - addStatsExposer(clusterMetrics); readers.put(id, new PartitionReader(id, partitionCheckpointList, fs, buffer, streamDir, conf, DatabusInputFormat.class.getCanonicalName(), partitionTimestamp, waitTimeForFileCreate, true, clusterMetrics, partitionMinList, stopTime)); + addStatsExposer(clusterMetrics); messageConsumedMap.put(id, false); } } diff --git a/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/hadoop/HadoopConsumer.java b/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/hadoop/HadoopConsumer.java index d9a44d31..4e2c2d85 100644 --- a/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/hadoop/HadoopConsumer.java +++ b/messaging-client-databus/src/main/java/com/inmobi/messaging/consumer/hadoop/HadoopConsumer.java @@ -91,7 +91,6 @@ protected void createPartitionReaders() throws IOException { PartitionReaderStatsExposer clusterMetrics = new PartitionReaderStatsExposer(topicName, consumerName, id.toString(), consumerNumber, fsUri); - addStatsExposer(clusterMetrics); PartitionReader reader = new PartitionReader(id, partitionCheckpointList, fileSystems[i], buffer, rootDirs[i], conf, inputFormatClassName, partitionTimestamp, @@ -99,6 +98,7 @@ protected void createPartitionReaders() throws IOException { stopTime); LOG.debug("Created partition " + id); readers.put(id, reader); + addStatsExposer(clusterMetrics); messageConsumedMap.put(id, false); } }