Skip to content
Open
2 changes: 1 addition & 1 deletion conf/yarn/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ admin.server.enabled=false

# job history store ( WARN [GobblinYarnAppLauncher] NOT starting the admin UI because the job execution info server is NOT enabled )
job.execinfo.server.enabled=false
job.history.store.enabled=false
job.history.store.enabled=false
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.temporal;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.temporal.ddm.worker.ExecutionWorker;
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher;
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;

Expand All @@ -29,14 +30,20 @@
public interface GobblinTemporalConfigurationKeys {

String PREFIX = "gobblin.temporal.";
String STAGE_SPECIFIC_PREFIX = PREFIX + "stage.";

String WORKER_CLASS = PREFIX + "worker.class";
String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
String EXECUTION_WORKER_CLASS = ExecutionWorker.class.getName();
String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";

String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";

// Execution task queue for work execution specialization
String EXECUTION_TASK_QUEUE = PREFIX + "execution.task.queue.name";
String DEFAULT_EXECUTION_TASK_QUEUE = "GobblinTemporalExecutionQueue";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName();
Expand Down Expand Up @@ -134,4 +141,5 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;

Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a documentation comment for the WORK_EXECUTION_MEMORY_MB configuration key to explain its purpose and usage, similar to other configuration keys in this interface. The comment should clarify that this is the memory allocation in megabytes for execution worker containers when dynamic scaling is enabled.

Suggested change
/**
* Memory allocation (in megabytes) for execution worker containers when dynamic scaling is enabled.
* This value determines the amount of memory assigned to each worker container during execution.
*/

Copilot uses AI. Check for mistakes.
String WORK_EXECUTION_MEMORY_MB = STAGE_SPECIFIC_PREFIX + "workExecution.memory.mb";
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,12 @@
/** Basic boilerplate for a {@link TemporalWorker} to register its activity and workflow capabilities and listen on a particular queue */
public abstract class AbstractTemporalWorker implements TemporalWorker {
private final WorkflowClient workflowClient;
private final String queueName;
private final WorkerFactory workerFactory;
private final Config config;
protected final Config config;

public AbstractTemporalWorker(Config cfg, WorkflowClient client) {
config = cfg;
workflowClient = client;
queueName = ConfigUtils.getString(cfg,
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);

// Create a Worker factory that can be used to create Workers that poll specific Task Queues.
workerFactory = WorkerFactory.newInstance(workflowClient);
Expand All @@ -52,7 +48,7 @@ public AbstractTemporalWorker(Config cfg, WorkflowClient client) {

@Override
public void start() {
Worker worker = workerFactory.newWorker(queueName, createWorkerOptions());
Worker worker = workerFactory.newWorker(getTaskQueue(), createWorkerOptions());
// This Worker hosts both Workflow and Activity implementations.
// Workflows are stateful, so you need to supply a type to create instances.
worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
Expand All @@ -77,6 +73,12 @@ protected WorkerOptions createWorkerOptions() {
/** @return activity instances; NOTE: activities must be stateless and thread-safe, so a shared instance is used. */
protected abstract Object[] getActivityImplInstances();

/**
 * Resolves the name of the task queue this worker polls; {@code start()} passes the result to the worker factory.
 * Looks up {@link GobblinTemporalConfigurationKeys#GOBBLIN_TEMPORAL_TASK_QUEUE} in the worker config, supplying
 * {@link GobblinTemporalConfigurationKeys#DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE} as the default value.
 * Subclasses may override to poll a different queue.
 *
 * @return the task queue name
 */
protected String getTaskQueue() {
  final String queueKey = GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE;
  final String fallbackQueue = GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE;
  return ConfigUtils.getString(config, queueKey, fallbackQueue);
}

private final void stashWorkerConfig(Config cfg) {
// stash to associate with...
WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void start()
for (int i = 0; i < this.numTemporalWorkers; i++) {
workers.add(initiateWorker());
}
initializeExecutionWorkers();
}catch (Exception e) {
logger.info(e + " for initiate workers");
throw new RuntimeException(e);
Expand All @@ -252,8 +253,8 @@ private TemporalWorker initiateWorker() throws Exception {
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);

String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
logger.info("Creating worker - class: '{}'", workerClassName);
Config workerConfig = clusterConfig;
TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
Expand All @@ -263,6 +264,38 @@ private TemporalWorker initiateWorker() throws Exception {
return worker;
}

/**
 * Starts one extra worker of the class named by {@link GobblinTemporalConfigurationKeys#EXECUTION_WORKER_CLASS}
 * and records it in {@code workers}.
 *
 * <p>Returns without doing anything when dynamic scaling is disabled, or when the configured worker class of this
 * container already is the execution worker class.
 *
 * @throws Exception if the execution worker cannot be loaded, constructed or started
 */
private void initializeExecutionWorkers() throws Exception {
  if (!ConfigUtils.getBoolean(clusterConfig, GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
    return;
  }

  final String executionWorkerClassName = GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS;
  String configuredWorkerClassName = ConfigUtils.getString(clusterConfig,
      GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);

  // only the initial container (WorkFulfillment worker) should start an additional ExecutionWorker worker
  if (executionWorkerClassName.equals(configuredWorkerClassName)) {
    return;
  }

  logger.info("Starting additional ExecutionWorker in initial container");

  String temporalNamespace = ConfigUtils.getString(clusterConfig,
      GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
      GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
  WorkflowClient workflowClient = TemporalWorkflowClientFactory.createClientInstance(
      managedWorkflowServiceStubs.getWorkflowServiceStubs(), temporalNamespace);

  // The worker is built reflectively from its class name, handing it the cluster config and the client.
  Class<TemporalWorker> executionWorkerClass = (Class<TemporalWorker>) Class.forName(executionWorkerClassName);
  TemporalWorker executionWorker =
      GobblinConstructorUtils.invokeLongestConstructor(executionWorkerClass, clusterConfig, workflowClient);

  // Start first, then track: same ordering as before, so a failed start leaves `workers` untouched.
  executionWorker.start();
  workers.add(executionWorker);
  logger.info("Worker started for class: {}", executionWorkerClassName);
}

private void initMetricReporter() {
if (this.containerMetrics.isPresent()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbe
.build();
}

/**
 * Builds {@link ActivityOptions} that target an explicit task queue.
 *
 * @param props properties from which the start-to-close timeout, retry options and (optionally) the heartbeat
 *              timeout are derived
 * @param setHeartbeatTimeout whether a heartbeat timeout should be set on the resulting options
 * @param taskQueue name of the task queue to set on the options
 * @return the assembled activity options
 */
public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
  ActivityOptions.Builder optionsBuilder = ActivityOptions.newBuilder();
  optionsBuilder.setStartToCloseTimeout(getStartToCloseTimeout(props));
  optionsBuilder.setRetryOptions(buildRetryOptions(props));
  optionsBuilder.setTaskQueue(taskQueue);

  // The heartbeat timeout is opt-in; it is only read from props when requested.
  if (setHeartbeatTimeout) {
    optionsBuilder.setHeartbeatTimeout(getHeartbeatTimeout(props));
  }

  return optionsBuilder.build();
}

Comment on lines +75 to +87
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added buildActivityOptions method with taskQueue parameter is not used anywhere in the codebase. If this method is intended for future use, consider documenting this in a code comment. Otherwise, consider removing it to avoid maintaining unused code. Alternatively, if this should be used for routing activities to the execution queue when dynamic scaling is enabled, the implementation may be incomplete.

Suggested change
public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
ActivityOptions.Builder builder = ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.setTaskQueue(taskQueue);
if (setHeartbeatTimeout) {
builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
}
return builder.build();
}

Copilot uses AI. Check for mistakes.
private ActivityOptions buildActivityOptionsWithoutHeartBeatTimeout(Properties props) {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.workflow.WorkflowStage;
import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;


/**
Expand Down Expand Up @@ -62,8 +65,9 @@ public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWor
protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);

protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
// TODO: implement right-sizing!!! (for now just return unchanged)
return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
// Create overlay with execution-specific memory and worker class
ProfileOverlay overlay = createExecutionWorkerOverlay(jobState);
return new ProfileDerivation(basisProfileName, overlay);
}

protected String calcProfileDerivationName(JobState jobState) {
Expand All @@ -72,6 +76,28 @@ protected String calcProfileDerivationName(JobState jobState) {
}

protected String calcBasisProfileName(JobState jobState) {
return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
// Always derive from the global baseline
return WorkforceProfiles.BASELINE_NAME;
}

private ProfileOverlay createExecutionWorkerOverlay(JobState jobState) {
List<ProfileOverlay.KVPair> overlayPairs = new java.util.ArrayList<>();

// Add execution-specific memory if configured (overrides baseline memory)
if (jobState.contains(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)) {
overlayPairs.add(new ProfileOverlay.KVPair(
GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
jobState.getProp(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)
));
}

// Add ExecutionWorker class to ensure correct task queue routing
overlayPairs.add(new ProfileOverlay.KVPair(
GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
));

return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for overlayPairs.isEmpty() on line 100 will always be false because the ExecutionWorker class is unconditionally added to overlayPairs on line 95. This means ProfileOverlay.unchanged() will never be returned. Consider removing this check or restructuring the logic since overlayPairs will always contain at least one element.

Suggested change
return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);
return new ProfileOverlay.Adding(overlayPairs);

Copilot uses AI. Check for mistakes.
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.worker;

import java.util.concurrent.TimeUnit;

import com.typesafe.config.Config;

import io.temporal.client.WorkflowClient;
import io.temporal.worker.WorkerOptions;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
import org.apache.gobblin.util.ConfigUtils;


/**
* Specialized worker for Work Execution stage.
* This worker only registers activities for:
* - ProcessWorkUnit (Work Execution)
*
* Runs on containers with stage-specific memory for work execution operations.
* Polls the execution task queue to ensure activities run on appropriately-sized containers.
*/
public class ExecutionWorker extends AbstractTemporalWorker {
public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
public int maxExecutionConcurrency;
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field maxExecutionConcurrency is declared public. Consider making it private if external access is not required, or document why the wider visibility is intentional (for example, for testing purposes).

Suggested change
public int maxExecutionConcurrency;
private int maxExecutionConcurrency;

Copilot uses AI. Check for mistakes.

public ExecutionWorker(Config config, WorkflowClient workflowClient) {
super(config, workflowClient);
this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
}

/** @return the workflow implementation types this worker registers */
@Override
protected Class<?>[] getWorkflowImplClasses() {
  final Class<?>[] workflowTypes = {
      ProcessWorkUnitsWorkflowImpl.class,
      NestingExecOfProcessWorkUnitWorkflowImpl.class
  };
  return workflowTypes;
}

/** @return the activity instances this worker registers: a single, freshly created {@link ProcessWorkUnitImpl} */
@Override
protected Object[] getActivityImplInstances() {
  final Object[] activities = { new ProcessWorkUnitImpl() };
  return activities;
}

/**
 * Builds the worker options: the deadlock detection timeout is {@link #DEADLOCK_DETECTION_TIMEOUT_SECONDS}
 * converted to milliseconds, and the activity, local-activity and workflow-task concurrency limits all share
 * {@code maxExecutionConcurrency}.
 *
 * @return the options used when this worker is created
 */
@Override
protected WorkerOptions createWorkerOptions() {
  final long deadlockTimeoutMillis = TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS);
  final int concurrencyLimit = this.maxExecutionConcurrency;

  return WorkerOptions.newBuilder()
      .setDefaultDeadlockDetectionTimeout(deadlockTimeoutMillis)
      .setMaxConcurrentActivityExecutionSize(concurrencyLimit)
      .setMaxConcurrentLocalActivityExecutionSize(concurrencyLimit)
      .setMaxConcurrentWorkflowTaskExecutionSize(concurrencyLimit)
      .build();
}

/**
 * Resolves the execution task queue: looks up {@link GobblinTemporalConfigurationKeys#EXECUTION_TASK_QUEUE} in the
 * worker config, supplying {@link GobblinTemporalConfigurationKeys#DEFAULT_EXECUTION_TASK_QUEUE} as the default value.
 *
 * @return the task queue name this worker polls
 */
@Override
protected String getTaskQueue() {
  final String queueKey = GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE;
  final String fallbackQueue = GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE;
  return ConfigUtils.getString(config, queueKey, fallbackQueue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.workflow;

import com.typesafe.config.Config;
import lombok.Getter;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;

/**
 * The stages of a Gobblin Temporal workflow, each paired with the config key (and default) naming its task queue.
 *
 * <p>Stages:
 * <ul>
 * <li>{@link #WORK_DISCOVERY}: work discovery; mapped to the general task queue key</li>
 * <li>{@link #WORK_EXECUTION}: processing of work units; mapped to the dedicated execution task queue key</li>
 * <li>{@link #COMMIT}: commit; mapped to the general task queue key</li>
 * </ul>
 *
 * <p>NOTE: this enum does not consult the dynamic-scaling switch itself. Whether {@link #WORK_EXECUTION} is actually
 * routed to the execution queue only when dynamic scaling is enabled is up to the callers of
 * {@link #getTaskQueue(Config)}.
 */
@Getter
public enum WorkflowStage {
  WORK_DISCOVERY(
      "workDiscovery",
      GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
      GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE),
  WORK_EXECUTION(
      "workExecution",
      GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE,
      GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE),
  COMMIT(
      "commit",
      GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
      GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);

  /** Short camelCase name of the stage, e.g. {@code "workExecution"}. */
  private final String profileBaseName;
  /** Config key under which the task queue name for this stage is looked up. */
  private final String taskQueueConfigKey;
  /** Task queue name used when {@link #taskQueueConfigKey} is absent from the config. */
  private final String defaultTaskQueue;

  WorkflowStage(String baseName, String queueConfigKey, String fallbackQueue) {
    this.profileBaseName = baseName;
    this.taskQueueConfigKey = queueConfigKey;
    this.defaultTaskQueue = fallbackQueue;
  }

  /**
   * Returns the task queue for this stage: the configured value when the stage's config key is present,
   * otherwise the stage's default (e.g. "GobblinTemporalTaskQueue" or "GobblinTemporalExecutionQueue").
   *
   * @param config the configuration to read from
   * @return the task queue name for this stage
   */
  public String getTaskQueue(Config config) {
    if (config.hasPath(taskQueueConfigKey)) {
      return config.getString(taskQueueConfigKey);
    }
    return defaultTaskQueue;
  }
}
Loading
Loading