From 0351e49b7ae313fd98242701858d9f47b31db73c Mon Sep 17 00:00:00 2001 From: Arjit Tiwari Date: Thu, 3 Jul 2025 11:46:09 +0530 Subject: [PATCH] Use main thread for FlowCatalog -> listeners.onAddSpec --- .../apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 3 ++- .../runtime/spec_catalog/SpecCatalogListenersList.java | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 24732cc862c..5fa44368634 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -35,6 +35,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; @@ -110,7 +111,7 @@ public FlowCatalog(Config config, GobblinInstanceEnvironment env) { public FlowCatalog(Config config, Optional log, Optional parentMetricContext, boolean instrumentationEnabled) { this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - this.listeners = new SpecCatalogListenersList(log); + this.listeners = new SpecCatalogListenersList(Optional.of(MoreExecutors.newDirectExecutorService()), log); if (instrumentationEnabled) { MetricContext realParentCtx = parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass())); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java index e0c11aba48a..c473f0638b5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java @@ -44,7 +44,11 @@ public SpecCatalogListenersList() { } public SpecCatalogListenersList(Optional log) { - _disp = new CallbacksDispatcher<>(Optional.absent(), log); + this(Optional.absent(), log); + } + + public SpecCatalogListenersList(Optional executorService, Optional log) { + _disp = new CallbacksDispatcher<>(executorService, log); } public Logger getLog() {