Skip to content

Add RawValue support for non-converted Payloads #1664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/common/src/converter/payload-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,23 @@ export function mapFromPayloads<K extends string, T = unknown>(
) as Record<K, T>;
}

/**
* RawValue is a wrapper over a payload.
* A payload that belongs to a RawValue is special in that it bypasses user-defined payload converters,
* instead using the default payload converter. The payload still undergoes codec conversion.
*/
export class RawValue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of type safety, I think we should add an optional generic parameter on RawValue, and save that generic parameter using the type brand pattern. Also, we should accept passing in an optional payload converter, i.e. in cases where user want to proce a specific payload converter (not the default one, and not the one they configured on the worker or client).

export declare const rawPayloadTypeBrand: unique symbol;

class RawValue<T = unknown> {
  private readonly _payload: Payload;

  // 
  private readonly [rawPayloadTypeBrand]: T = undefined as T;

  constructor(value: T, payloadConverter: PayloadConverter = defaultPayloadConverter) {
    this._payload = payloadConverter.toPayload(value);
  }

  get payload(): Payload {
    return this._payload;
  }
}

private readonly _payload: Payload;

constructor(value: unknown) {
this._payload = defaultPayloadConverter.toPayload(value);
}

get payload(): Payload {
return this._payload;
}
}

export interface PayloadConverterWithEncoding {
/**
* Converts a value to a {@link Payload}.
Expand Down Expand Up @@ -143,6 +160,9 @@ export class CompositePayloadConverter implements PayloadConverter {
* Returns the first successful result, throws {@link ValueError} if there is no converter that can handle the value.
*/
public toPayload<T>(value: T): Payload {
if (value instanceof RawValue) {
return value.payload;
}
for (const converter of this.converters) {
const result = converter.toPayload(value);
if (result !== undefined) {
Expand All @@ -160,6 +180,7 @@ export class CompositePayloadConverter implements PayloadConverter {
if (payload.metadata === undefined || payload.metadata === null) {
throw new ValueError('Missing payload metadata');
}

const encoding = decode(payload.metadata[METADATA_ENCODING_KEY]);
const converter = this.converterByEncoding.get(encoding);
if (converter === undefined) {
Expand Down
4 changes: 2 additions & 2 deletions packages/test/src/test-integration-split-three.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ if ('promiseHooks' in v8) {
{
file_path: '/packages/test/src/workflows/stack-tracer.ts',
function_name: 'enhancedStackTracer',
line: 32,
line: 33,
column: 35,
internal_code: false,
},
Expand All @@ -112,7 +112,7 @@ if ('promiseHooks' in v8) {
{
file_path: '/packages/test/src/workflows/stack-tracer.ts',
function_name: 'enhancedStackTracer',
line: 32,
line: 33,
column: 35,
internal_code: false,
},
Expand Down
29 changes: 28 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import {
ActivityCancellationType,
ApplicationFailure,
defineSearchAttributeKey,
RawValue,
SearchAttributePair,
SearchAttributeType,
TypedSearchAttributes,
WorkflowExecutionAlreadyStartedError,
} from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { signalSchedulingWorkflow } from './activities/helpers';
import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
Expand Down Expand Up @@ -386,7 +388,7 @@ test('Query workflow metadata returns handler descriptions', async (t) => {

await worker.runUntil(async () => {
const handle = await startWorkflow(queryWorkflowMetadata);
const meta = await handle.query(workflow.workflowMetadataQuery);
const meta = (await handle.query(workflow.workflowMetadataQuery)) as temporal.api.sdk.v1.IWorkflowMetadata;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not need a cast here.

t.is(meta.definition?.type, 'queryWorkflowMetadata');
const queryDefinitions = meta.definition?.queryDefinitions;
// Three built-in ones plus dummyQuery1 and dummyQuery2
Expand Down Expand Up @@ -1340,6 +1342,31 @@ test('can register search attributes to dev server', async (t) => {
await env.teardown();
});

export async function rawValueWorkflow(value: unknown): Promise<RawValue> {
const { rawValueActivity } = workflow.proxyActivities({ startToCloseTimeout: '10s' });
return await rawValueActivity(new RawValue(value));
}

test('workflow and activity can receive/return RawValue', async (t) => {
const { executeWorkflow, createWorker } = helpers(t);
const worker = await createWorker({
activities: {
async rawValueActivity(value: unknown): Promise<RawValue> {
return new RawValue(value);
},
},
});

await worker.runUntil(async () => {
const testValue = 'test';
const rawValue = new RawValue(testValue);
const res = await executeWorkflow(rawValueWorkflow, {
args: [rawValue],
});
t.deepEqual(res, testValue);
});
});

export async function ChildWorkflowInfo(): Promise<workflow.RootWorkflowInfo | undefined> {
let blocked = true;
workflow.setHandler(unblockSignal, () => {
Expand Down
1 change: 1 addition & 0 deletions packages/test/src/workflows/stack-tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
import * as wf from '@temporalio/workflow';
import type { EnhancedStackTrace } from '@temporalio/workflow/lib/interfaces';
import { defaultPayloadConverter } from '@temporalio/common/lib/converter/payload-converter';

Check warning on line 7 in packages/test/src/workflows/stack-tracer.ts

View workflow job for this annotation

GitHub Actions / Lint and Prune / Lint and Prune

'defaultPayloadConverter' is defined but never used. Allowed unused vars must match /^_/u
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import

import type * as activities from '../activities';
import { unblockOrCancel } from './unblock-or-cancel';

Expand Down
22 changes: 12 additions & 10 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
WorkflowUpdateValidatorType,
mapFromPayloads,
fromPayloadsAtIndex,
RawValue,
WorkflowFunctionWithOptions,
VersioningBehavior,
WorkflowDefinitionOptions,
Expand All @@ -30,7 +31,7 @@ import {
} from '@temporalio/common/lib/converter/payload-search-attributes';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import type { coresdk, temporal } from '@temporalio/proto';
import type { coresdk } from '@temporalio/proto';
import { alea, RNG } from './alea';
import { RootCancellationScope } from './cancellation-scope';
import { UpdateScope } from './update-scope';
Expand All @@ -41,7 +42,6 @@ import {
DefaultSignalHandler,
StackTraceSDKInfo,
StackTraceFileSlice,
EnhancedStackTrace,
StackTraceFileLocation,
WorkflowInfo,
WorkflowCreateOptionsInternal,
Expand Down Expand Up @@ -263,17 +263,19 @@ export class Activator implements ActivationHandler {
'__stack_trace',
{
handler: () => {
return this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n');
return new RawValue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return new RawValue(
return new RawValue<string>(

this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n')
);
},
description: 'Returns a sensible stack trace.',
},
],
[
'__enhanced_stack_trace',
{
handler: (): EnhancedStackTrace => {
handler: (): RawValue => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
handler: (): RawValue => {
handler: (): RawValue<EnhancedStackTrace> => {

const { sourceMap } = this;
const sdk: StackTraceSDKInfo = { name: 'typescript', version: pkg.version };
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
Expand All @@ -293,15 +295,15 @@ export class Activator implements ActivationHandler {
}
}
}
return { sdk, stacks, sources };
return new RawValue({ sdk, stacks, sources });
},
description: 'Returns a stack trace annotated with source information.',
},
],
[
'__temporal_workflow_metadata',
{
handler: (): temporal.api.sdk.v1.IWorkflowMetadata => {
handler: (): RawValue => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
handler: (): RawValue => {
handler: (): RawValue<temporal.api.sdk.v1.IWorkflowMetadata> => {

const workflowType = this.info.workflowType;
const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({
name,
Expand All @@ -315,14 +317,14 @@ export class Activator implements ActivationHandler {
name,
description: value.description,
}));
return {
return new RawValue({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sad we're loosing type safety check here… There's nothing ensuring that the arg to RawValue complies with the expected type definition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my suggestions elsewhere that adds back type safety.

definition: {
type: workflowType,
queryDefinitions,
signalDefinitions,
updateDefinitions,
},
};
});
},
description: 'Returns metadata associated with this workflow.',
},
Expand Down
1 change: 1 addition & 0 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
WorkflowReturnType,
WorkflowUpdateValidatorType,
SearchAttributeUpdatePair,
RawValue,

Check warning on line 25 in packages/workflow/src/workflow.ts

View workflow job for this annotation

GitHub Actions / Lint and Prune / Lint and Prune

'RawValue' is defined but never used. Allowed unused vars must match /^_/u
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import

compilePriority,
WorkflowDefinitionOptionsOrGetter,
} from '@temporalio/common';
Expand Down
Loading