diff --git a/domovoi/app.py b/domovoi/app.py index 64b28e6..ecdd184 100644 --- a/domovoi/app.py +++ b/domovoi/app.py @@ -38,7 +38,7 @@ class Domovoi(Chalice): sns_subscribers = {} sqs_subscribers = {} s3_subscribers = {} - sfn_tasks = {} + sfns = {} cwl_sub_filters = {} dynamodb_event_sources = {} @@ -105,13 +105,34 @@ def register_rule(func): return func return register_rule - def step_function_task(self, state_name, state_machine_definition): + def step_function(self, state_machine_name=None): + def register_sfn(func): + handler_name = state_machine_name + if handler_name is None: + handler_name = func.__name__ + + self.sfns[handler_name] = dict(state_machine_definition=func(), + state_machine_name=handler_name, + states=dict()) + return func + return register_sfn + + def step_function_task(self, state_name, state_machine_definition=None, state_machine_name=None): def register_sfn_task(func): - if state_name in self.sfn_tasks: - raise KeyError(state_name) - self.sfn_tasks[state_name] = dict(state_name=state_name, - state_machine_definition=state_machine_definition, - func=func) + if state_machine_name is None and state_machine_definition is None: + raise Exception("Neither state_machine_name nor state_machine_definition provided") + + if state_machine_name is None: + name = "default" + else: + name = state_machine_name + + if self.sfns.get(name) is None and state_machine_definition is None: + raise Exception("Neither a valid state_machine_name nor state_machine_definition provided") + + self.sfns.setdefault(name, dict(state_machine_definition=state_machine_definition, + state_machine_name=name, + states=dict()))["states"][state_name] = func return func return register_sfn_task @@ -181,12 +202,13 @@ def __call__(self, event, context): elif "awslogs" in event: event = json.loads(gzip.decompress(base64.b64decode(event["awslogs"]["data"]))) handler = self.cwl_sub_filters[event["logGroup"]]["func"] - elif "domovoi-stepfunctions-task" in invoked_function_arn.resource: + elif "domovoi-stepfunctions-" in invoked_function_arn.resource: _, lambda_name, lambda_alias = invoked_function_arn.resource.split(":") - assert lambda_alias.startswith("domovoi-stepfunctions-task-") - task_name = lambda_alias[len("domovoi-stepfunctions-task-"):] + assert lambda_alias.startswith("domovoi-stepfunctions-") + sfn_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[0] + task_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[2] context.stepfunctions_task_name = task_name - handler = self.sfn_tasks[task_name]["func"] + handler = self.sfns[sfn_name]["states"][task_name] if handler is None: raise DomovoiException("No handler found for event {}".format(event)) diff --git a/scripts/domovoi b/scripts/domovoi index 59d7a6f..c703598 100755 --- a/scripts/domovoi +++ b/scripts/domovoi @@ -311,59 +311,57 @@ if not args.dry_run: for page in awslambda.get_paginator('list_aliases').paginate(FunctionName=function_name): existing_aliases.extend(page["Aliases"]) -state_machine = None -for sfn_task_name, sfn_task in domovoi_app.sfn_tasks.items(): - print("Registering step function state machine for", sfn_task_name) - if state_machine is None: - state_machine = sfn_task["state_machine_definition"] - else: - msg = "Multiple state machine definitions are not supported" - assert state_machine == sfn_task["state_machine_definition"], msg - lambda_alias = "domovoi-stepfunctions-task-" + sfn_task_name - alias_args = dict(FunctionName=function_name, - Name=lambda_alias, - FunctionVersion="$LATEST", - Description="Domovoi Lambda routing label for a Step Functions state machine task") - all_states = domovoi.Domovoi.get_all_states(state_machine) - state = all_states[sfn_task["state_name"]] - if not args.dry_run: - for alias in existing_aliases: - if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST": - break - else: - try: - awslambda.create_alias(**alias_args) - except awslambda.exceptions.ResourceConflictException: - awslambda.update_alias(**alias_args) - state["Resource"] = lambda_arn + ":" + lambda_alias - -if state_machine and not args.dry_run: - iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn - sm_args = dict(name=function_name, - definition=json.dumps(state_machine), - roleArn=iam_role_arn) - try: - sm = sfn.create_state_machine(**sm_args) - print("Created new state machine", sm["stateMachineArn"]) - except botocore.exceptions.ClientError as e: - for page in sfn.get_paginator("list_state_machines").paginate(): - for sm in page["stateMachines"]: - if sm["name"] == function_name: - break - if sm["name"] != function_name: - raise e - sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"]) - sm_args.clear() - if json.loads(sm["definition"]) != state_machine: - sm_args["definition"] = json.dumps(state_machine) - if sm["roleArn"] != iam_role_arn: - sm_args["roleArn"] = iam_role_arn - if sm_args: - print("Updating state machine", sm["stateMachineArn"]) - sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args) - else: - print("No changes required to existing state machine", sm["stateMachineArn"]) - print("State machine:", sm["stateMachineArn"]) +for sfn_name, sfn_data in domovoi_app.sfns.items(): + state_machine = sfn_data["state_machine_definition"] + for sfn_task_name, sfn_task in sfn_data["states"].items(): + print("Registering step function state machine for", sfn_task_name) + lambda_alias = "domovoi-stepfunctions-" + sfn_name + "-task-" + sfn_task_name + alias_args = dict(FunctionName=function_name, + Name=lambda_alias, + FunctionVersion="$LATEST", + Description="Domovoi Lambda routing label for a Step Functions state machine task") + all_states = domovoi.Domovoi.get_all_states(state_machine) + state = all_states[sfn_task_name] + if state is not None: + if not args.dry_run: + for alias in existing_aliases: + if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST": + break + else: + try: + awslambda.create_alias(**alias_args) + except awslambda.exceptions.ResourceConflictException: + awslambda.update_alias(**alias_args) + state["Resource"] = lambda_arn + ":" + lambda_alias + + if state_machine and not args.dry_run: + iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn + name = "{}-{}".format(function_name, sfn_name) if sfn_name != "default" else function_name + sm_args = dict(name=name, + definition=json.dumps(state_machine), + roleArn=iam_role_arn) + try: + sm = sfn.create_state_machine(**sm_args) + print("Created new state machine", sm["stateMachineArn"]) + except botocore.exceptions.ClientError as e: + for page in sfn.get_paginator("list_state_machines").paginate(): + for sm in page["stateMachines"]: + if sm["name"] == function_name: + break + if sm["name"] != function_name: + raise e + sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"]) + sm_args.clear() + if json.loads(sm["definition"]) != state_machine: + sm_args["definition"] = json.dumps(state_machine) + if sm["roleArn"] != iam_role_arn: + sm_args["roleArn"] = iam_role_arn + if sm_args: + print("Updating state machine", sm["stateMachineArn"]) + sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args) + else: + print("No changes required to existing state machine", sm["stateMachineArn"]) + print("State machine:", sm["stateMachineArn"]) if args.dry_run: print("Dry run successful")