diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index 96b38e195..b58644701 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -90,14 +90,14 @@ export { * * @example * - *```ts + * ```ts *import { CompleteAsyncError } from '@temporalio/activity'; * *export async function myActivity(): Promise { * // ... * throw new CompleteAsyncError(); *} - *``` + * ``` */ @SymbolBasedInstanceOfError('CompleteAsyncError') export class CompleteAsyncError extends Error {} diff --git a/packages/client/src/helpers.ts b/packages/client/src/helpers.ts index 3bd50888e..403073241 100644 --- a/packages/client/src/helpers.ts +++ b/packages/client/src/helpers.ts @@ -1,11 +1,10 @@ import { ServiceError as GrpcServiceError, status as grpcStatus } from '@grpc/grpc-js'; +import { LoadedDataConverter, NamespaceNotFoundError } from '@temporalio/common'; import { - LoadedDataConverter, - mapFromPayloads, - NamespaceNotFoundError, + decodeSearchAttributes, + decodeTypedSearchAttributes, searchAttributePayloadConverter, - SearchAttributes, -} from '@temporalio/common'; +} from '@temporalio/common/lib/converter/payload-search-attributes'; import { Replace } from '@temporalio/common/lib/type-helpers'; import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time'; import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers'; @@ -71,11 +70,8 @@ export async function executionInfoFromRaw( executionTime: optionalTsToDate(raw.executionTime), closeTime: optionalTsToDate(raw.closeTime), memo: await decodeMapFromPayloads(dataConverter, raw.memo?.fields), - searchAttributes: Object.fromEntries( - Object.entries( - mapFromPayloads(searchAttributePayloadConverter, raw.searchAttributes?.indexedFields ?? {}) as SearchAttributes - ).filter(([_, v]) => v && v.length > 0) // Filter out empty arrays returned by pre 1.18 servers - ), + searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), + typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), parentExecution: raw.parentExecution ? { workflowId: raw.parentExecution.workflowId!, diff --git a/packages/client/src/schedule-client.ts b/packages/client/src/schedule-client.ts index 5f05e9ae3..6ac7ed0f5 100644 --- a/packages/client/src/schedule-client.ts +++ b/packages/client/src/schedule-client.ts @@ -1,6 +1,11 @@ import { status as grpcStatus } from '@grpc/grpc-js'; import { v4 as uuid4 } from 'uuid'; -import { mapToPayloads, searchAttributePayloadConverter, Workflow } from '@temporalio/common'; +import { Workflow } from '@temporalio/common'; +import { + decodeSearchAttributes, + decodeTypedSearchAttributes, + encodeUnifiedSearchAttributes, +} from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors, Headers } from '@temporalio/common/lib/interceptors'; import { encodeMapToPayloads, @@ -39,7 +44,6 @@ import { decodeScheduleRecentActions, decodeScheduleRunningActions, decodeScheduleSpec, - decodeSearchAttributes, encodeScheduleAction, encodeSchedulePolicies, encodeScheduleSpec, @@ -238,11 +242,12 @@ export class ScheduleClient extends BaseClient { state: encodeScheduleState(opts.state), }, memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, - searchAttributes: opts.searchAttributes - ? { - indexedFields: mapToPayloads(searchAttributePayloadConverter, opts.searchAttributes), - } - : undefined, + searchAttributes: + opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? { + indexedFields: encodeUnifiedSearchAttributes(opts.searchAttributes, opts.typedSearchAttributes), // eslint-disable-line deprecation/deprecation + } + : undefined, initialPatch: { triggerImmediately: opts.state?.triggerImmediately ? { overlapPolicy: temporal.api.enums.v1.ScheduleOverlapPolicy.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL } @@ -388,7 +393,8 @@ export class ScheduleClient extends BaseClient { workflowType: raw.info.workflowType.name, }, memo: await decodeMapFromPayloads(this.dataConverter, raw.memo?.fields), - searchAttributes: decodeSearchAttributes(raw.searchAttributes), + searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), + typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), state: { paused: raw.info?.paused === true, note: raw.info?.notes ?? undefined, @@ -425,7 +431,8 @@ export class ScheduleClient extends BaseClient { spec: decodeScheduleSpec(raw.schedule.spec), action: await decodeScheduleAction(this.client.dataConverter, raw.schedule.action), memo: await decodeMapFromPayloads(this.client.dataConverter, raw.memo?.fields), - searchAttributes: decodeSearchAttributes(raw.searchAttributes), + searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), + typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), policies: { // 'overlap' should never be missing on describe, as the server will replace UNSPECIFIED by an actual value overlap: decodeScheduleOverlapPolicy(raw.schedule.policies?.overlapPolicy) ?? ScheduleOverlapPolicy.SKIP, diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 768008e48..e403b7cce 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -1,14 +1,10 @@ import Long from 'long'; // eslint-disable-line import/no-named-as-default +import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common'; import { - compileRetryPolicy, - decompileRetryPolicy, - extractWorkflowType, - LoadedDataConverter, - mapFromPayloads, - mapToPayloads, - searchAttributePayloadConverter, - SearchAttributes, -} from '@temporalio/common'; + encodeUnifiedSearchAttributes, + decodeSearchAttributes, + decodeTypedSearchAttributes, +} from '@temporalio/common/lib/converter/payload-search-attributes'; import { Headers } from '@temporalio/common/lib/interceptors'; import { decodeArrayFromPayloads, @@ -260,11 +256,12 @@ export async function encodeScheduleAction( workflowTaskTimeout: msOptionalToTs(action.workflowTaskTimeout), retryPolicy: action.retry ? compileRetryPolicy(action.retry) : undefined, memo: action.memo ? { fields: await encodeMapToPayloads(dataConverter, action.memo) } : undefined, - searchAttributes: action.searchAttributes - ? { - indexedFields: mapToPayloads(searchAttributePayloadConverter, action.searchAttributes), - } - : undefined, + searchAttributes: + action.searchAttributes || action.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? { + indexedFields: encodeUnifiedSearchAttributes(action.searchAttributes, action.typedSearchAttributes), // eslint-disable-line deprecation/deprecation + } + : undefined, header: { fields: headers }, }, }; @@ -326,14 +323,8 @@ export async function decodeScheduleAction( args: await decodeArrayFromPayloads(dataConverter, pb.startWorkflow.input?.payloads), memo: await decodeMapFromPayloads(dataConverter, pb.startWorkflow.memo?.fields), retry: decompileRetryPolicy(pb.startWorkflow.retryPolicy), - searchAttributes: Object.fromEntries( - Object.entries( - mapFromPayloads( - searchAttributePayloadConverter, - pb.startWorkflow.searchAttributes?.indexedFields ?? {} - ) as SearchAttributes - ) - ), + searchAttributes: decodeSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), + typedSearchAttributes: decodeTypedSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), @@ -342,17 +333,6 @@ export async function decodeScheduleAction( throw new TypeError('Unsupported schedule action'); } -export function decodeSearchAttributes( - pb: temporal.api.common.v1.ISearchAttributes | undefined | null -): SearchAttributes { - if (!pb?.indexedFields) return {}; - return Object.fromEntries( - Object.entries(mapFromPayloads(searchAttributePayloadConverter, pb.indexedFields) as SearchAttributes).filter( - ([_, v]) => v && v.length > 0 - ) // Filter out empty arrays returned by pre 1.18 servers - ); -} - export function decodeScheduleRunningActions( pb?: temporal.api.common.v1.IWorkflowExecution[] | null ): ScheduleExecutionStartWorkflowActionResult[] { diff --git a/packages/client/src/schedule-types.ts b/packages/client/src/schedule-types.ts index 31991fabc..9630e2f50 100644 --- a/packages/client/src/schedule-types.ts +++ b/packages/client/src/schedule-types.ts @@ -1,5 +1,5 @@ import { checkExtends, Replace } from '@temporalio/common/lib/type-helpers'; -import { Duration, SearchAttributes, Workflow } from '@temporalio/common'; +import { Duration, SearchAttributes, Workflow, TypedSearchAttributes, SearchAttributePair } from '@temporalio/common'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import type { temporal } from '@temporalio/proto'; import { WorkflowStartOptions } from './workflow-options'; @@ -70,8 +70,21 @@ export interface ScheduleOptions = Replace< - Omit, + Omit, { action: A; state: Omit; @@ -172,12 +185,22 @@ export interface ScheduleSummary { memo?: Record; /** - * Additional indexed information attached to the Schedule. - * More info: https://docs.temporal.io/docs/typescript/search-attributes + * Additional indexed information attached to the Schedule. More info: + * https://docs.temporal.io/docs/typescript/search-attributes * * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. + * + * @deprecated Use {@link typedSearchAttributes} instead. */ - searchAttributes?: SearchAttributes; + searchAttributes?: SearchAttributes; // eslint-disable-line deprecation/deprecation + + /** + * Additional indexed information attached to the Schedule. More info: + * https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. + */ + typedSearchAttributes?: TypedSearchAttributes; state: { /** @@ -284,12 +307,22 @@ export type ScheduleDescription = { memo?: Record; /** - * Additional indexed information attached to the Schedule. - * More info: https://docs.temporal.io/docs/typescript/search-attributes + * Additional indexed information attached to the Schedule. More info: + * https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. + * + * @deprecated Use {@link typedSearchAttributes} instead. + */ + searchAttributes: SearchAttributes; // eslint-disable-line deprecation/deprecation + + /** + * Additional indexed information attached to the Schedule. More info: + * https://docs.temporal.io/docs/typescript/search-attributes * * Values are always converted using {@link JsonPayloadConverter}, even when a custom Data Converter is provided. */ - searchAttributes: SearchAttributes; + typedSearchAttributes: TypedSearchAttributes; state: { /** @@ -745,6 +778,7 @@ export type ScheduleOptionsStartWorkflowAction = { | 'args' | 'memo' | 'searchAttributes' + | 'typedSearchAttributes' | 'retry' | 'workflowExecutionTimeout' | 'workflowRunTimeout' @@ -776,6 +810,7 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo | 'args' | 'memo' | 'searchAttributes' + | 'typedSearchAttributes' | 'retry' | 'workflowExecutionTimeout' | 'workflowRunTimeout' diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 6dd29ce9d..7289f0e05 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -1,5 +1,5 @@ import type * as grpc from '@grpc/grpc-js'; -import type { SearchAttributes, SearchAttributeValue } from '@temporalio/common'; +import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue } from '@temporalio/common'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import * as proto from '@temporalio/proto'; import { Replace } from '@temporalio/common/lib/type-helpers'; @@ -47,7 +47,9 @@ export interface WorkflowExecutionInfo { executionTime?: Date; closeTime?: Date; memo?: Record; - searchAttributes: SearchAttributes; + /** @deprecated Use {@link typedSearchAttributes} instead. */ + searchAttributes: SearchAttributes; // eslint-disable-line deprecation/deprecation + typedSearchAttributes: TypedSearchAttributes; parentExecution?: Required; raw: RawWorkflowExecutionInfo; } @@ -56,7 +58,7 @@ export interface CountWorkflowExecution { count: number; groups: { count: number; - groupValues: SearchAttributeValue[]; + groupValues: SearchAttributeValue[]; // eslint-disable-line deprecation/deprecation }[]; } diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 321a5da40..15b75ce3e 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -4,11 +4,9 @@ import { BaseWorkflowHandle, CancelledFailure, compileRetryPolicy, - mapToPayloads, HistoryAndWorkflowId, QueryDefinition, RetryState, - searchAttributePayloadConverter, SignalDefinition, UpdateDefinition, TerminatedFailure, @@ -25,6 +23,7 @@ import { encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, } from '@temporalio/common'; +import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { History } from '@temporalio/common/lib/proto-utils'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; @@ -1218,11 +1217,12 @@ export class WorkflowClient extends BaseClient { workflowStartDelay: options.startDelay, retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined, - searchAttributes: options.searchAttributes - ? { - indexedFields: mapToPayloads(searchAttributePayloadConverter, options.searchAttributes), - } - : undefined, + searchAttributes: + options.searchAttributes || options.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? { + indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes), // eslint-disable-line deprecation/deprecation + } + : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, }; @@ -1265,6 +1265,7 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; + return { namespace, identity, @@ -1284,11 +1285,12 @@ export class WorkflowClient extends BaseClient { workflowStartDelay: opts.startDelay, retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined, memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, - searchAttributes: opts.searchAttributes - ? { - indexedFields: mapToPayloads(searchAttributePayloadConverter, opts.searchAttributes), - } - : undefined, + searchAttributes: + opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? { + indexedFields: encodeUnifiedSearchAttributes(opts.searchAttributes, opts.typedSearchAttributes), // eslint-disable-line deprecation/deprecation + } + : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, }; diff --git a/packages/common/src/converter/payload-converter.ts b/packages/common/src/converter/payload-converter.ts index 83002678f..662eb1a99 100644 --- a/packages/common/src/converter/payload-converter.ts +++ b/packages/common/src/converter/payload-converter.ts @@ -49,7 +49,10 @@ export function toPayloads(converter: PayloadConverter, ...values: unknown[]): P * * @throws {@link ValueError} if conversion of any value in the map fails */ -export function mapToPayloads(converter: PayloadConverter, map: Record): Record { +export function mapToPayloads( + converter: PayloadConverter, + map: Record +): Record { return Object.fromEntries( Object.entries(map).map(([k, v]): [K, Payload] => [k as K, converter.toPayload(v)]) ) as Record; @@ -84,17 +87,17 @@ export function arrayFromPayloads(converter: PayloadConverter, payloads?: Payloa return payloads.map((payload: Payload) => converter.fromPayload(payload)); } -export function mapFromPayloads( +export function mapFromPayloads( converter: PayloadConverter, map?: Record | null | undefined -): Record | undefined { +): Record | undefined { if (map == null) return undefined; return Object.fromEntries( Object.entries(map).map(([k, payload]): [K, unknown] => { const value = converter.fromPayload(payload as Payload); return [k as K, value]; }) - ) as Record; + ) as Record; } export interface PayloadConverterWithEncoding { @@ -252,73 +255,6 @@ export class JsonPayloadConverter implements PayloadConverterWithEncoding { } } -/** - * Converts Search Attribute values using JsonPayloadConverter - */ -export class SearchAttributePayloadConverter implements PayloadConverter { - jsonConverter = new JsonPayloadConverter(); - validNonDateTypes = ['string', 'number', 'boolean']; - - public toPayload(values: unknown): Payload { - if (!Array.isArray(values)) { - throw new ValueError(`SearchAttribute value must be an array`); - } - - if (values.length > 0) { - const firstValue = values[0]; - const firstType = typeof firstValue; - if (firstType === 'object') { - for (const [idx, value] of values.entries()) { - if (!(value instanceof Date)) { - throw new ValueError( - `SearchAttribute values must arrays of strings, numbers, booleans, or Dates. The value ${value} at index ${idx} is of type ${typeof value}` - ); - } - } - } else { - if (!this.validNonDateTypes.includes(firstType)) { - throw new ValueError(`SearchAttribute array values must be: string | number | boolean | Date`); - } - - for (const [idx, value] of values.entries()) { - if (typeof value !== firstType) { - throw new ValueError( - `All SearchAttribute array values must be of the same type. The first value ${firstValue} of type ${firstType} doesn't match value ${value} of type ${typeof value} at index ${idx}` - ); - } - } - } - } - - // JSON.stringify takes care of converting Dates to ISO strings - const ret = this.jsonConverter.toPayload(values); - if (ret === undefined) { - throw new ValueError('Could not convert search attributes to payloads'); - } - return ret; - } - - /** - * Datetime Search Attribute values are converted to `Date`s - */ - public fromPayload(payload: Payload): T { - if (payload.metadata === undefined || payload.metadata === null) { - throw new ValueError('Missing payload metadata'); - } - - const value = this.jsonConverter.fromPayload(payload); - let arrayWrappedValue = Array.isArray(value) ? value : [value]; - - const searchAttributeType = decode(payload.metadata.type); - if (searchAttributeType === 'Datetime') { - arrayWrappedValue = arrayWrappedValue.map((dateString) => new Date(dateString)); - } - return arrayWrappedValue as unknown as T; - } -} - -export const searchAttributePayloadConverter = new SearchAttributePayloadConverter(); - export class DefaultPayloadConverter extends CompositePayloadConverter { // Match the order used in other SDKs, but exclude Protobuf converters so that the code, including // `proto3-json-serializer`, doesn't take space in Workflow bundles that don't use Protobufs. To use Protobufs, use diff --git a/packages/common/src/converter/payload-search-attributes.ts b/packages/common/src/converter/payload-search-attributes.ts new file mode 100644 index 000000000..0e54f53bd --- /dev/null +++ b/packages/common/src/converter/payload-search-attributes.ts @@ -0,0 +1,220 @@ +import { decode, encode } from '../encoding'; +import { ValueError } from '../errors'; +import { Payload } from '../interfaces'; +import { + TypedSearchAttributes, + SearchAttributeType, + SearchAttributes, + isValidValueForType, + TypedSearchAttributeValue, + SearchAttributePair, + SearchAttributeUpdatePair, + TypedSearchAttributeUpdateValue, +} from '../search-attributes'; +import { PayloadConverter, JsonPayloadConverter, mapFromPayloads, mapToPayloads } from './payload-converter'; + +/** + * Converts Search Attribute values using JsonPayloadConverter + */ +export class SearchAttributePayloadConverter implements PayloadConverter { + jsonConverter = new JsonPayloadConverter(); + validNonDateTypes = ['string', 'number', 'boolean']; + + public toPayload(values: unknown): Payload { + if (!Array.isArray(values)) { + throw new ValueError(`SearchAttribute value must be an array`); + } + + if (values.length > 0) { + const firstValue = values[0]; + const firstType = typeof firstValue; + if (firstType === 'object') { + for (const [idx, value] of values.entries()) { + if (!(value instanceof Date)) { + throw new ValueError( + `SearchAttribute values must arrays of strings, numbers, booleans, or Dates. The value ${value} at index ${idx} is of type ${typeof value}` + ); + } + } + } else { + if (!this.validNonDateTypes.includes(firstType)) { + throw new ValueError(`SearchAttribute array values must be: string | number | boolean | Date`); + } + + for (const [idx, value] of values.entries()) { + if (typeof value !== firstType) { + throw new ValueError( + `All SearchAttribute array values must be of the same type. The first value ${firstValue} of type ${firstType} doesn't match value ${value} of type ${typeof value} at index ${idx}` + ); + } + } + } + } + + // JSON.stringify takes care of converting Dates to ISO strings + const ret = this.jsonConverter.toPayload(values); + if (ret === undefined) { + throw new ValueError('Could not convert search attributes to payloads'); + } + return ret; + } + + /** + * Datetime Search Attribute values are converted to `Date`s + */ + public fromPayload(payload: Payload): T { + if (payload.metadata == null) { + throw new ValueError('Missing payload metadata'); + } + + const value = this.jsonConverter.fromPayload(payload); + let arrayWrappedValue = Array.isArray(value) ? value : [value]; + const searchAttributeType = decode(payload.metadata.type); + if (searchAttributeType === 'Datetime') { + arrayWrappedValue = arrayWrappedValue.map((dateString) => new Date(dateString)); + } + return arrayWrappedValue as unknown as T; + } +} + +export const searchAttributePayloadConverter = new SearchAttributePayloadConverter(); + +export class TypedSearchAttributePayloadConverter implements PayloadConverter { + jsonConverter = new JsonPayloadConverter(); + + public toPayload(attr: T): Payload { + if (!(attr instanceof TypedSearchAttributeValue || attr instanceof TypedSearchAttributeUpdateValue)) { + throw new ValueError( + `Expect input to be instance of TypedSearchAttributeValue or TypedSearchAttributeUpdateValue, got: ${JSON.stringify( + attr + )}` + ); + } + + // We check for deletion as well as regular typed search attributes. + if (attr.value !== null && !isValidValueForType(attr.type, attr.value)) { + throw new ValueError(`Invalid search attribute value ${attr.value} for given type ${attr.type}`); + } + + // For server search attributes to work properly, we cannot set the metadata + // type when we set null + if (attr.value === null) { + const payload = this.jsonConverter.toPayload(attr.value); + if (payload === undefined) { + throw new ValueError('Could not convert typed search attribute to payload'); + } + return payload; + } + + // JSON.stringify takes care of converting Dates to ISO strings + const payload = this.jsonConverter.toPayload(attr.value); + if (payload === undefined) { + throw new ValueError('Could not convert typed search attribute to payload'); + } + + // Note: this shouldn't be the case but the compiler complains without this check. + if (payload.metadata == null) { + throw new ValueError('Missing payload metadata'); + } + // Add encoded type of search attribute to metatdata + payload.metadata['type'] = encode(TypedSearchAttributes.toMetadataType(attr.type)); + return payload; + } + + // Note: type casting undefined values is not clear to caller. + // We can't change the typing of the method to return undefined, it's not allowed by the interface. + public fromPayload(payload: Payload): T { + if (payload.metadata == null) { + throw new ValueError('Missing payload metadata'); + } + + // If no 'type' metadata field or no given value, we skip. + if (payload.metadata.type == null) { + return undefined as T; + } + const type = TypedSearchAttributes.toSearchAttributeType(decode(payload.metadata.type)); + // Unrecognized metadata type (sanity check). + if (type === undefined) { + return undefined as T; + } + + let value = this.jsonConverter.fromPayload(payload); + + // Handle legacy values without KEYWORD_LIST type. + if (type !== SearchAttributeType.KEYWORD_LIST && Array.isArray(value)) { + // Cannot have an array with multiple values for non-KEYWORD_LIST type. + if (value.length > 1) { + return undefined as T; + } + // Unpack single value array. + value = value[0]; + } + if (type === SearchAttributeType.DATETIME && value) { + value = new Date(value as string); + } + // Check if the value is a valid for the given type. If not, skip. + if (!isValidValueForType(type, value)) { + return undefined as T; + } + return new TypedSearchAttributeValue(type, value) as T; + } +} + +export const typedSearchAttributePayloadConverter = new TypedSearchAttributePayloadConverter(); + +// If both params are provided, conflicting keys will be overwritten by typedSearchAttributes. +export function encodeUnifiedSearchAttributes( + searchAttributes?: SearchAttributes, // eslint-disable-line deprecation/deprecation + typedSearchAttributes?: TypedSearchAttributes | SearchAttributeUpdatePair[] +): Record { + return { + ...(searchAttributes ? mapToPayloads(searchAttributePayloadConverter, searchAttributes) : {}), + ...(typedSearchAttributes + ? mapToPayloads>( + typedSearchAttributePayloadConverter, + Object.fromEntries( + (Array.isArray(typedSearchAttributes) ? typedSearchAttributes : typedSearchAttributes.getAll()).map( + (pair) => { + return [pair.key.name, new TypedSearchAttributeUpdateValue(pair.key.type, pair.value)]; + } + ) + ) + ) + : {}), + }; +} + +// eslint-disable-next-line deprecation/deprecation +export function decodeSearchAttributes(indexedFields: Record | undefined | null): SearchAttributes { + if (!indexedFields) return {}; + return Object.fromEntries( + // eslint-disable-next-line deprecation/deprecation + Object.entries(mapFromPayloads(searchAttributePayloadConverter, indexedFields) as SearchAttributes).filter( + ([_, v]) => v && v.length > 0 + ) // Filter out empty arrays returned by pre 1.18 servers + ); +} + +export function decodeTypedSearchAttributes( + indexedFields: Record | undefined | null +): TypedSearchAttributes { + return new TypedSearchAttributes( + Object.entries( + mapFromPayloads | undefined>( + typedSearchAttributePayloadConverter, + indexedFields + ) ?? {} + ).reduce((acc, [k, attr]) => { + // Filter out undefined values from converter. + if (!attr) { + return acc; + } + const key = { name: k, type: attr.type }; + // Ensure is valid pair. + if (isValidValueForType(key.type, attr.value)) { + acc.push({ key, value: attr.value } as SearchAttributePair); + } + return acc; + }, []) + ); +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index b8fb2f2d8..f4d44abba 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -24,6 +24,15 @@ export type { Timestamp, Duration, StringValue } from './time'; export * from './workflow-handle'; export * from './workflow-options'; export * from './versioning-intent'; +export { + SearchAttributes, // eslint-disable-line deprecation/deprecation + SearchAttributeValue, // eslint-disable-line deprecation/deprecation + SearchAttributeType, + SearchAttributePair, + SearchAttributeUpdatePair, + TypedSearchAttributes, + defineSearchAttributeKey, +} from './search-attributes'; /** * Encode a UTF-8 string into a Uint8Array diff --git a/packages/common/src/interfaces.ts b/packages/common/src/interfaces.ts index f0dc4a227..34becc49b 100644 --- a/packages/common/src/interfaces.ts +++ b/packages/common/src/interfaces.ts @@ -93,14 +93,6 @@ export interface QueryDefinition = ReturnType extends Promise ? R : never; -/** - * If another SDK creates a Search Attribute that's not an array, we wrap it in an array. - * - * Dates are serialized as ISO strings. - */ -export type SearchAttributes = Record | undefined>; -export type SearchAttributeValue = string[] | number[] | boolean[] | Date[]; - export interface ActivityFunction

{ (...args: P): Promise; } diff --git a/packages/common/src/search-attributes.ts b/packages/common/src/search-attributes.ts new file mode 100644 index 000000000..5f0355cc1 --- /dev/null +++ b/packages/common/src/search-attributes.ts @@ -0,0 +1,292 @@ +import type { temporal } from '@temporalio/proto'; +import { makeProtoEnumConverters } from './internal-workflow'; + +/** @deprecated: Use {@link TypedSearchAttributes} instead */ +export type SearchAttributeValueOrReadonly = SearchAttributeValue | Readonly | undefined; // eslint-disable-line deprecation/deprecation +/** @deprecated: Use {@link TypedSearchAttributes} instead */ +export type SearchAttributes = Record; // eslint-disable-line deprecation/deprecation +/** @deprecated: Use {@link TypedSearchAttributes} instead */ +export type SearchAttributeValue = string[] | number[] | boolean[] | Date[]; // eslint-disable-line deprecation/deprecation + +export const SearchAttributeType = { + TEXT: 'TEXT', + KEYWORD: 'KEYWORD', + INT: 'INT', + DOUBLE: 'DOUBLE', + BOOL: 'BOOL', + DATETIME: 'DATETIME', + KEYWORD_LIST: 'KEYWORD_LIST', +} as const; + +export type SearchAttributeType = (typeof SearchAttributeType)[keyof typeof SearchAttributeType]; + +// Note: encodeSearchAttributeIndexedValueType exported for use in tests to register search attributes +// ts-prune-ignore-next +export const [encodeSearchAttributeIndexedValueType, _] = makeProtoEnumConverters< + temporal.api.enums.v1.IndexedValueType, + typeof temporal.api.enums.v1.IndexedValueType, + keyof typeof temporal.api.enums.v1.IndexedValueType, + typeof SearchAttributeType, + 'INDEXED_VALUE_TYPE_' +>( + { + [SearchAttributeType.TEXT]: 1, + [SearchAttributeType.KEYWORD]: 2, + [SearchAttributeType.INT]: 3, + [SearchAttributeType.DOUBLE]: 4, + [SearchAttributeType.BOOL]: 5, + [SearchAttributeType.DATETIME]: 6, + [SearchAttributeType.KEYWORD_LIST]: 7, + UNSPECIFIED: 0, + } as const, + 'INDEXED_VALUE_TYPE_' +); + +interface IndexedValueTypeMapping { + TEXT: string; + KEYWORD: string; + INT: number; + DOUBLE: number; + BOOL: boolean; + DATETIME: Date; + KEYWORD_LIST: string[]; +} + +export function isValidValueForType( + type: T, + value: unknown +): value is IndexedValueTypeMapping[T] { + switch (type) { + case SearchAttributeType.TEXT: + case SearchAttributeType.KEYWORD: + return typeof value === 'string'; + case SearchAttributeType.INT: + return Number.isInteger(value); + case SearchAttributeType.DOUBLE: + return typeof value === 'number'; + case SearchAttributeType.BOOL: + return typeof value === 'boolean'; + case SearchAttributeType.DATETIME: + return value instanceof Date; + case SearchAttributeType.KEYWORD_LIST: + return Array.isArray(value) && value.every((item) => typeof item === 'string'); + default: + return false; + } +} + +export interface SearchAttributeKey { + name: string; + type: T; +} + +export function defineSearchAttributeKey(name: string, type: T): SearchAttributeKey { + return { name, type }; +} + +class BaseSearchAttributeValue { + private readonly _type: T; + private readonly _value: V; + + constructor(type: T, value: V) { + this._type = type; + this._value = value; + } + + get type(): T { + return this._type; + } + + get value(): V { + return this._value; + } +} + +// Internal type for class private data. +// Exported for use in payload conversion. +export class TypedSearchAttributeValue extends BaseSearchAttributeValue {} +// ts-prune-ignore-next +export class TypedSearchAttributeUpdateValue extends BaseSearchAttributeValue< + T, + IndexedValueTypeMapping[T] | null +> {} + +export type SearchAttributePair = { + [T in SearchAttributeType]: { key: SearchAttributeKey; value: IndexedValueTypeMapping[T] }; +}[SearchAttributeType]; + +export type SearchAttributeUpdatePair = { + [T in SearchAttributeType]: { key: SearchAttributeKey; value: IndexedValueTypeMapping[T] | null }; +}[SearchAttributeType]; + +export class TypedSearchAttributes { + private searchAttributes: Record> = {}; + + constructor(initialAttributes?: SearchAttributePair[]) { + if (initialAttributes === undefined) return; + for (const pair of initialAttributes) { + if (pair.key.name in this.searchAttributes) { + throw new Error(`Duplicate search attribute key: ${pair.key.name}`); + } + this.searchAttributes[pair.key.name] = new TypedSearchAttributeValue(pair.key.type, pair.value); + } + } + + get(key: SearchAttributeKey): IndexedValueTypeMapping[T] | undefined { + const attr = this.searchAttributes[key.name]; + // Key not found or type mismatch. + if (attr === undefined || !isValidValueForType(key.type, attr.value)) { + return undefined; + } + return attr.value; + } + + /** Returns a deep copy of the given TypedSearchAttributes instance */ + copy(): TypedSearchAttributes { + const state: Record> = {}; + + for (const [key, attr] of Object.entries(this.searchAttributes)) { + // Create a new instance with the same properties + let value = attr.value; + // For non-primitive types, create a deep copy + if (attr.value instanceof Date) { + value = new Date(attr.value); + } else if (Array.isArray(attr.value)) { + value = [...attr.value]; + } + state[key] = new TypedSearchAttributeValue(attr.type, value); + } + + // Create return value with manually assigned state. + const res = new TypedSearchAttributes(); + res.searchAttributes = state; + return res; + } + + /** + * @hidden + * Return JSON representation of this class as SearchAttributePair[] + * Default toJSON method is not used because it's JSON representation includes private state. + */ + toJSON(): SearchAttributePair[] { + return this.getAll(); + } + + /** Returns a copy of the current TypedSearchAttributes instance with the updated attributes. */ + updateCopy(updates: SearchAttributeUpdatePair[]): TypedSearchAttributes { + // Create a deep copy of the current instance. + const res = this.copy(); + // Apply updates. + res.update(updates); + return res; + } + + // Performs direct mutation on the current instance. + private update(updates: SearchAttributeUpdatePair[]) { + // Apply updates. + for (const pair of updates) { + // Delete attribute. + if (pair.value === null) { + // Delete only if the update matches a key and type. + const attrVal = this.searchAttributes[pair.key.name]; + if (attrVal && attrVal.type === pair.key.type) { + delete this.searchAttributes[pair.key.name]; + } + continue; + } + // Add or update attribute. + this.searchAttributes[pair.key.name] = new TypedSearchAttributeValue(pair.key.type, pair.value); + } + } + + getAll(): SearchAttributePair[] { + const res: SearchAttributePair[] = []; + for (const [key, attr] of Object.entries(this.searchAttributes)) { + const attrKey = { name: key, type: attr.type }; + // Sanity check, should always be legal. + if (isValidValueForType(attrKey.type, attr.value)) { + res.push({ key: attrKey, value: attr.value } as SearchAttributePair); + } + } + return res; + } + + static getKeyFromUntyped( + key: string, + value: SearchAttributeValueOrReadonly // eslint-disable-line deprecation/deprecation + ): SearchAttributeKey | undefined { + if (value == null) { + return; + } + + // Unpack single-element arrays. + const val = value.length === 1 ? value[0] : value; + switch (typeof val) { + case 'string': + // Check if val is an ISO string, if so, return a DATETIME key. + if (!isNaN(Date.parse(val)) && Date.parse(val) === new Date(val).getTime()) { + return { name: key, type: SearchAttributeType.DATETIME }; + } + return { name: key, type: SearchAttributeType.TEXT }; + case 'number': + return { + name: key, + type: Number.isInteger(val) ? SearchAttributeType.INT : SearchAttributeType.DOUBLE, + }; + case 'boolean': + return { name: key, type: SearchAttributeType.BOOL }; + case 'object': + if (val instanceof Date) { + return { name: key, type: SearchAttributeType.DATETIME }; + } + if (Array.isArray(val) && val.every((item) => typeof item === 'string')) { + return { name: key, type: SearchAttributeType.KEYWORD_LIST }; + } + return; + default: + return; + } + } + + static toMetadataType(type: SearchAttributeType): string { + switch (type) { + case SearchAttributeType.TEXT: + return 'Text'; + case SearchAttributeType.KEYWORD: + return 'Keyword'; + case SearchAttributeType.INT: + return 'Int'; + case SearchAttributeType.DOUBLE: + return 'Double'; + case SearchAttributeType.BOOL: + return 'Bool'; + case SearchAttributeType.DATETIME: + return 'Datetime'; + case SearchAttributeType.KEYWORD_LIST: + return 'KeywordList'; + default: + throw new Error(`Unknown search attribute type: ${type}`); + } + } + + static toSearchAttributeType(type: string): SearchAttributeType | undefined { + switch (type) { + case 'Text': + return SearchAttributeType.TEXT; + case 'Keyword': + return SearchAttributeType.KEYWORD; + case 'Int': + return SearchAttributeType.INT; + case 'Double': + return SearchAttributeType.DOUBLE; + case 'Bool': + return SearchAttributeType.BOOL; + case 'Datetime': + return SearchAttributeType.DATETIME; + case 'KeywordList': + return SearchAttributeType.KEYWORD_LIST; + default: + return; + } + } +} diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 8bbcef324..6206a9861 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -1,8 +1,9 @@ import type { temporal } from '@temporalio/proto'; -import { SearchAttributes, Workflow } from './interfaces'; +import { Workflow } from './interfaces'; import { RetryPolicy } from './retry-policy'; import { Duration } from './time'; import { makeProtoEnumConverters } from './internal-workflow'; +import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes'; /** * Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow. @@ -173,8 +174,22 @@ export interface BaseWorkflowOptions { * https://docs.temporal.io/docs/typescript/search-attributes * * Values are always converted using {@link JsonPayloadConverter}, even when a custom data converter is provided. + * + * @deprecated Use {@link typedSearchAttributes} instead. + */ + searchAttributes?: SearchAttributes; // eslint-disable-line deprecation/deprecation + + /** + * Specifies additional indexed information to attach to the Workflow Execution. More info: + * https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom data converter is provided. + * Note that search attributes are not encoded, as such, do not include any sensitive information. + * + * If both {@link searchAttributes} and {@link typedSearchAttributes} are provided, conflicting keys will be overwritten + * by {@link typedSearchAttributes}. */ - searchAttributes?: SearchAttributes; + typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; } export type WithWorkflowArgs = T & diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index 6b085a01f..97eed9524 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -1,24 +1,24 @@ import * as fs from 'fs/promises'; import * as net from 'net'; import path from 'path'; -import StackUtils from 'stack-utils'; -import ava, { TestFn } from 'ava'; import * as grpc from '@grpc/grpc-js'; import asyncRetry from 'async-retry'; +import ava, { TestFn } from 'ava'; +import StackUtils from 'stack-utils'; import { v4 as uuid4 } from 'uuid'; -import { inWorkflowContext, WorkflowInfo } from '@temporalio/workflow'; -import { Payload, PayloadCodec } from '@temporalio/common'; -import { Worker as RealWorker, WorkerOptions } from '@temporalio/worker'; -import * as worker from '@temporalio/worker'; import { Client, Connection } from '@temporalio/client'; +import { Payload, PayloadCodec } from '@temporalio/common'; +import { historyToJSON } from '@temporalio/common/lib/proto-utils'; import * as iface from '@temporalio/proto'; import { LocalTestWorkflowEnvironmentOptions, TestWorkflowEnvironment as RealTestWorkflowEnvironment, TimeSkippingTestWorkflowEnvironmentOptions, } from '@temporalio/testing'; +import * as worker from '@temporalio/worker'; +import { Worker as RealWorker, WorkerOptions } from '@temporalio/worker'; +import { inWorkflowContext, WorkflowInfo } from '@temporalio/workflow'; import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs'; -import { historyToJSON } from '@temporalio/common/lib/proto-utils'; export function u8(s: string): Uint8Array { // TextEncoder requires lib "dom" @@ -33,7 +33,7 @@ function isSet(env: string | undefined, def: boolean): boolean { return env === '1' || env === 't' || env === 'true'; } -export const RUN_INTEGRATION_TESTS = inWorkflowContext() || isSet(process.env.RUN_INTEGRATION_TESTS, false); +export const RUN_INTEGRATION_TESTS = inWorkflowContext() || isSet(process.env.RUN_INTEGRATION_TESTS, true); export const REUSE_V8_CONTEXT = inWorkflowContext() || isSet(process.env.REUSE_V8_CONTEXT, true); export const RUN_TIME_SKIPPING_TESTS = inWorkflowContext() || !(process.platform === 'linux' && process.arch === 'arm64'); diff --git a/packages/test/src/run-activation-perf-tests.ts b/packages/test/src/run-activation-perf-tests.ts index 9a7fc9a6c..d65608721 100644 --- a/packages/test/src/run-activation-perf-tests.ts +++ b/packages/test/src/run-activation-perf-tests.ts @@ -7,6 +7,7 @@ import { WorkflowCodeBundler } from '@temporalio/worker/lib/workflow/bundler'; import { parseWorkflowCode } from '@temporalio/worker/lib/worker'; import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/vm'; import * as wf from '@temporalio/workflow'; +import { TypedSearchAttributes } from '@temporalio/common'; // WARNING: This file is a quick and dirty utility to run Workflow Activation performance testing // localy. It is not part of our regular test suite and hasn't been reviewed. @@ -80,6 +81,7 @@ if (!wf.inWorkflowContext()) { taskTimeoutMs: 1000, taskQueue: 'test', searchAttributes: {}, + typedSearchAttributes: new TypedSearchAttributes(), historyLength: 3, historySize: 300, continueAsNewSuggested: false, diff --git a/packages/test/src/test-integration-split-one.ts b/packages/test/src/test-integration-split-one.ts index aac8569eb..1348870ed 100644 --- a/packages/test/src/test-integration-split-one.ts +++ b/packages/test/src/test-integration-split-one.ts @@ -610,19 +610,20 @@ test('WorkflowHandle.describe result is wrapped', configMacro, async (t, config) t.deepEqual(execution.type, 'argsAndReturn'); t.deepEqual(execution.memo, { note: 'foo' }); t.true(execution.startTime instanceof Date); - t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); - t.deepEqual(execution.searchAttributes!.CustomDatetimeField, [date]); - const binSum = execution.searchAttributes!.BinaryChecksums as string[]; + t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); // eslint-disable-line deprecation/deprecation + t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); // eslint-disable-line deprecation/deprecation + t.deepEqual(execution.searchAttributes!.CustomDatetimeField, [date]); // eslint-disable-line deprecation/deprecation + const binSum = execution.searchAttributes!.BinaryChecksums as string[]; // eslint-disable-line deprecation/deprecation if (binSum != null) { t.regex(binSum[0], /@temporalio\/worker@/); } else { - t.deepEqual(execution.searchAttributes!.BuildIds, ['unversioned', `unversioned:${worker.options.buildId}`]); + t.deepEqual(execution.searchAttributes!.BuildIds, ['unversioned', `unversioned:${worker.options.buildId}`]); // eslint-disable-line deprecation/deprecation } }); +// eslint-disable-next-line deprecation/deprecation export async function returnSearchAttributes(): Promise { - const sa = workflowInfo().searchAttributes!; // eslint-disable-line @typescript-eslint/no-non-null-assertion + const sa = workflowInfo().searchAttributes!; // eslint-disable-line @typescript-eslint/no-non-null-assertion, deprecation/deprecation const datetime = (sa.CustomDatetimeField as Array)[0]; return { ...sa, @@ -666,13 +667,12 @@ test('Workflow can upsert Search Attributes', configMacro, async (t, config) => const res = await worker.runUntil(handle.result()); t.deepEqual(res, { CustomBoolField: [true], - CustomIntField: [], // clear CustomKeywordField: ['durable code'], CustomTextField: ['is useful'], CustomDatetimeField: [date.toISOString()], CustomDoubleField: [3.14], }); - const { searchAttributes } = await handle.describe(); + const { searchAttributes } = await handle.describe(); // eslint-disable-line deprecation/deprecation const { BinaryChecksums, BuildIds, ...rest } = searchAttributes; t.deepEqual(rest, { CustomBoolField: [true], @@ -723,6 +723,8 @@ test('Workflow can read WorkflowInfo', configMacro, async (t, config) => { runId: handle.firstExecutionRunId, taskQueue, searchAttributes: {}, + // Typed search attributes gets serialized as an array. + typedSearchAttributes: [], workflowType: 'returnWorkflowInfo', workflowId: handle.workflowId, historyLength: 3, diff --git a/packages/test/src/test-integration-split-three.ts b/packages/test/src/test-integration-split-three.ts index 8a39892a0..1edf8387e 100644 --- a/packages/test/src/test-integration-split-three.ts +++ b/packages/test/src/test-integration-split-three.ts @@ -28,7 +28,6 @@ test('cancel-http-request', configMacro, async (t, config) => { t.pass(); }); -// TODO(thomas): fix if ('promiseHooks' in v8) { // Skip in old node versions test('Stack trace query returns stack that makes sense', configMacro, async (t, config) => { diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index 6f6413fda..80e5f1530 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -7,10 +7,10 @@ import { ApplicationFailure, defaultPayloadConverter, Payload, - searchAttributePayloadConverter, WorkflowExecutionAlreadyStartedError, WorkflowNotFoundError, } from '@temporalio/common'; +import { searchAttributePayloadConverter } from '@temporalio/common/lib/converter/payload-search-attributes'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; @@ -96,7 +96,7 @@ test('WorkflowOptions are passed correctly', configMacro, async (t, config) => { ), [3] ); - t.deepEqual(execution.searchAttributes!.CustomIntField, [3]); + t.deepEqual(execution.searchAttributes!.CustomIntField, [3]); // eslint-disable-line deprecation/deprecation t.is(execution.raw.executionConfig?.taskQueue?.name, 'diff-task-queue'); t.is( execution.raw.executionConfig?.taskQueue?.kind, @@ -224,8 +224,8 @@ test('continue-as-new-to-same-workflow keeps memo and search attributes', config const execution = await handle.describe(); t.not(execution.runId, handle.firstExecutionRunId); t.deepEqual(execution.memo, { note: 'foo' }); - t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); + t.deepEqual(execution.searchAttributes!.CustomKeywordField, ['test-value']); // eslint-disable-line deprecation/deprecation + t.deepEqual(execution.searchAttributes!.CustomIntField, [1]); // eslint-disable-line deprecation/deprecation }); }); @@ -253,8 +253,8 @@ test( t.is(info.type, 'sleeper'); t.not(info.runId, handle.firstExecutionRunId); t.deepEqual(info.memo, { note: 'foo' }); - t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value']); - t.deepEqual(info.searchAttributes!.CustomIntField, [1]); + t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value']); // eslint-disable-line deprecation/deprecation + t.deepEqual(info.searchAttributes!.CustomIntField, [1]); // eslint-disable-line deprecation/deprecation }); } ); @@ -291,8 +291,8 @@ test('continue-as-new-to-different-workflow can set memo and search attributes', t.is(info.type, 'sleeper'); t.not(info.runId, handle.firstExecutionRunId); t.deepEqual(info.memo, { note: 'bar' }); - t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value-2']); - t.deepEqual(info.searchAttributes!.CustomIntField, [3]); + t.deepEqual(info.searchAttributes!.CustomKeywordField, ['test-value-2']); // eslint-disable-line deprecation/deprecation + t.deepEqual(info.searchAttributes!.CustomIntField, [3]); // eslint-disable-line deprecation/deprecation }); }); diff --git a/packages/test/src/test-payload-converter.ts b/packages/test/src/test-payload-converter.ts index 4a8d1d28f..495b5f542 100644 --- a/packages/test/src/test-payload-converter.ts +++ b/packages/test/src/test-payload-converter.ts @@ -10,10 +10,10 @@ import { METADATA_ENCODING_KEY, METADATA_MESSAGE_TYPE_KEY, PayloadConverterError, - SearchAttributePayloadConverter, UndefinedPayloadConverter, ValueError, } from '@temporalio/common'; +import { SearchAttributePayloadConverter } from '@temporalio/common/lib/converter/payload-search-attributes'; import { encode } from '@temporalio/common/lib/encoding'; import { DefaultPayloadConverterWithProtobufs, diff --git a/packages/test/src/test-schedules.ts b/packages/test/src/test-schedules.ts index 01ec102dd..58cebf9c7 100644 --- a/packages/test/src/test-schedules.ts +++ b/packages/test/src/test-schedules.ts @@ -10,9 +10,9 @@ import { ScheduleHandle, ScheduleSummary, ScheduleUpdateOptions, - SearchAttributes, } from '@temporalio/client'; import { msToNumber } from '@temporalio/common/lib/time'; +import { SearchAttributes, SearchAttributeType, TypedSearchAttributes } from '@temporalio/common'; import { registerDefaultCustomSearchAttributes, RUN_INTEGRATION_TESTS } from './helpers'; export interface Context { @@ -168,6 +168,9 @@ if (RUN_INTEGRATION_TESTS) { searchAttributes: { CustomKeywordField: ['test-value2'], }, + typedSearchAttributes: new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + ]), }, }); @@ -177,7 +180,18 @@ if (RUN_INTEGRATION_TESTS) { t.is(describedSchedule.action.type, 'startWorkflow'); t.is(describedSchedule.action.workflowType, 'dummyWorkflow'); t.deepEqual(describedSchedule.action.memo, { 'my-memo': 'foo' }); - t.deepEqual(describedSchedule.action.searchAttributes?.CustomKeywordField, ['test-value2']); + // eslint-disable-next-line deprecation/deprecation + t.deepEqual(describedSchedule.action.searchAttributes, { + CustomKeywordField: ['test-value2'], + CustomIntField: [42], + }); + t.deepEqual( + describedSchedule.action.typedSearchAttributes, + new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + { key: { name: 'CustomKeywordField', type: SearchAttributeType.KEYWORD }, value: 'test-value2' }, + ]) + ); } finally { await handle.delete(); } @@ -186,24 +200,26 @@ if (RUN_INTEGRATION_TESTS) { test.serial('Can create schedule with startWorkflow action (with args)', async (t) => { const { client } = t.context; const scheduleId = `can-create-schedule-with-startWorkflow-action-${randomUUID()}`; - const action = { - type: 'startWorkflow', - workflowType: dummyWorkflowWith2Args, - args: [3, 4], - taskQueue, - memo: { - 'my-memo': 'foo', - }, - searchAttributes: { - CustomKeywordField: ['test-value2'], - }, - } as const; const handle = await client.schedule.create({ scheduleId, spec: { calendars: [{ hour: { start: 2, end: 7, step: 1 } }], }, - action, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflowWith2Args, + args: [3, 4], + taskQueue, + memo: { + 'my-memo': 'foo', + }, + searchAttributes: { + CustomKeywordField: ['test-value2'], + }, + typedSearchAttributes: new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + ]), + }, }); try { @@ -213,7 +229,18 @@ if (RUN_INTEGRATION_TESTS) { t.is(describedSchedule.action.workflowType, 'dummyWorkflowWith2Args'); t.deepEqual(describedSchedule.action.args, [3, 4]); t.deepEqual(describedSchedule.action.memo, { 'my-memo': 'foo' }); - t.deepEqual(describedSchedule.action.searchAttributes?.CustomKeywordField, ['test-value2']); + // eslint-disable-next-line deprecation/deprecation + t.deepEqual(describedSchedule.action.searchAttributes, { + CustomKeywordField: ['test-value2'], + CustomIntField: [42], + }); + t.deepEqual( + describedSchedule.action.typedSearchAttributes, + new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + { key: { name: 'CustomKeywordField', type: SearchAttributeType.KEYWORD }, value: 'test-value2' }, + ]) + ); } finally { await handle.delete(); } @@ -324,6 +351,9 @@ if (RUN_INTEGRATION_TESTS) { searchAttributes: { CustomKeywordField: ['test-value2'], }, + typedSearchAttributes: new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + ]), }, }); @@ -551,7 +581,7 @@ if (RUN_INTEGRATION_TESTS) { const expectedIds: string[] = []; for (let i = 0; i < 4; i++) { const scheduleId = `test-query-${groupId}-${i + 1}`; - const searchAttributes: SearchAttributes = {}; + const searchAttributes: SearchAttributes = {}; // eslint-disable-line deprecation/deprecation if (i < 2) { searchAttributes['CustomKeywordField'] = ['some-value']; expectedIds.push(scheduleId); @@ -568,6 +598,9 @@ if (RUN_INTEGRATION_TESTS) { taskQueue, }, searchAttributes, + typedSearchAttributes: new TypedSearchAttributes([ + { key: { name: 'CustomIntField', type: SearchAttributeType.INT }, value: 42 }, + ]), }) ); } diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 0f0cac482..dc6402f1b 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -5,7 +5,7 @@ import { Connection, WorkflowClient } from '@temporalio/client'; import { DefaultLogger, InjectedSinks, Runtime, WorkerOptions, LogEntry } from '@temporalio/worker'; import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow'; import { UnsafeWorkflowInfo } from '@temporalio/workflow/lib/interfaces'; -import { SdkComponent } from '@temporalio/common'; +import { SdkComponent, TypedSearchAttributes } from '@temporalio/common'; import { RUN_INTEGRATION_TESTS, Worker, asSdkLoggerSink, registerDefaultCustomSearchAttributes } from './helpers'; import { defaultOptions } from './mock-native-worker'; import * as workflows from './workflows'; @@ -118,6 +118,10 @@ if (RUN_INTEGRATION_TESTS) { memo: {}, parent: undefined, searchAttributes: {}, + // FIXME: consider rehydrating the class before passing to sink functions or + // create a variant of WorkflowInfo that corresponds to what we actually get in sinks. + // See issue #1635. + typedSearchAttributes: { searchAttributes: {} } as unknown as TypedSearchAttributes, historyLength: 3, continueAsNewSuggested: false, // values ignored for the purpose of comparison @@ -366,11 +370,11 @@ if (RUN_INTEGRATION_TESTS) { test('Sink functions contains upserted search attributes', async (t) => { const taskQueue = `${__filename}-${t.title}`; - const recordedMessages = Array<{ message: string; searchAttributes: SearchAttributes }>(); + const recordedMessages = Array<{ message: string; searchAttributes: SearchAttributes }>(); // eslint-disable-line deprecation/deprecation const sinks = asSdkLoggerSink(async (info, message, _attrs) => { recordedMessages.push({ message, - searchAttributes: info.searchAttributes, + searchAttributes: info.searchAttributes, // eslint-disable-line deprecation/deprecation }); }); @@ -399,7 +403,6 @@ if (RUN_INTEGRATION_TESTS) { message: 'Workflow completed', searchAttributes: { CustomBoolField: [true], - CustomIntField: [], // clear CustomKeywordField: ['durable code'], CustomTextField: ['is useful'], CustomDatetimeField: [date], diff --git a/packages/test/src/test-typed-search-attributes.ts b/packages/test/src/test-typed-search-attributes.ts new file mode 100644 index 000000000..4a95641a7 --- /dev/null +++ b/packages/test/src/test-typed-search-attributes.ts @@ -0,0 +1,465 @@ +import { randomUUID } from 'crypto'; +import { ExecutionContext } from 'ava'; +import { ScheduleOptionsAction, WorkflowExecutionDescription } from '@temporalio/client'; +import { + TypedSearchAttributes, + SearchAttributes, + SearchAttributePair, + SearchAttributeType, + SearchAttributeUpdatePair, + defineSearchAttributeKey, +} from '@temporalio/common'; +import { temporal } from '@temporalio/proto'; +import { + condition, + defineQuery, + defineSignal, + setHandler, + upsertSearchAttributes, + WorkflowInfo, + workflowInfo, +} from '@temporalio/workflow'; +import { encodeSearchAttributeIndexedValueType } from '@temporalio/common/lib/search-attributes'; +import { waitUntil } from './helpers'; +import { Context, helpers, makeTestFunction } from './helpers-integration'; + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowEnvironmentOpts: { + server: { + namespace: 'test-typed-search-attributes', + }, + }, +}); + +const date = new Date(); +const secondDate = new Date(date.getTime() + 1000); + +// eslint-disable-next-line deprecation/deprecation +const untypedAttrsInput: SearchAttributes = { + untyped_single_string: ['one'], + untyped_single_int: [1], + untyped_single_double: [1.23], + untyped_single_bool: [true], + untyped_single_date: [date], + untyped_multi_string: ['one', 'two'], +}; + +// The corresponding typed search attributes from untypedSearchAttributes. +const typedFromUntypedInput: SearchAttributePair[] = [ + { key: defineSearchAttributeKey('untyped_single_string', SearchAttributeType.TEXT), value: 'one' }, + { key: defineSearchAttributeKey('untyped_single_int', SearchAttributeType.INT), value: 1 }, + { key: defineSearchAttributeKey('untyped_single_double', SearchAttributeType.DOUBLE), value: 1.23 }, + { key: defineSearchAttributeKey('untyped_single_bool', SearchAttributeType.BOOL), value: true }, + { key: defineSearchAttributeKey('untyped_single_date', SearchAttributeType.DATETIME), value: date }, + { key: defineSearchAttributeKey('untyped_multi_string', SearchAttributeType.KEYWORD_LIST), value: ['one', 'two'] }, +]; + +const typedAttrsListInput: SearchAttributePair[] = [ + { key: defineSearchAttributeKey('typed_text', SearchAttributeType.TEXT), value: 'typed_text' }, + { key: defineSearchAttributeKey('typed_keyword', SearchAttributeType.KEYWORD), value: 'typed_keyword' }, + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 123 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 123.45 }, + { key: defineSearchAttributeKey('typed_bool', SearchAttributeType.BOOL), value: true }, + { key: defineSearchAttributeKey('typed_datetime', SearchAttributeType.DATETIME), value: date }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['typed', 'keywords'], + }, +]; + +const typedAttrsObjInput = new TypedSearchAttributes(typedAttrsListInput); + +// The corresponding untyped search attributes from typedSearchAttributesList. +// eslint-disable-next-line deprecation/deprecation +const untypedFromTypedInput: SearchAttributes = { + typed_text: ['typed_text'], + typed_keyword: ['typed_keyword'], + typed_int: [123], + typed_double: [123.45], + typed_bool: [true], + typed_datetime: [date], + typed_keyword_list: ['typed', 'keywords'], +}; + +const erroneousTypedKeys = { + erroneous_typed_int: temporal.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_INT, +}; + +const dummyWorkflow = async () => undefined; + +// Note: this is needed, the test fails due to +// test.serial.before not being defined when running workflows. +if (test?.serial?.before) { + // Register all search attribute keys. + test.serial.before(async (t) => { + // Transform untyped keys into 'untypedKey: IndexValueType' pairs. + const untypedKeys = Object.entries(untypedAttrsInput).reduce( + (acc, [key, value]) => { + const typedKey = TypedSearchAttributes.getKeyFromUntyped(key, value); + const encodedKey = encodeSearchAttributeIndexedValueType(typedKey?.type); + if (encodedKey) { + acc[key] = encodedKey; + } + return acc; + }, + {} as { [key: string]: temporal.api.enums.v1.IndexedValueType } + ); + + const typedKeys = typedAttrsListInput.reduce( + (acc, pair) => { + const encodedKey = encodeSearchAttributeIndexedValueType(pair.key.type); + if (encodedKey) { + acc[pair.key.name] = encodedKey; + } + return acc; + }, + {} as { [key: string]: temporal.api.enums.v1.IndexedValueType } + ); + + await t.context.env.connection.operatorService.addSearchAttributes({ + namespace: t.context.env.namespace, + searchAttributes: { + ...untypedKeys, + ...typedKeys, + ...erroneousTypedKeys, + }, + }); + + await waitUntil(async () => { + const resp = await t.context.env.connection.operatorService.listSearchAttributes({ + namespace: t.context.env.namespace, + }); + return ( + Object.keys(untypedKeys).every((key) => key in resp.customAttributes) && + Object.keys(typedKeys).every((key) => key in resp.customAttributes) + ); + }, 300); + }); +} + +test('does not allow non-integer values for integer search attributes', async (t) => { + try { + const { taskQueue } = helpers(t); + const client = t.context.env.client; + const action: ScheduleOptionsAction = { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }; + const erroneousKeyName = Object.keys(erroneousTypedKeys)[0]; + await client.schedule.create({ + scheduleId: randomUUID(), + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action, + typedSearchAttributes: [ + // Use a double value for an integer search attribute. + // This is legal at compile-time, but should error at runtime when converting to payload. + { key: defineSearchAttributeKey(erroneousKeyName, SearchAttributeType.INT), value: 123.4 }, + ], + }); + } catch (err) { + if (err instanceof Error) { + t.is(err.message, 'Invalid search attribute value 123.4 for given type INT'); + } else { + t.fail('Unexpected error type'); + } + } +}); + +interface TestInputSearchAttributes { + name: string; + input: { + searchAttributes?: SearchAttributes; // eslint-disable-line deprecation/deprecation + typedSearchAttributes?: TypedSearchAttributes | SearchAttributePair[]; + }; + expected: { + searchAttributes?: SearchAttributes; // eslint-disable-line deprecation/deprecation + typedSearchAttributes?: TypedSearchAttributes; + }; +} + +// inputTestCases contains permutations of search attribute inputs +const inputTestCases: TestInputSearchAttributes[] = [ + // Input only untyped search attributes + { + name: 'only-untyped-search-attributes', + input: { + searchAttributes: untypedAttrsInput, + }, + expected: { + searchAttributes: untypedAttrsInput, + typedSearchAttributes: new TypedSearchAttributes(typedFromUntypedInput), + }, + }, + // Input only typed search attributes as a list + { + name: 'only-typed-search-attributes-list', + input: { + typedSearchAttributes: typedAttrsListInput, + }, + expected: { + searchAttributes: untypedFromTypedInput, + typedSearchAttributes: typedAttrsObjInput, + }, + }, + // Input only typed search attributes as an object + { + name: 'only-typed-search-attributes-obj', + input: { + typedSearchAttributes: typedAttrsObjInput, + }, + expected: { + searchAttributes: untypedFromTypedInput, + typedSearchAttributes: typedAttrsObjInput, + }, + }, + // Input both untyped and typed search attributes + { + name: 'both-untyped-and-typed-sa', + input: { + searchAttributes: { + ...untypedAttrsInput, + // Expect to be overwritten by the corresponding typed search attribute. Overwritten value to be "typed_text". + typed_text: ['different_value_from_untyped'], + }, + typedSearchAttributes: typedAttrsListInput, + }, + expected: { + searchAttributes: { + ...untypedFromTypedInput, + ...untypedAttrsInput, + }, + typedSearchAttributes: typedAttrsObjInput.updateCopy(typedFromUntypedInput), + }, + }, +]; + +test('creating schedules with various input search attributes', async (t) => { + await Promise.all( + inputTestCases.map(async ({ input, expected, name }) => { + const { taskQueue } = helpers(t); + const client = t.context.env.client; + const action: ScheduleOptionsAction = { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + }; + const handle = await client.schedule.create({ + scheduleId: randomUUID(), + spec: { + calendars: [{ hour: { start: 2, end: 7, step: 1 } }], + }, + action, + ...input, + }); + const desc = await handle.describe(); + t.deepEqual(desc.searchAttributes, expected.searchAttributes, name); // eslint-disable-line deprecation/deprecation + t.deepEqual(desc.typedSearchAttributes, expected.typedSearchAttributes, name); + }) + ); +}); + +export const getWorkflowInfo = defineQuery('getWorkflowInfo'); +export const mutateSearchAttributes = + defineSignal<[SearchAttributes | SearchAttributeUpdatePair[]]>('mutateSearchAttributes'); // eslint-disable-line deprecation/deprecation +export const complete = defineSignal('complete'); + +export async function changeSearchAttributes(): Promise { + let isComplete = false; + setHandler(getWorkflowInfo, () => { + return workflowInfo(); + }); + setHandler(complete, () => { + isComplete = true; + }); + setHandler(mutateSearchAttributes, (attrs) => { + upsertSearchAttributes(attrs); + }); + await condition(() => isComplete); +} + +test('upsert works with various search attribute mutations', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ namespace: t.context.env.namespace }); + await worker.runUntil(async () => { + // Start workflow with some initial search attributes. + const handle = await startWorkflow(changeSearchAttributes, { + typedSearchAttributes: typedAttrsListInput, + }); + let res = await handle.query(getWorkflowInfo); + let desc = await handle.describe(); + assertWorkflowInfoSearchAttributes(t, res, untypedFromTypedInput, typedAttrsListInput); + assertWorkflowDescSearchAttributes(t, desc, untypedFromTypedInput, typedAttrsListInput); + + // Update search attributes with untyped input. + // eslint-disable-next-line deprecation/deprecation + const untypedUpdateAttrs: SearchAttributes = { + typed_text: ['new_value'], + typed_keyword: ['new_keyword'], + typed_int: [2], + typed_double: [2.34], + typed_datetime: [secondDate], + typed_keyword_list: ['three', 'four', 'five'], + // Delete key - empty value. + typed_bool: [], + }; + + // Update search attributes with untyped input. + await handle.signal(mutateSearchAttributes, untypedUpdateAttrs); + res = await handle.query(getWorkflowInfo); + desc = await handle.describe(); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { typed_bool, ...untypedUpdateExpected } = untypedUpdateAttrs; + + assertWorkflowInfoSearchAttributes(t, res, untypedUpdateExpected, [ + { key: defineSearchAttributeKey('typed_text', SearchAttributeType.TEXT), value: 'new_value' }, + { key: defineSearchAttributeKey('typed_keyword', SearchAttributeType.KEYWORD), value: 'new_keyword' }, + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 2 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 2.34 }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['three', 'four', 'five'], + }, + { key: defineSearchAttributeKey('typed_datetime', SearchAttributeType.DATETIME), value: secondDate }, + ]); + + assertWorkflowDescSearchAttributes(t, desc, untypedUpdateExpected, [ + { key: defineSearchAttributeKey('typed_text', SearchAttributeType.TEXT), value: 'new_value' }, + { key: defineSearchAttributeKey('typed_keyword', SearchAttributeType.KEYWORD), value: 'new_keyword' }, + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 2 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 2.34 }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['three', 'four', 'five'], + }, + { key: defineSearchAttributeKey('typed_datetime', SearchAttributeType.DATETIME), value: secondDate }, + ]); + + // Update search attributes with typed input. + const typedUpdateAttrs: SearchAttributeUpdatePair[] = [ + // Delete key. + { key: defineSearchAttributeKey('typed_text', SearchAttributeType.TEXT), value: null }, + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 3 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 3.45 }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['six', 'seven'], + }, + // Add key. + { key: defineSearchAttributeKey('typed_bool', SearchAttributeType.BOOL), value: false }, + ]; + + // Update search attributes with typed input. + await handle.signal(mutateSearchAttributes, typedUpdateAttrs); + res = await handle.query(getWorkflowInfo); + desc = await handle.describe(); + + // Note that we expect the empty array in the untyped search attributes. + const expectedUntyped = { + typed_int: [3], + typed_double: [3.45], + typed_keyword_list: ['six', 'seven'], + typed_bool: [false], + typed_keyword: ['new_keyword'], + typed_datetime: [secondDate], + }; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { typed_keyword, typed_datetime, ...newDescExpected } = expectedUntyped; + const expectedTyped = [ + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 3 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 3.45 }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['six', 'seven'], + }, + { key: defineSearchAttributeKey('typed_bool', SearchAttributeType.BOOL), value: false }, + { key: defineSearchAttributeKey('typed_keyword', SearchAttributeType.KEYWORD), value: 'new_keyword' }, + { key: defineSearchAttributeKey('typed_datetime', SearchAttributeType.DATETIME), value: secondDate }, + ]; + + const expectedDescTyped = [ + { key: defineSearchAttributeKey('typed_int', SearchAttributeType.INT), value: 3 }, + { key: defineSearchAttributeKey('typed_double', SearchAttributeType.DOUBLE), value: 3.45 }, + { + key: defineSearchAttributeKey('typed_keyword_list', SearchAttributeType.KEYWORD_LIST), + value: ['six', 'seven'], + }, + { key: defineSearchAttributeKey('typed_bool', SearchAttributeType.BOOL), value: false }, + { key: defineSearchAttributeKey('typed_keyword', SearchAttributeType.KEYWORD), value: 'new_keyword' }, + { key: defineSearchAttributeKey('typed_datetime', SearchAttributeType.DATETIME), value: secondDate }, + ]; + + assertWorkflowInfoSearchAttributes(t, res, expectedUntyped, expectedTyped); + assertWorkflowDescSearchAttributes(t, desc, newDescExpected, expectedDescTyped); + + await handle.signal(complete); + }); +}); + +function assertWorkflowInfoSearchAttributes( + t: ExecutionContext, + res: WorkflowInfo, + searchAttributes: SearchAttributes, // eslint-disable-line deprecation/deprecation + searchAttrPairs: SearchAttributePair[] +) { + // Check initial search attributes are present. + // Response from query serializes datetime attributes to strings so we serialize our expected responses. + t.deepEqual(res.searchAttributes, normalizeSearchAttrs(searchAttributes)); // eslint-disable-line deprecation/deprecation + // This casting is necessary because res.typedSearchAttributes has actually been serialized by its toJSON method + // (returning an array of SearchAttributePair), but is not reflected in its type definition. + assertMatchingSearchAttributePairs(t, res.typedSearchAttributes as unknown as SearchAttributePair[], searchAttrPairs); +} + +function assertWorkflowDescSearchAttributes( + t: ExecutionContext, + desc: WorkflowExecutionDescription, + searchAttributes: SearchAttributes, // eslint-disable-line deprecation/deprecation + searchAttrPairs: SearchAttributePair[] +) { + // Check that all search attributes are present in the workflow description's search attributes. + t.like(desc.searchAttributes, searchAttributes); // eslint-disable-line deprecation/deprecation + const descOmittingBuildIds = desc.typedSearchAttributes + .updateCopy([{ key: defineSearchAttributeKey('BuildIds', SearchAttributeType.KEYWORD_LIST), value: null }]) + .getAll(); + assertMatchingSearchAttributePairs(t, descOmittingBuildIds, searchAttrPairs); +} + +// eslint-disable-next-line deprecation/deprecation +function normalizeSearchAttrs(attrs: SearchAttributes): SearchAttributes { + const res: SearchAttributes = {}; // eslint-disable-line deprecation/deprecation + for (const [key, value] of Object.entries(attrs)) { + if (Array.isArray(value) && value.length === 1 && value[0] instanceof Date) { + res[key] = [value[0].toISOString()]; + continue; + } + res[key] = value; + } + return res; +} + +function normalizeSearchAttrPairs(attrs: SearchAttributePair[]): SearchAttributePair[] { + const res: SearchAttributePair[] = []; + for (const { key, value } of attrs) { + if (value instanceof Date) { + res.push({ key, value: value.toISOString() } as SearchAttributePair); + continue; + } + res.push({ key, value } as SearchAttributePair); + } + return res; +} + +function assertMatchingSearchAttributePairs( + t: ExecutionContext, + actual: SearchAttributePair[], + expected: SearchAttributePair[] +) { + t.deepEqual( + normalizeSearchAttrPairs(actual).sort((a, b) => a.key.name.localeCompare(b.key.name)), + normalizeSearchAttrPairs(expected).sort((a, b) => a.key.name.localeCompare(b.key.name)) + ); +} diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 2eb10bbbd..1c25f61bc 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -11,6 +11,7 @@ import { SdkComponent, Payload, toPayloads, + TypedSearchAttributes, } from '@temporalio/common'; import { msToTs } from '@temporalio/common/lib/time'; import { coresdk, temporal } from '@temporalio/proto'; @@ -113,6 +114,7 @@ async function createWorkflow( taskTimeoutMs: 1000, taskQueue: 'test', searchAttributes: {}, + typedSearchAttributes: new TypedSearchAttributes(), historyLength: 3, historySize: 300, continueAsNewSuggested: false, diff --git a/packages/test/src/workflows/upsert-and-read-search-attributes.ts b/packages/test/src/workflows/upsert-and-read-search-attributes.ts index ff8d6db76..cf598cdb8 100644 --- a/packages/test/src/workflows/upsert-and-read-search-attributes.ts +++ b/packages/test/src/workflows/upsert-and-read-search-attributes.ts @@ -1,5 +1,6 @@ import { SearchAttributes, upsertSearchAttributes, workflowInfo } from '@temporalio/workflow'; +// eslint-disable-next-line deprecation/deprecation export async function upsertAndReadSearchAttributes(msSinceEpoch: number): Promise { upsertSearchAttributes({ CustomIntField: [123], @@ -12,5 +13,5 @@ export async function upsertAndReadSearchAttributes(msSinceEpoch: number): Promi CustomDatetimeField: [new Date(msSinceEpoch)], CustomDoubleField: [3.14], }); - return workflowInfo().searchAttributes; + return workflowInfo().searchAttributes; // eslint-disable-line deprecation/deprecation } diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 395e3ffd7..b9b32106f 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -31,6 +31,7 @@ import { Payload, ApplicationFailure, ensureApplicationFailure, + TypedSearchAttributes, } from '@temporalio/common'; import { decodeArrayFromPayloads, @@ -1276,6 +1277,7 @@ export class Worker { runId: activation.runId, workflowType, searchAttributes: {}, + typedSearchAttributes: new TypedSearchAttributes(), parent: convertToParentWorkflowType(parentWorkflowInfo), taskQueue: this.options.taskQueue, namespace: this.options.namespace, diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 607503274..8889e32c0 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -60,6 +60,8 @@ export { PayloadConverter, RetryPolicy, rootCause, + SearchAttributes, // eslint-disable-line deprecation/deprecation + SearchAttributeValue, // eslint-disable-line deprecation/deprecation ServerFailure, TemporalFailure, TerminatedFailure, @@ -71,8 +73,6 @@ export { ActivityInterface, // eslint-disable-line deprecation/deprecation Payload, QueryDefinition, - SearchAttributes, - SearchAttributeValue, SignalDefinition, UntypedActivities, Workflow, diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index d742b5695..8b23f91b4 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -10,6 +10,8 @@ import { QueryDefinition, Duration, VersioningIntent, + TypedSearchAttributes, + SearchAttributePair, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; @@ -39,8 +41,16 @@ export interface WorkflowInfo { * Indexed information attached to the Workflow Execution * * This value may change during the lifetime of an Execution. + * @deprecated Use {@link typedSearchAttributes} instead. */ - readonly searchAttributes: SearchAttributes; + readonly searchAttributes: SearchAttributes; // eslint-disable-line deprecation/deprecation + + /** + * Indexed information attached to the Workflow Execution, exposed through an interface. + * + * This value may change during the lifetime of an Execution. + */ + readonly typedSearchAttributes: TypedSearchAttributes; /** * Non-indexed information attached to the Workflow Execution @@ -251,8 +261,20 @@ export interface ContinueAsNewOptions { memo?: Record; /** * Searchable attributes to attach to next Workflow run + * @deprecated Use {@link typedSearchAttributes} instead. + */ + searchAttributes?: SearchAttributes; // eslint-disable-line deprecation/deprecation + /** + * Specifies additional indexed information to attach to the Workflow Execution. More info: + * https://docs.temporal.io/docs/typescript/search-attributes + * + * Values are always converted using {@link JsonPayloadConverter}, even when a custom data converter is provided. + * Note that search attributes are not encoded, as such, do not include any sensitive information. + * + * If both {@link searchAttributes} and {@link typedSearchAttributes} are provided, conflicting keys will be overwritten + * by {@link typedSearchAttributes}. */ - searchAttributes?: SearchAttributes; + typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; /** * When using the Worker Versioning feature, specifies whether this Workflow should * Continue-as-New onto a worker with a compatible Build Id or not. See {@link VersioningIntent}. diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 6dd686b49..002bdbc5f 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -19,10 +19,12 @@ import { WorkflowUpdateType, WorkflowUpdateValidatorType, mapFromPayloads, - searchAttributePayloadConverter, fromPayloadsAtIndex, - SearchAttributes, } from '@temporalio/common'; +import { + decodeSearchAttributes, + decodeTypedSearchAttributes, +} from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import type { coresdk, temporal } from '@temporalio/proto'; @@ -516,8 +518,10 @@ export class Activator implements ActivationHandler { // Most things related to initialization have already been handled in the constructor this.mutateWorkflowInfo((info) => ({ ...info, - searchAttributes: - (mapFromPayloads(searchAttributePayloadConverter, searchAttributes?.indexedFields) as SearchAttributes) ?? {}, + + searchAttributes: decodeSearchAttributes(searchAttributes?.indexedFields), + typedSearchAttributes: decodeTypedSearchAttributes(searchAttributes?.indexedFields), + memo: mapFromPayloads(this.payloadConverter, memo?.fields), lastResult: fromPayloadsAtIndex(this.payloadConverter, 0, lastCompletionResult?.payloads), lastFailure: diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 04277af95..711b71b04 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -9,10 +9,11 @@ import { LocalActivityOptions, mapToPayloads, QueryDefinition, - searchAttributePayloadConverter, SearchAttributes, + SearchAttributeValue, SignalDefinition, toPayloads, + TypedSearchAttributes, UntypedActivities, UpdateDefinition, WithWorkflowArgs, @@ -20,7 +21,12 @@ import { WorkflowResultType, WorkflowReturnType, WorkflowUpdateValidatorType, + SearchAttributeUpdatePair, } from '@temporalio/common'; +import { + encodeUnifiedSearchAttributes, + searchAttributePayloadConverter, +} from '@temporalio/common/lib/converter/payload-search-attributes'; import { versioningIntentToProto } from '@temporalio/common/lib/versioning-intent-enum'; import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -381,9 +387,10 @@ function startChildWorkflowExecutionNextHandler({ workflowIdReusePolicy: encodeWorkflowIdReusePolicy(options.workflowIdReusePolicy), parentClosePolicy: encodeParentClosePolicy(options.parentClosePolicy), cronSchedule: options.cronSchedule, - searchAttributes: options.searchAttributes - ? mapToPayloads(searchAttributePayloadConverter, options.searchAttributes) - : undefined, + searchAttributes: + options.searchAttributes || options.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line deprecation/deprecation + : undefined, memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), versioningIntent: versioningIntentToProto(options.versioningIntent), }, @@ -923,9 +930,10 @@ export function makeContinueAsNewFunc( headers, taskQueue: options.taskQueue, memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), - searchAttributes: options.searchAttributes - ? mapToPayloads(searchAttributePayloadConverter, options.searchAttributes) - : undefined, + searchAttributes: + options.searchAttributes || options.typedSearchAttributes // eslint-disable-line deprecation/deprecation + ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line deprecation/deprecation + : undefined, workflowRunTimeout: msOptionalToTs(options.workflowRunTimeout), workflowTaskTimeout: msOptionalToTs(options.workflowTaskTimeout), versioningIntent: versioningIntentToProto(options.versioningIntent), @@ -947,14 +955,15 @@ export function makeContinueAsNewFunc( * * @example * - *```ts + * ```ts *import { continueAsNew } from '@temporalio/workflow'; +import { SearchAttributeType } from '@temporalio/common'; * *export async function myWorkflow(n: number): Promise { * // ... Workflow logic * await continueAsNew(n + 1); *} - *``` + * ``` */ export function continueAsNew(...args: Parameters): Promise { return makeContinueAsNewFunc()(...args); @@ -1343,20 +1352,26 @@ export function setDefaultQueryHandler(handler: DefaultQueryHandler | undefined) * Updates this Workflow's Search Attributes by merging the provided `searchAttributes` with the existing Search * Attributes, `workflowInfo().searchAttributes`. * - * For example, this Workflow code: + * Search attributes can be upserted using either SearchAttributes (deprecated) or SearchAttributeUpdatePair[] (preferred) + * + * Upserting a workflow's search attributes using SearchAttributeUpdatePair[]: * * ```ts - * upsertSearchAttributes({ - * CustomIntField: [1], - * CustomBoolField: [true] - * }); - * upsertSearchAttributes({ - * CustomIntField: [42], - * CustomKeywordField: ['durable code', 'is great'] - * }); + * const intKey = defineSearchKey('CustomIntField', 'INT'); + * const boolKey = defineSearchKey('CustomBoolField', 'BOOL'); + * const keywordListKey = defineSearchKey('CustomKeywordField', 'KEYWORD_LIST'); + * + * upsertSearchAttributes([ + * defineSearchAttribute(intKey, 1), + * defineSearchAttribute(boolKey, true) + * ]); + * upsertSearchAttributes([ + * defineSearchAttribute(intKey, 42), + * defineSearchAttribute(keywordListKey, ['durable code', 'is great']) + * ]); * ``` * - * would result in the Workflow having these Search Attributes: + * Would result in the Workflow having these Search Attributes: * * ```ts * { @@ -1366,9 +1381,12 @@ export function setDefaultQueryHandler(handler: DefaultQueryHandler | undefined) * } * ``` * - * @param searchAttributes The Record to merge. Use a value of `[]` to clear a Search Attribute. + * @param searchAttributes The Record to merge. + * If using SearchAttributeUpdatePair[] (preferred), set a value to null to remove the search attribute. + * If using SearchAttributes (deprecated), set a value to undefined or an empty list to remove the search attribute. */ -export function upsertSearchAttributes(searchAttributes: SearchAttributes): void { +// eslint-disable-next-line deprecation/deprecation +export function upsertSearchAttributes(searchAttributes: SearchAttributes | SearchAttributeUpdatePair[]): void { const activator = assertInWorkflowContext( 'Workflow.upsertSearchAttributes(...) may only be used from a Workflow Execution.' ); @@ -1377,21 +1395,111 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void throw new Error('searchAttributes must be a non-null SearchAttributes'); } - activator.pushCommand({ - upsertWorkflowSearchAttributes: { - searchAttributes: mapToPayloads(searchAttributePayloadConverter, searchAttributes), - }, - }); + if (Array.isArray(searchAttributes)) { + // Typed search attributes + activator.pushCommand({ + upsertWorkflowSearchAttributes: { + searchAttributes: encodeUnifiedSearchAttributes(undefined, searchAttributes), + }, + }); - activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => { - return { - ...info, - searchAttributes: { - ...info.searchAttributes, - ...searchAttributes, + activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => { + // Create a copy of the current state. + const newSearchAttributes: SearchAttributes = { ...info.searchAttributes }; // eslint-disable-line deprecation/deprecation + for (const pair of searchAttributes) { + if (pair.value == null) { + // If the value is null, remove the search attribute. + // We don't mutate the existing state (just the new map) so this is safe. + delete newSearchAttributes[pair.key.name]; + } else { + newSearchAttributes[pair.key.name] = Array.isArray(pair.value) + ? pair.value + : ([pair.value] as SearchAttributeValue); // eslint-disable-line deprecation/deprecation + } + } + return { + ...info, + searchAttributes: newSearchAttributes, + // Create an empty copy and apply existing and new updates. Keep in mind the order matters here (existing first, new second - to possibly overwrite existing). + typedSearchAttributes: info.typedSearchAttributes.updateCopy([...searchAttributes]), + }; + }); + } else { + // Legacy search attributes + activator.pushCommand({ + upsertWorkflowSearchAttributes: { + searchAttributes: mapToPayloads(searchAttributePayloadConverter, searchAttributes), }, - }; - }); + }); + + activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => { + // Create a new copy of the current state. + let typedSearchAttributes = info.typedSearchAttributes.updateCopy([]); + const newSearchAttributes: SearchAttributes = { ...info.searchAttributes }; // eslint-disable-line deprecation/deprecation + + // Upsert legacy search attributes into typedSearchAttributes. + for (const [k, v] of Object.entries(searchAttributes)) { + if (v !== undefined && !Array.isArray(v)) { + throw new Error(`Search attribute value must be an array or undefined, got ${v}`); + } + + // The value is undefined or an empty list, this signifies deletion. + // Remove from both untyped & typed search attributes. + if (v == null || (Array.isArray(v) && v.length === 0)) { + // We cannot discern a valid key typing from these values. + // Instead, we do a "best effort" deletion from typed search attributes: + // - check if a matching key name exists, if so, remove it. + const matchingPair = typedSearchAttributes.getAll().find((pair) => pair.key.name === k); + if (matchingPair) { + typedSearchAttributes = typedSearchAttributes.updateCopy([ + { key: matchingPair.key, value: null } as SearchAttributeUpdatePair, + ]); + } + delete newSearchAttributes[k]; + continue; + } + + // Attempt to discern a valid key typing for the update. + const typedKey = TypedSearchAttributes.getKeyFromUntyped(k, v); + + // Unable to discern a valid key typing (no valid type for defined value). + // Skip applying this update (no-op). + if (typedKey === undefined) { + continue; + } + + // TEXT type is inferred from a string value, but it could also be KEYWORD. + // If a matching pair exists with KEYWORD type, use that instead. + if (typedKey.type === 'TEXT') { + const matchingPair = typedSearchAttributes.getAll().find((pair) => pair.key.name === typedKey.name); + if (matchingPair) { + typedKey.type = matchingPair.key.type; + } + } + + let newValue: unknown = v; + // Unpack value if it is a single-element array. + if (v.length === 1) { + newValue = v[0]; + // Convert value back to Date. + if (typedKey.type === 'DATETIME') { + newValue = new Date(newValue as string); + } + } + + // We have a defined value with valid type. Apply the update. + typedSearchAttributes = typedSearchAttributes.updateCopy([ + { key: typedKey, value: newValue } as SearchAttributeUpdatePair, + ]); + newSearchAttributes[k] = v; + } + return { + ...info, + searchAttributes: newSearchAttributes, + typedSearchAttributes, + }; + }); + } } /**