Skip to content

Commit 73823fa

Browse files
committed
add checks for sinks, simplify reserved name checking, improve names
1 parent 12062bc commit 73823fa

File tree

5 files changed

+177
-118
lines changed

5 files changed

+177
-118
lines changed

packages/common/src/reserved.ts

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,26 @@
11
export const TEMPORAL_RESERVED_PREFIX = '__temporal_';
2-
export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace';
3-
export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace';
2+
export const STACK_TRACE_RESERVED_NAME = '__stack_trace';
3+
export const ENHANCED_STACK_TRACE_RESERVED_NAME = '__enhanced_stack_trace';
44

5-
export const reservedPrefixes = [
6-
TEMPORAL_RESERVED_PREFIX,
7-
STACK_TRACE_RESERVED_PREFIX,
8-
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
9-
];
5+
/**
6+
* Valid entity types that can be checked for reserved name violations
7+
*/
8+
export type ReservedNameEntityType = 'query' | 'signal' | 'update' | 'activity' | 'task queue' | 'sink' | 'workflow';
109

11-
export class ReservedPrefixError extends Error {
12-
constructor(type: string, name: string, prefix: string) {
13-
super(`Cannot use ${type} name: '${name}', with reserved prefix: '${prefix}'`);
14-
this.name = 'ReservedPrefixError';
10+
/**
11+
* Validates if the provided name contains any reserved prefixes or matches any reserved names.
12+
* Throws a TypeError if validation fails, with a specific message indicating whether the issue
13+
* is with a reserved prefix or an exact match to a reserved name.
14+
*
15+
* @param type The entity type being checked
16+
* @param name The name to check against reserved prefixes/names
17+
*/
18+
export function throwIfReservedName(type: ReservedNameEntityType, name: string): void {
19+
if (name.startsWith(TEMPORAL_RESERVED_PREFIX)) {
20+
throw new TypeError(`Cannot use ${type} name: '${name}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`);
1521
}
16-
}
17-
18-
export function throwIfReservedName(type: string, name: string): void {
19-
const prefix = maybeGetReservedPrefix(name);
20-
if (prefix) {
21-
throw new ReservedPrefixError(type, name, prefix);
22-
}
23-
}
2422

25-
export function maybeGetReservedPrefix(name: string): string | undefined {
26-
for (const prefix of reservedPrefixes) {
27-
if (name.startsWith(prefix)) {
28-
return prefix;
29-
}
23+
if (name === STACK_TRACE_RESERVED_NAME || name === ENHANCED_STACK_TRACE_RESERVED_NAME) {
24+
throw new TypeError(`Cannot use ${type} name: '${name}', which is a reserved name`);
3025
}
3126
}

packages/test/src/test-integration-workflows.ts

Lines changed: 121 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ import {
2929
TypedSearchAttributes,
3030
WorkflowExecutionAlreadyStartedError,
3131
} from '@temporalio/common';
32-
import { reservedPrefixes } from '@temporalio/common/lib/reserved';
32+
import {
33+
TEMPORAL_RESERVED_PREFIX,
34+
STACK_TRACE_RESERVED_NAME,
35+
ENHANCED_STACK_TRACE_RESERVED_NAME,
36+
} from '@temporalio/common/lib/reserved';
3337
import { signalSchedulingWorkflow } from './activities/helpers';
3438
import { activityStartedSignal } from './workflows/definitions';
3539
import * as workflows from './workflows';
@@ -1426,18 +1430,23 @@ test('Workflow can return root workflow', async (t) => {
14261430
});
14271431
});
14281432

1433+
const reservedNames = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_RESERVED_NAME, ENHANCED_STACK_TRACE_RESERVED_NAME];
1434+
14291435
test('Cannot register activities using reserved prefixes', async (t) => {
14301436
const { createWorker } = helpers(t);
14311437

1432-
for (const prefix of reservedPrefixes) {
1433-
const activityName = prefix + '_test';
1438+
for (const name of reservedNames) {
1439+
const activityName = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
14341440
await t.throwsAsync(
14351441
createWorker({
14361442
activities: { [activityName]: () => {} },
14371443
}),
14381444
{
1439-
name: 'ReservedPrefixError',
1440-
message: `Cannot use activity name: '${activityName}', with reserved prefix: '${prefix}'`,
1445+
name: 'TypeError',
1446+
message:
1447+
name === TEMPORAL_RESERVED_PREFIX
1448+
? `Cannot use activity name: '${activityName}', with reserved prefix: '${name}'`
1449+
: `Cannot use activity name: '${activityName}', which is a reserved name`,
14411450
}
14421451
);
14431452
}
@@ -1446,16 +1455,45 @@ test('Cannot register activities using reserved prefixes', async (t) => {
14461455
test('Cannot register task queues using reserved prefixes', async (t) => {
14471456
const { createWorker } = helpers(t);
14481457

1449-
for (const prefix of reservedPrefixes) {
1450-
const taskQueue = prefix + '_test';
1458+
for (const name of reservedNames) {
1459+
const taskQueue = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
14511460

14521461
await t.throwsAsync(
14531462
createWorker({
14541463
taskQueue,
14551464
}),
14561465
{
1457-
name: 'ReservedPrefixError',
1458-
message: `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`,
1466+
name: 'TypeError',
1467+
message:
1468+
name === TEMPORAL_RESERVED_PREFIX
1469+
? `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${name}'`
1470+
: `Cannot use task queue name: '${taskQueue}', which is a reserved name`,
1471+
}
1472+
);
1473+
}
1474+
});
1475+
1476+
test('Cannot register sinks using reserved prefixes', async (t) => {
1477+
const { createWorker } = helpers(t);
1478+
1479+
for (const name of reservedNames) {
1480+
const sinkName = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
1481+
await t.throwsAsync(
1482+
createWorker({
1483+
sinks: {
1484+
[sinkName]: {
1485+
test: {
1486+
fn: () => {},
1487+
},
1488+
},
1489+
},
1490+
}),
1491+
{
1492+
name: 'TypeError',
1493+
message:
1494+
name === TEMPORAL_RESERVED_PREFIX
1495+
? `Cannot use sink name: '${sinkName}', with reserved prefix: '${name}'`
1496+
: `Cannot use sink name: '${sinkName}', which is a reserved name`,
14591497
}
14601498
);
14611499
}
@@ -1466,25 +1504,25 @@ interface HandlerError {
14661504
message: string;
14671505
}
14681506

1469-
export async function workflowBadPrefixHandler(prefix: string): Promise<HandlerError[]> {
1507+
export async function workflowReservedNameHandler(name: string): Promise<HandlerError[]> {
14701508
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
14711509
const expectedErrors: HandlerError[] = [];
14721510
try {
1473-
setHandler(defineSignal(prefix + '_signal'), () => {});
1511+
setHandler(defineSignal(name === TEMPORAL_RESERVED_PREFIX ? name + '_signal' : name), () => {});
14741512
} catch (e) {
14751513
if (e instanceof Error) {
14761514
expectedErrors.push({ name: e.name, message: e.message });
14771515
}
14781516
}
14791517
try {
1480-
setHandler(defineUpdate(prefix + '_update'), () => {});
1518+
setHandler(defineUpdate(name === TEMPORAL_RESERVED_PREFIX ? name + '_update' : name), () => {});
14811519
} catch (e) {
14821520
if (e instanceof Error) {
14831521
expectedErrors.push({ name: e.name, message: e.message });
14841522
}
14851523
}
14861524
try {
1487-
setHandler(defineQuery(prefix + '_query'), () => {});
1525+
setHandler(defineQuery(name === TEMPORAL_RESERVED_PREFIX ? name + '_query' : name), () => {});
14881526
} catch (e) {
14891527
if (e instanceof Error) {
14901528
expectedErrors.push({ name: e.name, message: e.message });
@@ -1497,22 +1535,31 @@ test('Workflow failure if define signals/updates/queries with reserved prefixes'
14971535
const { createWorker, executeWorkflow } = helpers(t);
14981536
const worker = await createWorker();
14991537
await worker.runUntil(async () => {
1500-
for (const prefix of reservedPrefixes) {
1501-
const result = await executeWorkflow(workflowBadPrefixHandler, {
1502-
args: [prefix],
1538+
for (const name of reservedNames) {
1539+
const result = await executeWorkflow(workflowReservedNameHandler, {
1540+
args: [name],
15031541
});
15041542
t.deepEqual(result, [
15051543
{
1506-
name: 'ReservedPrefixError',
1507-
message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'`,
1544+
name: 'TypeError',
1545+
message:
1546+
name === TEMPORAL_RESERVED_PREFIX
1547+
? `Cannot use signal name: '${name}_signal', with reserved prefix: '${name}'`
1548+
: `Cannot use signal name: '${name}', which is a reserved name`,
15081549
},
15091550
{
1510-
name: 'ReservedPrefixError',
1511-
message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'`,
1551+
name: 'TypeError',
1552+
message:
1553+
name === TEMPORAL_RESERVED_PREFIX
1554+
? `Cannot use update name: '${name}_update', with reserved prefix: '${name}'`
1555+
: `Cannot use update name: '${name}', which is a reserved name`,
15121556
},
15131557
{
1514-
name: 'ReservedPrefixError',
1515-
message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'`,
1558+
name: 'TypeError',
1559+
message:
1560+
name === TEMPORAL_RESERVED_PREFIX
1561+
? `Cannot use query name: '${name}_query', with reserved prefix: '${name}'`
1562+
: `Cannot use query name: '${name}', which is a reserved name`,
15161563
},
15171564
]);
15181565
}
@@ -1557,53 +1604,57 @@ test('Default handlers fail given reserved prefix', async (t) => {
15571604
};
15581605

15591606
await worker.runUntil(async () => {
1560-
for (const prefix of reservedPrefixes) {
1561-
// Reserved query
1562-
let handle = await startWorkflow(workflowWithDefaultHandlers);
1563-
await asyncRetry(async () => {
1564-
if (!(await handle.query(wfReadyQuery))) {
1565-
throw new Error('Workflow not ready yet');
1566-
}
1567-
});
1568-
const queryName = `${prefix}_query`;
1569-
await t.throwsAsync(
1570-
handle.query(queryName),
1571-
{
1572-
// ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server
1573-
name: 'QueryNotRegisteredError',
1574-
message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`,
1575-
},
1576-
`Query ${queryName} should fail`
1577-
);
1578-
await handle.terminate();
1579-
1580-
// Reserved signal
1581-
handle = await startWorkflow(workflowWithDefaultHandlers);
1582-
await asyncRetry(async () => {
1583-
if (!(await handle.query(wfReadyQuery))) {
1584-
throw new Error('Workflow not ready yet');
1585-
}
1586-
});
1587-
const signalName = `${prefix}_signal`;
1588-
await handle.signal(signalName);
1589-
await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`);
1590-
await handle.terminate();
1591-
1592-
// Reserved update
1593-
handle = await startWorkflow(workflowWithDefaultHandlers);
1594-
await asyncRetry(async () => {
1595-
if (!(await handle.query(wfReadyQuery))) {
1596-
throw new Error('Workflow not ready yet');
1597-
}
1598-
});
1599-
const updateName = `${prefix}_update`;
1600-
handle.executeUpdate(updateName).catch(() => {
1601-
// Expect failure. The error caught here is a WorkflowNotFound because
1602-
// the workflow will have already failed, so the update cannot go through.
1603-
// We assert on the expected failure below.
1604-
});
1605-
await assertWftFailure(handle, `Cannot use update name: '${updateName}', with reserved prefix: '${prefix}'`);
1606-
await handle.terminate();
1607-
}
1607+
// Reserved query
1608+
let handle = await startWorkflow(workflowWithDefaultHandlers);
1609+
await asyncRetry(async () => {
1610+
if (!(await handle.query(wfReadyQuery))) {
1611+
throw new Error('Workflow not ready yet');
1612+
}
1613+
});
1614+
const queryName = `${TEMPORAL_RESERVED_PREFIX}_query`;
1615+
await t.throwsAsync(
1616+
handle.query(queryName),
1617+
{
1618+
// TypeError transforms to a QueryNotRegisteredError on the way back from server
1619+
name: 'QueryNotRegisteredError',
1620+
message: `Cannot use query name: '${queryName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`,
1621+
},
1622+
`Query ${queryName} should fail`
1623+
);
1624+
await handle.terminate();
1625+
1626+
// Reserved signal
1627+
handle = await startWorkflow(workflowWithDefaultHandlers);
1628+
await asyncRetry(async () => {
1629+
if (!(await handle.query(wfReadyQuery))) {
1630+
throw new Error('Workflow not ready yet');
1631+
}
1632+
});
1633+
const signalName = `${TEMPORAL_RESERVED_PREFIX}_signal`;
1634+
await handle.signal(signalName);
1635+
await assertWftFailure(
1636+
handle,
1637+
`Cannot use signal name: '${signalName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`
1638+
);
1639+
await handle.terminate();
1640+
1641+
// Reserved update
1642+
handle = await startWorkflow(workflowWithDefaultHandlers);
1643+
await asyncRetry(async () => {
1644+
if (!(await handle.query(wfReadyQuery))) {
1645+
throw new Error('Workflow not ready yet');
1646+
}
1647+
});
1648+
const updateName = `${TEMPORAL_RESERVED_PREFIX}_update`;
1649+
handle.executeUpdate(updateName).catch(() => {
1650+
// Expect failure. The error caught here is a WorkflowNotFound because
1651+
// the workflow will have already failed, so the update cannot go through.
1652+
// We assert on the expected failure below.
1653+
});
1654+
await assertWftFailure(
1655+
handle,
1656+
`Cannot use update name: '${updateName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`
1657+
);
1658+
await handle.terminate();
16081659
});
16091660
});

packages/worker/src/worker-options.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,17 @@ export function compileWorkerOptions(
957957
for (const activityName of activities.keys()) {
958958
throwIfReservedName('activity', activityName);
959959
}
960+
961+
// Validate sink names to ensure they don't use reserved prefixes/names
962+
if (opts.sinks) {
963+
for (const sinkName of Object.keys(opts.sinks)) {
964+
// Allow internal sinks used by the SDK
965+
if (sinkName !== '__temporal_logger' && sinkName !== '__temporal_metrics') {
966+
throwIfReservedName('sink', sinkName);
967+
}
968+
}
969+
}
970+
960971
const tuner = asNativeTuner(opts.tuner, logger);
961972

962973
return {

packages/worker/src/worker.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,9 @@ export class Worker {
468468
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
469469
*/
470470
public static async create(options: WorkerOptions): Promise<Worker> {
471+
if (!options.taskQueue) {
472+
throw new TypeError('Task queue name is required');
473+
}
471474
throwIfReservedName('task queue', options.taskQueue);
472475
const runtime = Runtime.instance();
473476
const logger = LoggerWithComposedMetadata.compose(runtime.logger, {

0 commit comments

Comments
 (0)