Skip to content

Commit e88a923

Browse files
authored
Implement nonfirst attempt LA metering changes (temporalio#1659)
1 parent 7c53473 commit e88a923

File tree

9 files changed

+219
-7
lines changed

9 files changed

+219
-7
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,11 @@
4848
import io.temporal.worker.WorkflowImplementationOptions;
4949
import io.temporal.workflow.Functions;
5050
import java.time.Duration;
51-
import java.util.HashMap;
52-
import java.util.List;
53-
import java.util.Map;
54-
import java.util.Optional;
51+
import java.util.*;
5552
import java.util.concurrent.BlockingQueue;
5653
import java.util.concurrent.LinkedBlockingDeque;
5754
import java.util.concurrent.TimeUnit;
55+
import java.util.concurrent.atomic.AtomicInteger;
5856
import java.util.concurrent.locks.Lock;
5957
import java.util.concurrent.locks.ReentrantLock;
6058

@@ -76,6 +74,8 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
7674

7775
private final LocalActivityDispatcher localActivityDispatcher;
7876

77+
private final LocalActivityMeteringHelper localActivityMeteringHelper;
78+
7979
private final ReplayWorkflow workflow;
8080

8181
private final WorkflowStateMachines workflowStateMachines;
@@ -122,6 +122,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
122122
this.replayWorkflowExecutor =
123123
new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
124124
this.localActivityCompletionSink = localActivityCompletionQueue::add;
125+
this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
125126
}
126127

127128
@Override
@@ -130,6 +131,8 @@ public WorkflowTaskResult handleWorkflowTask(
130131
throws Throwable {
131132
lock.lock();
132133
try {
134+
localActivityMeteringHelper.newWFTStarting();
135+
133136
Deadline wftHearbeatDeadline =
134137
Deadline.after(
135138
(long)
@@ -167,6 +170,7 @@ public WorkflowTaskResult handleWorkflowTask(
167170
.setQueryResults(queryResults)
168171
.setFinalCommand(context.isWorkflowMethodCompleted())
169172
.setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
173+
.setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
170174
.build();
171175
} finally {
172176
lock.unlock();
@@ -299,6 +303,8 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
299303
accepted,
300304
"Unable to schedule local activity for execution, "
301305
+ "no more slots available and local activity task queue is full");
306+
307+
localActivityMeteringHelper.addNewLocalActivity(laRequest);
302308
}
303309

304310
if (localActivityTaskCount == 0) {
@@ -315,6 +321,7 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
315321
}
316322

317323
localActivityTaskCount--;
324+
localActivityMeteringHelper.markLocalActivityComplete(laCompletion.getActivityId());
318325

319326
if (laCompletion.getProcessingError() != null) {
320327
throw laCompletion.getProcessingError().getThrowable();
@@ -363,4 +370,41 @@ public void cancel(HistoryEvent cancelEvent) {
363370
replayWorkflowExecutor.handleWorkflowExecutionCancelRequested(cancelEvent);
364371
}
365372
}
373+
374+
private static class LocalActivityMeteringHelper {
375+
private final Map<String, AtomicInteger> firstWftActivities = new HashMap<>();
376+
private final Map<String, AtomicInteger> nonFirstWftActivities = new HashMap<>();
377+
private final Set<String> completed = new HashSet<>();
378+
379+
private void newWFTStarting() {
380+
for (String activityId : firstWftActivities.keySet()) {
381+
AtomicInteger removed = firstWftActivities.remove(activityId);
382+
removed.set(0);
383+
nonFirstWftActivities.put(activityId, removed);
384+
}
385+
}
386+
387+
private void addNewLocalActivity(ExecuteLocalActivityParameters params) {
388+
AtomicInteger attemptsDuringWFTCounter = new AtomicInteger(0);
389+
params.setOnNewAttemptCallback(attemptsDuringWFTCounter::incrementAndGet);
390+
firstWftActivities.put(params.getActivityId(), attemptsDuringWFTCounter);
391+
}
392+
393+
private void markLocalActivityComplete(String activityId) {
394+
completed.add(activityId);
395+
}
396+
397+
private int getNonfirstAttempts() {
398+
int result =
399+
nonFirstWftActivities.values().stream()
400+
.map(ai -> ai.getAndSet(0))
401+
.reduce(0, Integer::sum);
402+
for (String activityId : completed) {
403+
firstWftActivities.remove(activityId);
404+
nonFirstWftActivities.remove(activityId);
405+
}
406+
completed.clear();
407+
return result;
408+
}
409+
}
366410
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.m3.util.ImmutableMap;
2929
import io.temporal.api.command.v1.Command;
3030
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
31+
import io.temporal.api.common.v1.MeteringMetadata;
3132
import io.temporal.api.common.v1.WorkflowExecution;
3233
import io.temporal.api.common.v1.WorkflowType;
3334
import io.temporal.api.enums.v1.CommandType;
@@ -210,6 +211,11 @@ private Result createCompletedWFTRequest(
210211
.addAllCommands(result.getCommands())
211212
.putAllQueryResults(result.getQueryResults())
212213
.setForceCreateNewWorkflowTask(result.isForceWorkflowTask())
214+
.setMeteringMetadata(
215+
MeteringMetadata.newBuilder()
216+
.setNonfirstLocalActivityExecutionAttempts(
217+
result.getNonfirstLocalActivityAttempts())
218+
.build())
213219
.setReturnNewWorkflowTask(result.isForceWorkflowTask());
214220

215221
if (stickyTaskQueueName != null

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public static final class Builder {
3737
private boolean finalCommand;
3838
private Map<String, WorkflowQueryResult> queryResults;
3939
private boolean forceWorkflowTask;
40+
private int nonfirstLocalActivityAttempts;
4041

4142
public Builder setCommands(List<Command> commands) {
4243
this.commands = commands;
@@ -58,26 +59,35 @@ public Builder setForceWorkflowTask(boolean forceWorkflowTask) {
5859
return this;
5960
}
6061

62+
public Builder setNonfirstLocalActivityAttempts(int nonfirstLocalActivityAttempts) {
63+
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
64+
return this;
65+
}
66+
6167
public WorkflowTaskResult build() {
6268
return new WorkflowTaskResult(
6369
commands == null ? Collections.emptyList() : commands,
6470
queryResults == null ? Collections.emptyMap() : queryResults,
6571
finalCommand,
66-
forceWorkflowTask);
72+
forceWorkflowTask,
73+
nonfirstLocalActivityAttempts);
6774
}
6875
}
6976

7077
private final List<Command> commands;
7178
private final boolean finalCommand;
7279
private final Map<String, WorkflowQueryResult> queryResults;
7380
private final boolean forceWorkflowTask;
81+
private final int nonfirstLocalActivityAttempts;
7482

7583
private WorkflowTaskResult(
7684
List<Command> commands,
7785
Map<String, WorkflowQueryResult> queryResults,
7886
boolean finalCommand,
79-
boolean forceWorkflowTask) {
87+
boolean forceWorkflowTask,
88+
int nonfirstLocalActivityAttempts) {
8089
this.commands = commands;
90+
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
8191
if (forceWorkflowTask && finalCommand) {
8292
throw new IllegalArgumentException("both forceWorkflowTask and finalCommand are true");
8393
}
@@ -102,4 +112,8 @@ public boolean isFinalCommand() {
102112
public boolean isForceWorkflowTask() {
103113
return forceWorkflowTask;
104114
}
115+
116+
public int getNonfirstLocalActivityAttempts() {
117+
return nonfirstLocalActivityAttempts;
118+
}
105119
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.temporal.api.failure.v1.Failure;
2626
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
2727
import io.temporal.internal.common.ProtobufTimeUtils;
28+
import io.temporal.workflow.Functions;
2829
import io.temporal.workflow.Workflow;
2930
import java.time.Duration;
3031
import java.util.Objects;
@@ -49,6 +50,7 @@ public class ExecuteLocalActivityParameters {
4950
private final @Nonnull Duration localRetryThreshold;
5051
private final boolean doNotIncludeArgumentsIntoMarker;
5152
private final @Nullable Duration scheduleToStartTimeout;
53+
private @Nullable Functions.Proc onNewAttemptCallback;
5254

5355
public ExecuteLocalActivityParameters(
5456
@Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder,
@@ -63,6 +65,7 @@ public ExecuteLocalActivityParameters(
6365
this.previousLocalExecutionFailure = previousLocalExecutionFailure;
6466
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
6567
this.localRetryThreshold = localRetryThreshold;
68+
this.onNewAttemptCallback = null;
6669
}
6770

6871
public String getActivityId() {
@@ -121,4 +124,16 @@ public Duration getLocalRetryThreshold() {
121124
public Duration getScheduleToStartTimeout() {
122125
return scheduleToStartTimeout;
123126
}
127+
128+
@Nonnull
129+
public Functions.Proc getOnNewAttemptCallback() {
130+
if (onNewAttemptCallback == null) {
131+
return () -> {};
132+
}
133+
return onNewAttemptCallback;
134+
}
135+
136+
public void setOnNewAttemptCallback(@Nonnull Functions.Proc onNewAttemptCallback) {
137+
this.onNewAttemptCallback = onNewAttemptCallback;
138+
}
124139
}

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityAttemptTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public PollActivityTaskQueueResponse.Builder getAttemptTask() {
5858
}
5959

6060
public void markAsTakenFromQueue() {
61+
executionContext.newAttempt();
6162
if (takenFromQueueCallback != null) {
6263
takenFromQueueCallback.apply();
6364
}

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,8 @@ public boolean callback(LocalActivityResult result) {
160160
public boolean isCompleted() {
161161
return executionResult.isDone();
162162
}
163+
164+
public void newAttempt() {
165+
executionParams.getOnNewAttemptCallback().apply();
166+
}
163167
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.activityTests;
22+
23+
import static org.junit.Assume.assumeFalse;
24+
25+
import io.temporal.activity.LocalActivityOptions;
26+
import io.temporal.api.enums.v1.EventType;
27+
import io.temporal.api.history.v1.HistoryEvent;
28+
import io.temporal.client.WorkflowOptions;
29+
import io.temporal.client.WorkflowStub;
30+
import io.temporal.common.RetryOptions;
31+
import io.temporal.failure.ActivityFailure;
32+
import io.temporal.testing.internal.SDKTestWorkflowRule;
33+
import io.temporal.workflow.Workflow;
34+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
35+
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
36+
import io.temporal.workflow.shared.TestWorkflows;
37+
import java.time.Duration;
38+
import java.util.Arrays;
39+
import java.util.List;
40+
import java.util.stream.Collectors;
41+
import org.junit.Assert;
42+
import org.junit.Rule;
43+
import org.junit.Test;
44+
45+
public class LongLocalActivityFailsWhileHeartbeatingMeteringTest {
46+
47+
private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl();
48+
49+
private static final int WORKFLOW_TASK_TIMEOUT_SEC = 2;
50+
51+
@Rule
52+
public SDKTestWorkflowRule testWorkflowRule =
53+
SDKTestWorkflowRule.newBuilder()
54+
.setWorkflowTypes(TestLongLocalActivityWorkflowTaskHeartbeatFailureWorkflowImpl.class)
55+
.setActivityImplementations(activitiesImpl)
56+
.setTestTimeoutSeconds(4 * WORKFLOW_TASK_TIMEOUT_SEC + 10)
57+
.build();
58+
59+
/**
60+
* Test that local activity that failed to heartbeat and executed longer than Workflow Task
61+
* Timeout will be repeated during replay
62+
*/
63+
@Test
64+
public void testLongLocalActivityFailsWhileHeartbeatingMetering() {
65+
// Needs server release which propagates metering metadata to event
66+
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);
67+
68+
WorkflowOptions options =
69+
WorkflowOptions.newBuilder()
70+
.setWorkflowRunTimeout(Duration.ofMinutes(5))
71+
.setWorkflowTaskTimeout(Duration.ofSeconds(WORKFLOW_TASK_TIMEOUT_SEC))
72+
.setTaskQueue(testWorkflowRule.getTaskQueue())
73+
.build();
74+
TestWorkflows.TestWorkflowReturnString workflowStub =
75+
testWorkflowRule
76+
.getWorkflowClient()
77+
.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class, options);
78+
workflowStub.execute();
79+
List<HistoryEvent> taskCompleteEvents =
80+
testWorkflowRule.getHistoryEvents(
81+
WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId(),
82+
EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED);
83+
List<Integer> nonFirstLocalActivityExecutionAttempts =
84+
taskCompleteEvents.stream()
85+
.map(
86+
e ->
87+
e.getWorkflowTaskCompletedEventAttributes()
88+
.getMeteringMetadata()
89+
.getNonfirstLocalActivityExecutionAttempts())
90+
.collect(Collectors.toList());
91+
// First task should have 0 non-first local activity execution attempts
92+
Assert.assertEquals(Arrays.asList(0, 2, 3, 1), nonFirstLocalActivityExecutionAttempts);
93+
}
94+
95+
public static class TestLongLocalActivityWorkflowTaskHeartbeatFailureWorkflowImpl
96+
implements TestWorkflows.TestWorkflowReturnString {
97+
98+
@Override
99+
public String execute() {
100+
LocalActivityOptions options =
101+
LocalActivityOptions.newBuilder()
102+
.setScheduleToCloseTimeout(Duration.ofSeconds(100))
103+
.setStartToCloseTimeout(Duration.ofSeconds(1))
104+
.setRetryOptions(
105+
RetryOptions.newBuilder()
106+
.setMaximumInterval(Duration.ofMillis(510))
107+
.setInitialInterval(Duration.ofMillis(510))
108+
.setBackoffCoefficient(1)
109+
.setMaximumAttempts(6)
110+
.build())
111+
.build();
112+
VariousTestActivities localActivities =
113+
Workflow.newLocalActivityStub(VariousTestActivities.class, options);
114+
try {
115+
localActivities.throwIO();
116+
} catch (ActivityFailure e) {
117+
// We expect the activity to fail
118+
}
119+
try {
120+
localActivities.throwIO();
121+
} catch (ActivityFailure e) {
122+
// We expect the activity to fail
123+
}
124+
return "yay";
125+
}
126+
}
127+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,6 +1364,7 @@ private static void completeWorkflowTask(
13641364
WorkflowTaskCompletedEventAttributes.newBuilder()
13651365
.setIdentity(request.getIdentity())
13661366
.setBinaryChecksum(request.getBinaryChecksum())
1367+
.setMeteringMetadata(request.getMeteringMetadata())
13671368
.setScheduledEventId(data.scheduledEventId);
13681369
HistoryEvent event =
13691370
HistoryEvent.newBuilder()

0 commit comments

Comments
 (0)