Skip to content

Feature/Eventbridge v2: Add ECS target #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions localstack-core/localstack/services/events/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,54 @@ def _validate_input(self, target: Target):
raise ValueError("BatchParameters.JobName is required for Batch target")


class ContainerTargetSender(TargetSender):
class ECSTargetSender(TargetSender):
def send_event(self, event):
raise NotImplementedError("ECS target is not yet implemented")
ecs_parameters = self.target.get("EcsParameters", {})
task_definition_arn = ecs_parameters.get("TaskDefinitionArn")

# Extract network configuration if it exists
vpc_configuration = ecs_parameters.get("NetworkConfiguration", {}).get(
"awsvpcConfiguration", {}
)
aws_vpc_configuration = {
"subnets": vpc_configuration.get("Subnets"),
"securityGroups": vpc_configuration.get("SecurityGroups"),
"assignPublicIp": vpc_configuration.get("AssignPublicIp"),
}

kwargs = {
"launchType": ecs_parameters.get("LaunchType"),
"networkConfiguration": {"awsvpcConfiguration": aws_vpc_configuration}
if aws_vpc_configuration
else None,
"count": ecs_parameters.get("TaskCount"),
"platformVersion": ecs_parameters.get("PlatformVersion"),
"group": ecs_parameters.get("Group"),
"capacityProviderStrategy": ecs_parameters.get("CapacityProviderStrategy"),
"enableECSManagedTags": ecs_parameters.get("EnableECSManagedTags"),
"enableExecuteCommand": ecs_parameters.get("EnableExecuteCommand"),
"propagateTags": ecs_parameters.get("PropagateTags"),
"referenceId": ecs_parameters.get("ReferenceId"),
"tags": ecs_parameters.get("Tags"),
}

# Remove any keys with a value of None
kwargs = {k: v for k, v in kwargs.items() if v is not None}

self.client.run_task(taskDefinition=task_definition_arn, cluster=self.arn, **kwargs)

def _validate_input(self, target: Target):
super()._validate_input(target)
if not collections.get_safe(target, "$.EcsParameters.TaskDefinitionArn"):
raise ValueError("EcsParameters.TaskDefinitionArn is required for ECS target")
ecs_parameters = target.get("EcsParameters", {})
if ecs_parameters.get("LaunchType", {}) == "FARGATE":
if not ecs_parameters.get("NetworkConfiguration", {}):
raise ValueError("NetworkConfiguration is required for FARGATE LaunchType")
if not ecs_parameters.get("NetworkConfiguration", {}).get("awsvpcConfiguration", {}):
raise ValueError("awsvpcConfiguration is required for FARGATE LaunchType")
if ecs_parameters.get("CapacityProviderStrategy") and ecs_parameters.get("LaunchType"):
raise ValueError("only LaunchType or CapacityProviderStrategy can be provided")
Comment on lines +309 to +310
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Check if both LaunchType and CapacityProviderStrategy are provided



class EventsTargetSender(TargetSender):
Expand Down Expand Up @@ -441,7 +481,7 @@ class TargetSenderFactory:
"apigateway": ApiGatewayTargetSender,
"appsync": AppSyncTargetSender,
"batch": BatchTargetSender,
"ecs": ContainerTargetSender,
"ecs": ECSTargetSender,
"events": EventsTargetSender,
"firehose": FirehoseTargetSender,
"kinesis": KinesisTargetSender,
Expand Down
199 changes: 165 additions & 34 deletions localstack-core/localstack/testing/pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1715,40 +1715,6 @@ def _create_delivery_stream(**kwargs):
LOG.info("Failed to delete delivery stream %s", delivery_stream_name)


@pytest.fixture
def events_create_rule(aws_client):
rules = []

def _create_rule(**kwargs):
rule_name = kwargs["Name"]
bus_name = kwargs.get("EventBusName", "")
pattern = kwargs.get("EventPattern", {})
schedule = kwargs.get("ScheduleExpression", "")
rule_arn = aws_client.events.put_rule(
Name=rule_name,
EventBusName=bus_name,
EventPattern=json.dumps(pattern),
ScheduleExpression=schedule,
)["RuleArn"]
rules.append({"name": rule_name, "bus": bus_name})
return rule_arn

yield _create_rule

for rule in rules:
targets = aws_client.events.list_targets_by_rule(
Rule=rule["name"], EventBusName=rule["bus"]
)["Targets"]

targetIds = [target["Id"] for target in targets]
if len(targetIds) > 0:
aws_client.events.remove_targets(
Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds
)

aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])


@pytest.fixture
def ses_configuration_set(aws_client):
configuration_set_names = []
Expand Down Expand Up @@ -2220,6 +2186,171 @@ def factory(**kwargs):
aws_client.route53.delete_hosted_zone(Id=zone_id)


###############################
# Events (EventBridge) fixtures
###############################


@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
event_bus_names = []

def _create_event_bus(**kwargs):
if "Name" not in kwargs:
kwargs["Name"] = f"test-event-bus-{short_uid()}"

response = aws_client.events.create_event_bus(**kwargs)
event_bus_names.append(kwargs["Name"])
return response

yield _create_event_bus

for event_bus_name in event_bus_names:
try:
response = aws_client.events.list_rules(EventBusName=event_bus_name)
rules = [rule["Name"] for rule in response["Rules"]]

# Delete all rules for the current event bus
for rule in rules:
try:
response = aws_client.events.list_targets_by_rule(
Rule=rule, EventBusName=event_bus_name
)
targets = [target["Id"] for target in response["Targets"]]

# Remove all targets for the current rule
if targets:
for target in targets:
aws_client.events.remove_targets(
Rule=rule, EventBusName=event_bus_name, Ids=[target]
)

aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
except Exception as e:
LOG.warning(f"Failed to delete rule {rule}: {e}")

# Delete archives for event bus
event_source_arn = (
f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
)
response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
archives = [archive["ArchiveName"] for archive in response["Archives"]]
for archive in archives:
try:
aws_client.events.delete_archive(ArchiveName=archive)
except Exception as e:
LOG.warning(f"Failed to delete archive {archive}: {e}")

aws_client.events.delete_event_bus(Name=event_bus_name)
except Exception as e:
LOG.warning(f"Failed to delete event bus {event_bus_name}: {e}")


@pytest.fixture
def events_put_rule(aws_client):
rules = []

def _put_rule(**kwargs):
if "Name" not in kwargs:
kwargs["Name"] = f"rule-{short_uid()}"

response = aws_client.events.put_rule(**kwargs)
rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
return response

yield _put_rule

for rule, event_bus_name in rules:
try:
response = aws_client.events.list_targets_by_rule(
Rule=rule, EventBusName=event_bus_name
)
targets = [target["Id"] for target in response["Targets"]]

# Remove all targets for the current rule
if targets:
for target in targets:
aws_client.events.remove_targets(
Rule=rule, EventBusName=event_bus_name, Ids=[target]
)

aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
except Exception as e:
LOG.warning(f"Failed to delete rule {rule}: {e}")


@pytest.fixture
def events_create_rule(aws_client):
rules = []

def _create_rule(**kwargs):
rule_name = kwargs["Name"]
bus_name = kwargs.get("EventBusName", "")
pattern = kwargs.get("EventPattern", {})
schedule = kwargs.get("ScheduleExpression", "")
rule_arn = aws_client.events.put_rule(
Name=rule_name,
EventBusName=bus_name,
EventPattern=json.dumps(pattern),
ScheduleExpression=schedule,
)["RuleArn"]
rules.append({"name": rule_name, "bus": bus_name})
return rule_arn

yield _create_rule

for rule in rules:
targets = aws_client.events.list_targets_by_rule(
Rule=rule["name"], EventBusName=rule["bus"]
)["Targets"]

targetIds = [target["Id"] for target in targets]
if len(targetIds) > 0:
aws_client.events.remove_targets(
Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds
)

aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])


@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
queue_urls = []

def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
if not queue_name:
queue_name = f"tests-queue-{short_uid()}"
sqs_client = aws_client.sqs
queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
queue_urls.append(queue_url)
queue_arn = sqs_get_queue_arn(queue_url)
policy = {
"Version": "2012-10-17",
"Id": f"sqs-eventbridge-{short_uid()}",
"Statement": [
{
"Sid": f"SendMessage-{short_uid()}",
"Effect": "Allow",
"Principal": {"Service": "events.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": queue_arn,
}
],
}
sqs_client.set_queue_attributes(
QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
)
return queue_url, queue_arn

yield _sqs_as_events_target

for queue_url in queue_urls:
try:
aws_client.sqs.delete_queue(QueueUrl=queue_url)
except Exception as e:
LOG.debug("error cleaning up queue %s: %s", queue_url, e)


@pytest.fixture
def clean_up(
aws_client,
Expand Down
Loading