Skip to content

Background heartbeating sample #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions activities-cancellation-heartbeating/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
"@temporalio/workflow": "^1.11.6"
},
"devDependencies": {
"@temporalio/testing": "^1.11.7",
"@tsconfig/node18": "^18.2.4",
"@types/mocha": "^10.0.10",
"@types/node": "^22.9.1",
"@typescript-eslint/eslint-plugin": "^8.18.0",
"@typescript-eslint/parser": "^8.18.0",
"eslint": "^8.57.1",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-deprecation": "^3.0.0",
"mocha": "^11.1.0",
"nodemon": "^3.1.7",
"prettier": "^3.4.2",
"ts-node": "^10.9.2",
Expand Down
52 changes: 52 additions & 0 deletions activities-cancellation-heartbeating/src/activities-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { MockActivityEnvironment } from '@temporalio/testing';
import * as assert from 'node:assert';
import { CancelledFailure } from '@temporalio/workflow';
import { ActivityExecutionDetails, myLongRunningActivity } from './activities';

describe('LongRunningActivity Test', function() {

describe('when background heartbeating', function() {
let testEnv: MockActivityEnvironment
beforeEach(async function() {
testEnv = new MockActivityEnvironment({
startToCloseTimeoutMs: 2000,
heartbeatTimeoutMs: 200,
heartbeatDetails: 0,
})
})
it('should sent details back', async function() {
const actual: ActivityExecutionDetails = await testEnv.run(myLongRunningActivity)
assert.equal(actual.heartbeatsReported, 18)
})
})
describe('when background heartbeating received cancellation notice', function() {
let testEnv: MockActivityEnvironment
beforeEach(async function() {
testEnv = new MockActivityEnvironment({
startToCloseTimeoutMs: 2000,
heartbeatTimeoutMs: 200,
heartbeatDetails: 0,
})
})
it('should sent details back', async function() {

const cancelPromise= async (): Promise<void> => {
return new Promise(resolve => {
setTimeout(() => {}, 200)
testEnv.cancel('verify CancelledFailure bubbles up')
resolve()
})
}
const runPromise= async (): Promise<ActivityExecutionDetails> => {
return await testEnv.run(myLongRunningActivity)
}

const actual = await Promise.allSettled([cancelPromise(), runPromise()])
assert.ok(actual[1])
assert.equal("fulfilled", actual[1].status)
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
assert.ok(actual[1].value.err instanceof CancelledFailure)
})
})
})
97 changes: 96 additions & 1 deletion activities-cancellation-heartbeating/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
// @@@SNIPSTART typescript-activity-fake-progress
import { activityInfo, log, sleep, CancelledFailure, heartbeat } from '@temporalio/activity';
import {
activityInfo,
log,
sleep,
CancelledFailure,
heartbeat,
Context,
ApplicationFailure
} from '@temporalio/activity';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
try {
Expand All @@ -22,3 +30,90 @@ export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
}
}
// @@@SNIPEND

// @@@SNIPSTART typescript-activity-long-running
export interface ActivityExecutionDetails {
heartbeatsReported: number
mainOperationResult: string | undefined
err: Error | undefined
}
export async function myLongRunningActivity(): Promise<ActivityExecutionDetails> {
const ctx = Context.current()
const details: ActivityExecutionDetails = {
heartbeatsReported: ctx.info.heartbeatDetails || 0,
mainOperationResult: undefined,
err: undefined
}
const logger = ctx.log
const heartbeatTimeoutMs = ctx.info.heartbeatTimeoutMs
if(!heartbeatTimeoutMs) {
throw ApplicationFailure.nonRetryable("heartbeat is required", "ERR_MISSING_HEARTBEAT_TIMEOUT")
}
const heartbeatInterval = heartbeatTimeoutMs / 2

// mainOperation is the "real" work we are doing in the Activity
async function mainOperation(): Promise<string> {
const successMessage = 'operation successful'
// use startToClose as basis for overall ops timeouts
const timeout = ctx.info.startToCloseTimeoutMs - 100

return new Promise((resolve) => {
logger.debug('simulating operation for (ms)', {timeout})
// this simulates some lengthy operation like a report generation or API call
// we avoid using `sleep` so that the operation won't receive a CancelledFailure directly
setTimeout(() => {
// capture the operation result
details.mainOperationResult = successMessage
resolve(successMessage)
}, timeout)
})
}
// doHeartbeat creates the regular looped heartbeat we need
async function doHeartbeat():Promise<void> {
// noinspection InfiniteLoopJS
logger.debug('heartbeating every (ms)',{heartbeatInterval})
return new Promise((resolve, reject) => {
return (async function periodic() {
while(!details.err && !details.mainOperationResult) {
try {
// this will return a CancelledFailure if the server replies as such
// since we arent catching it it will bubble up to the main operation
await sleep(heartbeatInterval)
// you can pass in details to the heartbeat if you like to preserve "where" you are
heartbeat(++details.heartbeatsReported)
} catch (err) {
// demonstrate how to test for cancellation
if(err instanceof CancelledFailure) {
logger.error('cancelling heartbeat due to cancellation', {err})
}
logger.error('heartbeat received failure', {err})
reject(err)
// exit loop
throw err
}
}
})()
})
}
// _race_ the heartbeat and mainOperation so that any failure from either mainOperation or heartbeat to arrive
// will resolve the Promise collection. This is important for the CancelledFailure to be handled appropriately.
// Cancellation of the process inside the mainOperation is outside the scope of this sample, but
// you might need to abort processes explicitly upon Cancellation from Workflow.
// For example, with https://developer.mozilla.org/en-US/docs/Web/API/AbortController
try {
const result: string | void = await Promise.race([doHeartbeat(), mainOperation()])
logger.debug('received result', {result})
} catch (err) {
logger.error('Activity received error', {err})
details.err = err as Error
if(err instanceof CancelledFailure) {
// we could rethrow the error here or ignore it (as we have done here)
// throw it. log it. sorted. :)
}

}
return details
}


// @@@SNIPEND
Loading