diff --git a/ddbstream-lambda-sfn-cdk-ts/README.md b/ddbstream-lambda-sfn-cdk-ts/README.md new file mode 100644 index 000000000..73c9ef58e --- /dev/null +++ b/ddbstream-lambda-sfn-cdk-ts/README.md @@ -0,0 +1,152 @@ + +# Amazon DynamoDB Stream to AWS Step Functions Trigger + +This Pattern demonstrates how to automatically trigger AWS Step Functions workflows in response to changes in DynamoDB tables. The CDK construct `DynamoWorkflowTrigger` lets you connect DynamoDB and Step Functions by allowing you to define event handlers that monitor specific changes in your DynamoDB tables and trigger workflows in response. It leverages Lambda functions to evaluate conditions and start Step Functions state machines with inputs derived from the DynamoDB events. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/ddbstream-lambda-sfn-cdk-ts](https://serverlessland.com/patterns/ddbstream-lambda-sfn-cdk-ts) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Node and NPM](https://nodejs.org/en/download/) installed +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) (AWS CDK) installed + +## Deployment Instructions + +1. 
Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
+    ```
+    git clone https://github.com/aws-samples/serverless-patterns
+    ```
+2. Change directory to the pattern directory:
+    ```
+    cd serverless-patterns/ddbstream-lambda-sfn-cdk-ts
+    ```
+3. To deploy from the command line, run the following:
+    ```bash
+    npm install
+    npm run lambda
+    cdk deploy
+    ```
+    **The deployment will take a couple of minutes.**
+
+4. Note the outputs from the CDK deployment process. These contain the `DynamoDBTable`, `StateMachineArn` and `LambdaName` values, which are used for testing.
+
+## How It Works
+
+![Architecture Diagram](ddbstream-lambda-sfn-cdk-ts.png)
+
+The `DdbstreamLambdaSfnExampleStack` demonstrates how to use the pattern:
+
+1. It creates a DynamoDB table (`TestTable`) with streaming enabled
+2. It creates a simple Step Functions state machine (`TestStateMachine`)
+3. It sets up a trigger with the following behavior:
+   - It applies a filter to ignore events where a `SkipMe` attribute exists in the new image
+   - It only processes `MODIFY` events (updates to existing items)
+   - It checks two conditions:
+     - The new value of `testKey` must be "test8"
+     - The old value of `testKey` must have been "test9"
+   - When all conditions are met, it triggers the state machine with input parameters extracted from the DynamoDB event:
+     - `Index` taken from the item's partition key
+     - `MapAttribute` taken from the first element in a list attribute
+
+This workflow allows you to respond to specific data changes in DynamoDB by executing custom workflows with Step Functions.
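
The condition evaluation described above can be sketched in a few lines of dependency-free JavaScript. This is a simplified stand-in for the Lambda's JSONPath-based logic: the accessor functions below are illustrative only (the real handler evaluates JSONPath strings such as `$.NewImage.testKey.S` against the record's `dynamodb` property).

```javascript
// Hypothetical sketch of how the trigger decides whether to start a workflow.
// A MODIFY stream record whose testKey changed from "test9" to "test8":
const record = {
  eventName: 'MODIFY',
  dynamodb: {
    OldImage: { Index: { S: 'test-item-1' }, testKey: { S: 'test9' } },
    NewImage: { Index: { S: 'test-item-1' }, testKey: { S: 'test8' } },
  },
};

// The two conditions from the example stack, with plain accessors standing
// in for the JSONPath expressions used by the real Lambda.
const conditions = [
  { get: (d) => d.NewImage.testKey.S, value: 'test8' },
  { get: (d) => d.OldImage.testKey.S, value: 'test9' },
];

// Trigger only on MODIFY events where every condition matches exactly.
const shouldTrigger =
  record.eventName === 'MODIFY' &&
  conditions.every((c) => c.get(record.dynamodb) === c.value);

console.log(shouldTrigger); // true
```

In the actual construct, the same two conditions are expressed declaratively as `{ jsonPath: "$.NewImage.testKey.S", value: "test8" }` and `{ jsonPath: "$.OldImage.testKey.S", value: "test9" }`.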
+
+
+#### Features
+
+- Dead letter queue for failed invocations
+- VPC support
+- Custom security groups
+- Fine-grained event filtering
+- Multiple event handlers per construct
+- JSONPath-based condition evaluation
+- Input mapping for state machines
+
+#### Limitations
+
+- Tables must have streams enabled with `NEW_AND_OLD_IMAGES`
+- Conditions currently only support exact matches via the `value` property
+- For complex filtering, use Lambda event source filters
+
+## Testing
+
+You can test the workflow using the AWS CLI to create and modify items in the DynamoDB table. Here are some example commands to test different scenarios:
+
+1. First, create an item that shouldn't trigger the workflow (initial state):
+```bash
+aws dynamodb put-item \
+  --table-name <TableName> \
+  --item '{
+    "Index": {"S": "test-item-1"},
+    "testKey": {"S": "test9"},
+    "ListAttribute": {"L": [{"S": "first-element"}]}
+  }'
+```
+
+2. Update the item to trigger the workflow (meets all conditions):
+```bash
+aws dynamodb update-item \
+  --table-name <TableName> \
+  --key '{"Index": {"S": "test-item-1"}}' \
+  --update-expression "SET testKey = :newval" \
+  --expression-attribute-values '{":newval": {"S": "test8"}}'
+```
+
+3. Test the SkipMe filter by creating an item that should be ignored:
+```bash
+aws dynamodb put-item \
+  --table-name <TableName> \
+  --item '{
+    "Index": {"S": "test-item-2"},
+    "testKey": {"S": "test9"},
+    "SkipMe": {"S": "true"}
+  }'
+```
+This should not invoke the DDBEventTrigger Lambda function at all.
+
+To verify the results:
+
+1. Check if the Step Function was triggered:
+```bash
+aws stepfunctions list-executions \
+  --state-machine-arn <StateMachineArn>
+```
+
+2. View the execution details:
+```bash
+aws stepfunctions get-execution-history \
+  --execution-arn <ExecutionArn>
+```
+
+3. 
Monitor Lambda function logs:
+```bash
+aws logs tail /aws/lambda/<LambdaName> --follow
+```
+
+#### Troubleshooting
+
+- Check Amazon CloudWatch Logs for the Lambda function
+- Monitor the dead letter queue for failed events
+- Ensure AWS IAM permissions are correct for DynamoDB stream access and Step Functions execution
+
+## Cleanup
+
+1. From the command line, use the following in the source folder:
+   ```bash
+   cdk destroy
+   ```
+2. Confirm the removal and wait for the resource deletion to complete.
+----
+
+Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: MIT-0
diff --git a/ddbstream-lambda-sfn-cdk-ts/app.ts b/ddbstream-lambda-sfn-cdk-ts/app.ts
new file mode 100644
index 000000000..97f8d4b79
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/app.ts
@@ -0,0 +1,21 @@
+#!/usr/bin/env node
+import 'source-map-support/register';
+import * as cdk from 'aws-cdk-lib';
+import { DdbstreamLambdaSfnExampleStack } from './src/lib/ddbstream-lambda-sfn-example-stack';
+
+const app = new cdk.App();
+new DdbstreamLambdaSfnExampleStack(app, 'DDBEventTrigger', {
+  /* If you don't specify 'env', this stack will be environment-agnostic.
+   * Account/Region-dependent features and context lookups will not work,
+   * but a single synthesized template can be deployed anywhere. */
+
+  /* Uncomment the next line to specialize this stack for the AWS Account
+   * and Region that are implied by the current CLI configuration. */
+  // env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION },
+
+  /* Uncomment the next line if you know exactly what Account and Region you
+   * want to deploy the stack to. 
*/ + //env: { account: 'AWS_ACCOUNT', region: 'REGION' }, + + /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ +}); \ No newline at end of file diff --git a/ddbstream-lambda-sfn-cdk-ts/cdk.json b/ddbstream-lambda-sfn-cdk-ts/cdk.json new file mode 100644 index 000000000..b614b8555 --- /dev/null +++ b/ddbstream-lambda-sfn-cdk-ts/cdk.json @@ -0,0 +1,50 @@ +{ + "app": "npx ts-node --prefer-ts-exts app.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-iam:standardizedServicePrincipals": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + 
"@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true + } + } \ No newline at end of file diff --git a/ddbstream-lambda-sfn-cdk-ts/ddbstream-lambda-sfn-cdk-ts.png b/ddbstream-lambda-sfn-cdk-ts/ddbstream-lambda-sfn-cdk-ts.png new file mode 100644 index 000000000..a584ee1ea Binary files /dev/null and b/ddbstream-lambda-sfn-cdk-ts/ddbstream-lambda-sfn-cdk-ts.png differ diff --git a/ddbstream-lambda-sfn-cdk-ts/example-pattern.json b/ddbstream-lambda-sfn-cdk-ts/example-pattern.json new file mode 100644 index 000000000..ef9f2068d --- /dev/null +++ b/ddbstream-lambda-sfn-cdk-ts/example-pattern.json @@ -0,0 +1,72 @@ +{ + "title": "Amazon DynamoDB Stream to AWS Step Functions Trigger", + "description": "Automatically trigger AWS Step Functions workflows in response to changes in DynamoDB tables using a CDK construct that connects DynamoDB and Step Functions.", + "language": "TypeScript", + "level": "300", + "framework": "AWS CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to automatically trigger AWS Step Functions workflows in response to changes in DynamoDB tables.", + "The CDK construct 'DynamoWorkflowTrigger' connects DynamoDB and Step Functions by allowing you to define event handlers that monitor specific changes in your DynamoDB tables.", + "It leverages Lambda functions to evaluate conditions and start Step Functions state machines with inputs derived from the DynamoDB events.", + "The pattern includes features like dead letter queues, VPC support, custom security groups, and fine-grained event filtering." 
+    ]
+  },
+  "gitHub": {
+    "template": {
+      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/ddbstream-lambda-sfn-cdk-ts",
+      "templateURL": "serverless-patterns/ddbstream-lambda-sfn-cdk-ts",
+      "projectFolder": "ddbstream-lambda-sfn-cdk-ts",
+      "templateFile": "src/lib/ddbstream-lambda-sfn-example-stack.ts"
+    }
+  },
+  "resources": {
+    "bullets": [
+      {
+        "text": "Amazon DynamoDB Streams Documentation",
+        "link": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html"
+      },
+      {
+        "text": "AWS Step Functions Documentation",
+        "link": "https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html"
+      },
+      {
+        "text": "AWS CDK Documentation",
+        "link": "https://docs.aws.amazon.com/cdk/latest/guide/home.html"
+      }
+    ]
+  },
+  "deploy": {
+    "text": [
+      "Clone the repository: git clone https://github.com/aws-samples/serverless-patterns",
+      "Change directory: cd serverless-patterns/ddbstream-lambda-sfn-cdk-ts",
+      "Install dependencies: npm install && npm run lambda",
+      "Deploy the CDK stack: cdk deploy"
+    ]
+  },
+  "testing": {
+    "text": [
+      "1. Create an initial item in DynamoDB: aws dynamodb put-item --table-name <TableName> --item '{ \"Index\": {\"S\": \"test-item-1\"}, \"testKey\": {\"S\": \"test9\"}, \"ListAttribute\": {\"L\": [{\"S\": \"first-element\"}]} }'",
+      "2. Update the item to trigger workflow: aws dynamodb update-item --table-name <TableName> --key '{\"Index\": {\"S\": \"test-item-1\"}}' --update-expression \"SET testKey = :newval\" --expression-attribute-values '{\":newval\": {\"S\": \"test8\"}}'",
+      "3. Check Step Functions execution: aws stepfunctions list-executions --state-machine-arn <StateMachineArn>",
+      "4. 
Monitor Lambda logs: aws logs tail /aws/lambda/<LambdaName> --follow"
+    ]
+  },
+  "cleanup": {
+    "text": ["Delete the stack: cdk destroy"]
+  },
+  "authors": [
+    {
+      "name": "Avnish Kumar",
+      "image": "https://i.postimg.cc/W1SVxLxR/avnish-profile.jpg",
+      "bio": "Senior Software Engineer, Amazon Web Services",
+      "linkedin": "avnish-kumar-40a54328"
+    },
+    {
+      "name": "Saptarshi Banerjee",
+      "bio": "Senior Solutions Architect, Amazon Web Services",
+      "linkedin": "saptarshi-banerjee-83472679"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/ddbstream-lambda-sfn-cdk-ts/package.json b/ddbstream-lambda-sfn-cdk-ts/package.json
new file mode 100644
index 000000000..a5c420529
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/package.json
@@ -0,0 +1,26 @@
+{
+  "name": "ddbstream-lambda-sfn-cdk-ts",
+  "version": "0.1.0",
+  "main": "lib/index.js",
+  "types": "lib/index.d.ts",
+  "scripts": {
+    "build": "tsc",
+    "watch": "tsc -w",
+    "test": "jest",
+    "lambda": "cd ./src/lambda && npm i"
+  },
+  "devDependencies": {
+    "@types/jest": "^29.5.14",
+    "@types/node": "22.7.9",
+    "aws-cdk-lib": "2.195.0",
+    "@types/aws-lambda": "^8.10.102",
+    "constructs": "^10.0.0",
+    "jest": "^29.7.0",
+    "ts-jest": "^29.2.5",
+    "typescript": "~5.6.3"
+  },
+  "peerDependencies": {
+    "aws-cdk-lib": "2.195.0",
+    "constructs": "^10.0.0"
+  }
+}
\ No newline at end of file
diff --git a/ddbstream-lambda-sfn-cdk-ts/src/lambda/index.js b/ddbstream-lambda-sfn-cdk-ts/src/lambda/index.js
new file mode 100644
index 000000000..ef1cd3e1c
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/src/lambda/index.js
@@ -0,0 +1,214 @@
+const { SFNClient, StartExecutionCommand } = require('@aws-sdk/client-sfn');
+const { createMetricsLogger } = require('aws-embedded-metrics');
+const JsonPath = require('jsonpath');
+
+/**
+ * @typedef {Object} StateMachineConfig
+ * @property {string} stateMachineArn
+ * @property {Object.<string, string>} input
+ * @property {string[]} executionNamePrefixKeys
+ * @property {Object.<string, string>} traceContext
+ */
+
+/**
+ * @typedef 
{Object} EventHandler
+ * @property {string} eventSourceArn
+ * @property {string[]} eventNames
+ * @property {Array.<{jsonPath: string, value: string}>} conditions
+ * @property {StateMachineConfig} stateMachineConfig
+ */
+
+/**
+ * @typedef {Object} EventStreamHandlerConfig
+ * @property {EventHandler[]} eventHandlers
+ */
+
+class StreamEventStateMachineHandler {
+  static get METRICS_OPERATION() { return 'HandleRecord'; }
+  static get METRICS_EXECUTION_EXISTS() { return 'ExecutionAlreadyExists'; }
+  static get METRICS_EXECUTION_STARTED() { return 'ExecutionStarted'; }
+  static get METRICS_JSON_PATH_ERROR() { return 'JsonPathError'; }
+  static get EVENT_HANDLER_CONFIG() { return 'EVENT_HANDLER_CONFIG'; }
+
+  constructor() {
+    this.metricsLogger = createMetricsLogger();
+    this.metricsLogger.setNamespace(process.env.AWS_LAMBDA_FUNCTION_NAME || '');
+    this.metricsLogger.setDimensions({ Operation: StreamEventStateMachineHandler.METRICS_OPERATION });
+
+    this.stepFunctionsClient = new SFNClient({
+      region: process.env.AWS_REGION
+    });
+
+    this.eventStreamHandlerConfig = this.parseConfigFromEnvironment();
+  }
+
+  parseConfigFromEnvironment() {
+    console.log('Parsing configuration from environment');
+    try {
+      return JSON.parse(process.env.EVENT_HANDLER_CONFIG || '');
+    } catch (ex) {
+      console.error('Unable to parse configuration, cannot start!', ex, process.env.EVENT_HANDLER_CONFIG);
+      throw ex;
+    }
+  }
+
+  async handleRequest(event, context) {
+    console.log('Received DynamoDB event:', JSON.stringify(event));
+
+    for (const record of event.Records) {
+      const startTime = Date.now();
+      let success = false;
+
+      try {
+        await this.handleRecord(record);
+        success = true;
+      } finally {
+        const endTime = Date.now();
+        this.metricsLogger.putMetric('Time', endTime - startTime, 'Milliseconds');
+
+        if (success) {
+          this.metricsLogger.putMetric('SuccessLatency', endTime - startTime, 'Milliseconds');
+        }
+
+        this.metricsLogger.setProperty('StartTime', new Date(startTime).toISOString());
+        
this.metricsLogger.setProperty('EndTime', new Date(endTime).toISOString());
+        this.metricsLogger.putMetric('Exception', success ? 0 : 1, 'Count');
+        await this.metricsLogger.flush();
+      }
+    }
+  }
+
+  async handleRecord(record) {
+    console.log('Processing DynamoDB record:', record.eventID);
+
+    this.metricsLogger.putMetric(StreamEventStateMachineHandler.METRICS_JSON_PATH_ERROR, 0, 'Count');
+    this.metricsLogger.putMetric(StreamEventStateMachineHandler.METRICS_EXECUTION_EXISTS, 0, 'Count');
+
+    for (const eventHandler of this.eventStreamHandlerConfig.eventHandlers) {
+      if (this.recordMatchesHandler(record, eventHandler)) {
+        await this.startExecution(eventHandler.stateMachineConfig, record);
+      }
+      else {
+        console.log('Record does not match handler', eventHandler);
+      }
+    }
+  }
+
+  recordMatchesHandler(record, eventHandler) {
+    if (!record || !eventHandler) {
+      console.error('Invalid parameters: record and eventHandler are required');
+      return false;
+    }
+
+    if (eventHandler.eventNames.length > 0 && !eventHandler.eventNames.includes(record.eventName || '')) {
+      console.log('Event name does not match', record.eventName, eventHandler.eventNames);
+      return false;
+    }
+
+    const jsonRecord = record.dynamodb;
+    if (!jsonRecord) {
+      console.error('Invalid parameters: record.dynamodb is required');
+      return false;
+    }
+
+    if (eventHandler.conditions.length > 0) {
+      return eventHandler.conditions.every(element => {
+        if (!element || !element.jsonPath) {
+          return false;
+        }
+
+        const query = element.jsonPath;
+        try {
+          const matches = JsonPath.query(jsonRecord, query);
+          if (matches.length === 0) {
+            console.log('Unable to find any match for the condition', query, 'value', element.value);
+            return false;
+          }
+          console.log(`JsonPath query ${query} looking for value ${element.value} running on the following record: `, jsonRecord, " found the following potential match value", matches[0]);
+          return matches[0] === element.value;
+        } catch (ex) {
+          console.error('Unable to run the json 
query', ex, query, jsonRecord); + this.metricsLogger.putMetric(StreamEventStateMachineHandler.METRICS_JSON_PATH_ERROR, 1, 'Count'); + return false; + } + }); + } + + return true; + } + + async startExecution(stateMachineConfig, record) { + try { + const executionName = this.buildExecutionName(record); + const input = this.buildExecutionInput(stateMachineConfig, record); + + console.log(`Starting execution ${executionName} for state machine ${stateMachineConfig.stateMachineArn}`); + + const command = new StartExecutionCommand({ + stateMachineArn: stateMachineConfig.stateMachineArn, + name: executionName, + input: JSON.stringify(input) + }); + + const response = await this.stepFunctionsClient.send(command); + + console.log('Started execution:', response.executionArn); + this.metricsLogger.putMetric(StreamEventStateMachineHandler.METRICS_EXECUTION_STARTED, 1, 'Count'); + } catch (error) { + if (error.name === 'ExecutionAlreadyExists') { + console.log('Execution already exists'); + this.metricsLogger.putMetric(StreamEventStateMachineHandler.METRICS_EXECUTION_EXISTS, 1, 'Count'); + } else { + throw error; + } + } + } + + buildExecutionName(record) { + return `${record.eventID || Date.now()}`; + } + + buildExecutionInput(stateMachineConfig, record) { + if (!stateMachineConfig || !record) { + throw new Error('Invalid parameters: stateMachineConfig and record are required'); + } + + try { + const result = {}; + // If input mapping is defined in stateMachineConfig + if (stateMachineConfig.input && typeof stateMachineConfig.input === 'object') { + // For each key-value pair in the input configuration + Object.entries(stateMachineConfig.input).forEach(([key, jsonPath]) => { + try { + // Evaluate the JSONPath expression against the dynamodb property + const matches = JsonPath.query(record.dynamodb, jsonPath); + + if (matches && matches.length > 0) { + result[key] = matches[0]; + } else { + console.warn(`No matches found for JSONPath: ${jsonPath} for key: ${key}`); + 
result[key] = null;
+            }
+          } catch (error) {
+            console.error(`Error evaluating JSONPath for key ${key}:`, error);
+            result[key] = null;
+          }
+        });
+      }
+      console.log('Built execution input:', result);
+      return result;
+
+    } catch (error) {
+      console.error('Error building execution input:', error);
+      throw new Error(`Failed to build execution input: ${error.message}`);
+    }
+  }
+}
+
+/**
+ * Lambda handler function
+ */
+exports.handler = async (event, context) => {
+  const handler = new StreamEventStateMachineHandler();
+  return handler.handleRequest(event, context);
+};
\ No newline at end of file
diff --git a/ddbstream-lambda-sfn-cdk-ts/src/lambda/package.json b/ddbstream-lambda-sfn-cdk-ts/src/lambda/package.json
new file mode 100644
index 000000000..1825b3f3d
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/src/lambda/package.json
@@ -0,0 +1,18 @@
+{
+  "name": "lambda",
+  "version": "1.0.0",
+  "description": "Lambda function that starts Step Functions executions from DynamoDB stream events",
+  "private": true,
+  "license": "MIT",
+  "main": "index.js",
+  "scripts": {
+    "test": "echo \"Error: no test specified\" && exit 1"
+  },
+  "keywords": [],
+  "author": "",
+  "dependencies": {
+    "@aws-sdk/client-sfn": "^3.817.0",
+    "aws-embedded-metrics": "^4.2.0",
+    "jsonpath": "^1.1.1"
+  }
+}
diff --git a/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn-example-stack.ts b/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn-example-stack.ts
new file mode 100644
index 000000000..e76cc65ad
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn-example-stack.ts
@@ -0,0 +1,76 @@
+import * as cdk from 'aws-cdk-lib';
+import { Construct } from 'constructs';
+import { AttributeType, StreamViewType, Table } from "aws-cdk-lib/aws-dynamodb"
+import { Pass, StateMachine } from "aws-cdk-lib/aws-stepfunctions"
+import { DynamoWorkflowTrigger, EventName } from "./ddbstream-lambda-sfn"
+import { FilterCriteria, FilterRule } from "aws-cdk-lib/aws-lambda";
+
+
+export class DdbstreamLambdaSfnExampleStack extends 
cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + // Create a DynamoDB table with streaming enabled and 'Index' as partition key. + // You can also import your own table. But it should have DDB streaming enabled. + const testTable = new Table(this, "TestTable", { + partitionKey: { + name: "Index", + type: AttributeType.STRING + }, + stream: StreamViewType.NEW_AND_OLD_IMAGES // Required for the trigger to work + }) + + // Create a simple Step Functions state machine with a single Pass state. + // You can import any other step function here as well. + const testStateMachine = new StateMachine(this, "TestStateMachine", { + definition: new Pass(this, "TestPassState") + }) + + // Create a DynamoDB stream trigger with event filtering and conditional execution + const exampleTrigger = new DynamoWorkflowTrigger(this, "TestTrigger", { + eventSourceFilters: [ + FilterCriteria.filter({ + dynamodb: { + NewImage: { + SkipMe: { + // Only trigger when attribute "SkipMe" does not exist + S: FilterRule.notExists(), + }, + }, + }, + }), + ], + eventHandlers: [ + { + table: testTable, + // Only trigger on MODIFY events + eventNames: [EventName.Modify], + // Only execute when: + // 1. NewImage.testKey = "test8" + // 2. 
OldImage.testKey = "test9" + conditions: [{ jsonPath: "$.NewImage.testKey.S", value: "test8"}, { jsonPath: "$.OldImage.testKey.S", value: "test9"}], // Ensure this is always an array + // Configure Step Functions execution with dynamic input mapping + stateMachineConfig: { + stateMachine: testStateMachine, + input: { + Index: "$.NewImage.Index.S", + MapAttribute: "$.NewImage.ListAttribute.L[0]" + } + } + } + ] + }) + + new cdk.CfnOutput(this, "DynamoDBTable", { + value: testTable.tableName, + }); + + new cdk.CfnOutput(this, "StateMachineArn", { + value: testStateMachine.stateMachineArn, + }); + + new cdk.CfnOutput(this, "LambdaName", { + value: exampleTrigger.lambda.functionName, + }); + } +} \ No newline at end of file diff --git a/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn.ts b/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn.ts new file mode 100644 index 000000000..cc84bbcdc --- /dev/null +++ b/ddbstream-lambda-sfn-cdk-ts/src/lib/ddbstream-lambda-sfn.ts @@ -0,0 +1,291 @@ +import { Duration, RemovalPolicy } from "aws-cdk-lib" +import { Dashboard } from "aws-cdk-lib/aws-cloudwatch" +import { ITable } from "aws-cdk-lib/aws-dynamodb" +import { ISecurityGroup, IVpc, Peer, Port, SecurityGroup, SubnetType } from "aws-cdk-lib/aws-ec2" +import { Key } from "aws-cdk-lib/aws-kms" +import { Function, Runtime, Code, StartingPosition } from "aws-cdk-lib/aws-lambda" +import { DynamoEventSource, SqsDlq } from "aws-cdk-lib/aws-lambda-event-sources" +import { RetentionDays } from "aws-cdk-lib/aws-logs" +import { Queue, QueueEncryption } from "aws-cdk-lib/aws-sqs" +import { IStateMachine } from "aws-cdk-lib/aws-stepfunctions" +import { Construct } from "constructs" +import * as path from 'path'; + +/** + * DynamoDB event stream event names. Use this to filter events based on whether they are inserts, updates, + * or deletes. + */ +export enum EventName { + /** + * A new item was inserted into the DDB table. 
+   */
+  Insert = "INSERT",
+  /**
+   * An existing item was modified in the DDB table.
+   */
+  Modify = "MODIFY",
+  /**
+   * An item was removed from the DDB table.
+   */
+  Remove = "REMOVE"
+}
+
+/**
+ * The state machine config describes which state machine to invoke, and what properties to input into it.
+ */
+export interface StateMachineConfig {
+  /**
+   * State machine to invoke.
+   */
+  stateMachine: IStateMachine
+  /**
+   * Input map.
+   */
+  input?: {
+    [name: string]: string
+  }
+}
+
+export interface Condition {
+  jsonPath: string,
+  value: string
+}
+
+/**
+ * Each event handler describes a kind of change in DynamoDB to react to, and the state machine to trigger
+ * when that event happens.
+ */
+export interface EventHandler {
+  /**
+   * Table to consume events from. The table must have streaming enabled and set to NEW_AND_OLD_IMAGES.
+   */
+  table: ITable
+  /**
+   * The types of events (INSERT, MODIFY, REMOVE) to trigger on.
+   */
+  eventNames?: EventName[]
+  /**
+   * Conditions that must be met for this event handler to trigger. These are JSONPath expressions that are
+   * evaluated over the `dynamodb` property of the event record.
+   */
+  conditions?: Condition[]
+  /**
+   * The state machine to execute if conditions are met.
+   */
+  stateMachineConfig: StateMachineConfig
+}
+
+/**
+ * DynamoWorkflowTrigger construct properties.
+ */
+export interface DynamoWorkflowTriggerProps {
+  /**
+   * List of event handlers to trigger on. Each event handler describes a set of conditions that must be
+   * met, and a state machine to trigger when those conditions are met.
+   */
+  eventHandlers: EventHandler[]
+  /**
+   * Number of times to re-try a failed Lambda invocation before sending it to the dead-letter queue.
+   *
+   * @default 10
+   */
+  retries?: number
+  /**
+   * VPC to run the trigger Lambda inside of.
+   */
+  vpc?: IVpc
+  /**
+   * SubnetType to use for VPC Subnet. Requires setting vpc.
+   *
+   * Defaults to SubnetType.PRIVATE_ISOLATED if vpc is configured. 
+   */
+  subnetType?: SubnetType
+  /**
+   * Additional security groups to apply to the event trigger lambda. Requires setting vpc.
+   *
+   * The event trigger lambda requires communication to the StepFunctions service endpoint. If a vpc is configured
+   * but this prop is not specified, a default security group enabling all egress HTTPS traffic is used.
+   * It is recommended that consumers of this construct provide a vpc and explicit security groups that limit traffic
+   * to only the StepFunctions service endpoint over HTTPS (port 443) using a VPC interface endpoint.
+   */
+  additionalSecurityGroups?: ISecurityGroup[]
+  /**
+   * Add filter criteria option for event source.
+   *
+   * @default - None
+   */
+  readonly eventSourceFilters?: Array<{
+    [key: string]: any
+  }>
+}
+
+/**
+ * State machine config that resolves the state machine to its ARN.
+ */
+export interface LambdaStateMachineConfig {
+  /**
+   * State machine ARN.
+   */
+  stateMachineArn: string
+  /**
+   * Input map.
+   */
+  input?: {
+    [name: string]: string
+  }
+}
+
+/**
+ * Event handler representation that resolves each table to its event source ARN.
+ */
+interface LambdaEventHandler {
+  /**
+   * Table to consume events from. The table must have streaming enabled and set to NEW_AND_OLD_IMAGES.
+   */
+  eventSourceArn: string
+  /**
+   * The types of events (INSERT, MODIFY, REMOVE) to trigger on.
+   */
+  eventNames?: EventName[]
+  /**
+   * Conditions that must be met for this event handler to trigger. These are JSONPath expressions that are
+   * evaluated over the `dynamodb` property of the event record.
+   */
+  conditions?: Condition[]
+  /**
+   * The state machine to execute if conditions are met.
+   */
+  stateMachineConfig: LambdaStateMachineConfig
+}
+
+/**
+ * A CDK construct that triggers Step Functions workflows in response to changes in a DynamoDB table. 
+ *
+ * The construct contains a Lambda function that evaluates JSONPath expressions against DynamoDB event
+ * records to determine whether a workflow must be executed. An arbitrary number of event handlers can
+ * be defined for a single table to handle different kinds of state transitions.
+ *
+ * The construct includes a dead-letter queue for failed invocations.
+ */
+export class DynamoWorkflowTrigger extends Construct {
+  /**
+   * The Lambda function to be invoked for each DynamoDB event record.
+   */
+  public readonly lambda: Function
+
+  /**
+   * Dead-letter queue for failed invocations.
+   */
+  public readonly deadLetterQueue: Queue
+
+  /**
+   * Generated CloudWatch dashboard.
+   */
+  public readonly dashboard?: Dashboard
+
+
+  /**
+   * Creates a new instance.
+   *
+   * @param parent parent construct.
+   * @param id construct id.
+   * @param props properties.
+   */
+  constructor(parent: Construct, id: string, props: DynamoWorkflowTriggerProps) {
+    super(parent, id)
+
+    const dlqKmsKey = new Key(parent, "DlqKey", {
+      description: "SSE for encrypting the workflow trigger SQS DLQ.",
+      enableKeyRotation: true,
+      removalPolicy: RemovalPolicy.RETAIN
+    })
+
+    // Create a dead-letter queue for failed invocations.
+    this.deadLetterQueue = new Queue(this, "Dlq", {
+      retentionPeriod: Duration.days(14),
+      encryption: QueueEncryption.KMS,
+      encryptionMasterKey: dlqKmsKey,
+    })
+
+    // Construct event handler configuration for the Lambda function. This resolves CDK Table
+    // constructs to their event stream ARNs. 
+ const lambdaEventHandlers: LambdaEventHandler[] = props.eventHandlers.map((handler) => { + return { + eventSourceArn: handler.table.tableStreamArn!, + eventNames: handler.eventNames, + conditions: handler.conditions, + stateMachineConfig: { + stateMachineArn: handler.stateMachineConfig.stateMachine.stateMachineArn, + input: handler.stateMachineConfig.input, + } + } + }) + + if (!props.vpc && props.additionalSecurityGroups) { + throw new Error("Cannot specify security groups without configuring a vpc.") + } + if (!props.vpc && props.subnetType) { + throw new Error("Cannot specify subnetType without configuring a vpc.") + } + + // If VPC is set build sane defaults into subnet type and security groups. + const networkConfiguration = props.vpc + ? { + vpc: props.vpc, + subnetType: props.subnetType ?? SubnetType.PRIVATE_ISOLATED, + securityGroups: props.additionalSecurityGroups || [this.buildDefaultSecurityGroup(props.vpc)] + } + : {} + + // Create the Lambda function. + this.lambda = new Function(this, "Lambda", { + code: Code.fromAsset(path.join(__dirname, '../lambda')), + handler: "index.handler", + runtime: Runtime.NODEJS_20_X, + timeout: Duration.seconds(20), + environment: { + EVENT_HANDLER_CONFIG: JSON.stringify({ + eventHandlers: lambdaEventHandlers + }) + }, + ...networkConfiguration, + logRetention: RetentionDays.TEN_YEARS + }) + + // Give the Lambda function read access to the required tables and allow it to + // start executions for the relevant state machines. + props.eventHandlers.forEach((handler) => { + handler.table.grantStreamRead(this.lambda) + // grantStreamRead is supposed to grant decrypt to the KMS key but it doesn't + handler.table.encryptionKey?.grantDecrypt(this.lambda) + handler.stateMachineConfig.stateMachine.grantStartExecution(this.lambda) + }) + + // For each table, create an event source and wire it up to the Lambda function. 
+    const tables = new Set(props.eventHandlers.map((handler) => handler.table))
+    tables.forEach((table) => {
+      // Create event source for the Lambda function.
+      const eventSource = new DynamoEventSource(table, {
+        startingPosition: StartingPosition.TRIM_HORIZON,
+        onFailure: new SqsDlq(this.deadLetterQueue),
+        retryAttempts: props.retries ?? 10,
+        bisectBatchOnError: true,
+        filters: props.eventSourceFilters,
+      })
+
+      // Connect the DDB event source to the Lambda.
+      this.lambda.addEventSource(eventSource)
+    })
+  }
+
+  buildDefaultSecurityGroup(vpc: IVpc): SecurityGroup {
+    const defaultSecurityGroup = new SecurityGroup(this, "DefaultSecurityGroup", {
+      vpc: vpc,
+      description: "DynamoWorkflowTrigger default security group.",
+      allowAllOutbound: false
+    })
+    defaultSecurityGroup.addEgressRule(Peer.anyIpv4(), Port.tcp(443), "Enable HTTPS egress.")
+
+    return defaultSecurityGroup
+  }
+}
diff --git a/ddbstream-lambda-sfn-cdk-ts/tsconfig.json b/ddbstream-lambda-sfn-cdk-ts/tsconfig.json
new file mode 100644
index 000000000..1b620444f
--- /dev/null
+++ b/ddbstream-lambda-sfn-cdk-ts/tsconfig.json
@@ -0,0 +1,32 @@
+{
+  "compilerOptions": {
+    "target": "ES2022",
+    "module": "NodeNext",
+    "moduleResolution": "NodeNext",
+    "lib": [
+      "es2022"
+    ],
+    "declaration": true,
+    "strict": true,
+    "noImplicitAny": true,
+    "noEmit": true,
+    "strictNullChecks": true,
+    "noImplicitThis": true,
+    "alwaysStrict": true,
+    "noUnusedLocals": false,
+    "noUnusedParameters": false,
+    "noImplicitReturns": true,
+    "noFallthroughCasesInSwitch": false,
+    "inlineSourceMap": true,
+    "inlineSources": true,
+    "experimentalDecorators": true,
+    "strictPropertyInitialization": false,
+    "typeRoots": [
+      "./node_modules/@types"
+    ]
+  },
+  "exclude": [
+    "node_modules",
+    "cdk.out"
+  ]
+}