Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions domovoi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Domovoi(Chalice):
sns_subscribers = {}
sqs_subscribers = {}
s3_subscribers = {}
sfn_tasks = {}
sfns = {}
cwl_sub_filters = {}
dynamodb_event_sources = {}

Expand Down Expand Up @@ -105,13 +105,34 @@ def register_rule(func):
return func
return register_rule

def step_function_task(self, state_name, state_machine_definition):
def step_function(self, state_machine_name=None):
def register_sfn(func):
handler_name = state_machine_name
if handler_name is None:
handler_name = func.__name__

self.sfns[handler_name] = dict(state_machine_definition=func(),
state_machine_name=handler_name,
states=dict())
return func
return register_sfn

def step_function_task(self, state_name, state_machine_definition=None, state_machine_name=None):
def register_sfn_task(func):
if state_name in self.sfn_tasks:
raise KeyError(state_name)
self.sfn_tasks[state_name] = dict(state_name=state_name,
state_machine_definition=state_machine_definition,
func=func)
if state_machine_name is None and state_machine_definition is None:
raise Exception("Neither state_machine_name nor state_machine_definition provided")

if state_machine_name is None:
name = "default"
else:
name = state_machine_name

if self.sfns.get(name) is None and state_machine_definition is None:
raise Exception("Neither a valid state_machine_name nor state_machine_definition provided")

self.sfns.setdefault(name, dict(state_machine_definition=state_machine_definition,
state_machine_name=name,
states=dict()))["states"][state_name] = func
return func
return register_sfn_task

Expand Down Expand Up @@ -181,12 +202,13 @@ def __call__(self, event, context):
elif "awslogs" in event:
event = json.loads(gzip.decompress(base64.b64decode(event["awslogs"]["data"])))
handler = self.cwl_sub_filters[event["logGroup"]]["func"]
elif "domovoi-stepfunctions-task" in invoked_function_arn.resource:
elif "domovoi-stepfunctions-" in invoked_function_arn.resource:
_, lambda_name, lambda_alias = invoked_function_arn.resource.split(":")
assert lambda_alias.startswith("domovoi-stepfunctions-task-")
task_name = lambda_alias[len("domovoi-stepfunctions-task-"):]
assert lambda_alias.startswith("domovoi-stepfunctions-")
sfn_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[0]
task_name = lambda_alias[len("domovoi-stepfunctions-"):].split("-")[2]
context.stepfunctions_task_name = task_name
handler = self.sfn_tasks[task_name]["func"]
handler = self.sfns[sfn_name]["states"][task_name]

if handler is None:
raise DomovoiException("No handler found for event {}".format(event))
Expand Down
104 changes: 51 additions & 53 deletions scripts/domovoi
Original file line number Diff line number Diff line change
Expand Up @@ -311,59 +311,57 @@ if not args.dry_run:
for page in awslambda.get_paginator('list_aliases').paginate(FunctionName=function_name):
existing_aliases.extend(page["Aliases"])

state_machine = None
for sfn_task_name, sfn_task in domovoi_app.sfn_tasks.items():
print("Registering step function state machine for", sfn_task_name)
if state_machine is None:
state_machine = sfn_task["state_machine_definition"]
else:
msg = "Multiple state machine definitions are not supported"
assert state_machine == sfn_task["state_machine_definition"], msg
lambda_alias = "domovoi-stepfunctions-task-" + sfn_task_name
alias_args = dict(FunctionName=function_name,
Name=lambda_alias,
FunctionVersion="$LATEST",
Description="Domovoi Lambda routing label for a Step Functions state machine task")
all_states = domovoi.Domovoi.get_all_states(state_machine)
state = all_states[sfn_task["state_name"]]
if not args.dry_run:
for alias in existing_aliases:
if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST":
break
else:
try:
awslambda.create_alias(**alias_args)
except awslambda.exceptions.ResourceConflictException:
awslambda.update_alias(**alias_args)
state["Resource"] = lambda_arn + ":" + lambda_alias

if state_machine and not args.dry_run:
iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn
sm_args = dict(name=function_name,
definition=json.dumps(state_machine),
roleArn=iam_role_arn)
try:
sm = sfn.create_state_machine(**sm_args)
print("Created new state machine", sm["stateMachineArn"])
except botocore.exceptions.ClientError as e:
for page in sfn.get_paginator("list_state_machines").paginate():
for sm in page["stateMachines"]:
if sm["name"] == function_name:
break
if sm["name"] != function_name:
raise e
sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"])
sm_args.clear()
if json.loads(sm["definition"]) != state_machine:
sm_args["definition"] = json.dumps(state_machine)
if sm["roleArn"] != iam_role_arn:
sm_args["roleArn"] = iam_role_arn
if sm_args:
print("Updating state machine", sm["stateMachineArn"])
sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args)
else:
print("No changes required to existing state machine", sm["stateMachineArn"])
print("State machine:", sm["stateMachineArn"])
for sfn_name, sfn_data in domovoi_app.sfns.items():
state_machine = sfn_data["state_machine_definition"]
for sfn_task_name, sfn_task in sfn_data["states"].items():
print("Registering step function state machine for", sfn_task_name)
lambda_alias = "domovoi-stepfunctions-" + sfn_name + "-task-" + sfn_task_name
alias_args = dict(FunctionName=function_name,
Name=lambda_alias,
FunctionVersion="$LATEST",
Description="Domovoi Lambda routing label for a Step Functions state machine task")
all_states = domovoi.Domovoi.get_all_states(state_machine)
state = all_states[sfn_task_name]
if state is not None:
if not args.dry_run:
for alias in existing_aliases:
if alias["Name"] == lambda_alias and alias["FunctionVersion"] == "$LATEST":
break
else:
try:
awslambda.create_alias(**alias_args)
except awslambda.exceptions.ResourceConflictException:
awslambda.update_alias(**alias_args)
state["Resource"] = lambda_arn + ":" + lambda_alias

if state_machine and not args.dry_run:
iam_role_arn = config.iam_role_arn or iam.Role(function_name).arn
name = "{}-{}".format(function_name, sfn_name) if sfn_name != "default" else function_name
sm_args = dict(name=name,
definition=json.dumps(state_machine),
roleArn=iam_role_arn)
try:
sm = sfn.create_state_machine(**sm_args)
print("Created new state machine", sm["stateMachineArn"])
except botocore.exceptions.ClientError as e:
for page in sfn.get_paginator("list_state_machines").paginate():
for sm in page["stateMachines"]:
if sm["name"] == function_name:
break
if sm["name"] != function_name:
raise e
sm = sfn.describe_state_machine(stateMachineArn=sm["stateMachineArn"])
sm_args.clear()
if json.loads(sm["definition"]) != state_machine:
sm_args["definition"] = json.dumps(state_machine)
if sm["roleArn"] != iam_role_arn:
sm_args["roleArn"] = iam_role_arn
if sm_args:
print("Updating state machine", sm["stateMachineArn"])
sfn.update_state_machine(stateMachineArn=sm["stateMachineArn"], **sm_args)
else:
print("No changes required to existing state machine", sm["stateMachineArn"])
print("State machine:", sm["stateMachineArn"])

if args.dry_run:
print("Dry run successful")