diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 24732cc862c..5fa44368634 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -35,6 +35,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; @@ -110,7 +111,7 @@ public FlowCatalog(Config config, GobblinInstanceEnvironment env) { public FlowCatalog(Config config, Optional log, Optional parentMetricContext, boolean instrumentationEnabled) { this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - this.listeners = new SpecCatalogListenersList(log); + this.listeners = new SpecCatalogListenersList(Optional.of(MoreExecutors.newDirectExecutorService()), log); if (instrumentationEnabled) { MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java index e0c11aba48a..c473f0638b5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java @@ -44,7 +44,11 @@ public SpecCatalogListenersList() { } public SpecCatalogListenersList(Optional log) { - _disp = new CallbacksDispatcher<>(Optional.absent(), log); + this(Optional.absent(), log); + } + + public SpecCatalogListenersList(Optional executorService, Optional log) { + _disp = new CallbacksDispatcher<>(executorService, log); } public Logger getLog() {