Skip to content
118 changes: 111 additions & 7 deletions cloudlift/deployment/cluster_template_generator.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import json
import re
import pathlib

from cfn_flip import to_yaml
from stringcase import camelcase, pascalcase
from troposphere import (Base64, FindInMap, Output, Parameter, Ref, Sub,
cloudformation)
cloudformation, GetAtt, Join)
from troposphere.autoscaling import (AutoScalingGroup, LaunchConfiguration,
ScalingPolicy)
ScalingPolicy, LifecycleHook)
from troposphere.cloudwatch import Alarm, MetricDimension
from troposphere.ec2 import (VPC, InternetGateway, NatGateway, Route,
RouteTable, SecurityGroup, Subnet,
SubnetRouteTableAssociation, VPCGatewayAttachment)
from troposphere.ecs import Cluster
from troposphere.elasticache import SubnetGroup as ElastiCacheSubnetGroup
from troposphere.iam import InstanceProfile, Role
from troposphere.iam import InstanceProfile, Role, PolicyType, Policy
from troposphere.logs import LogGroup
from troposphere.policies import (AutoScalingRollingUpdate, CreationPolicy,
ResourceSignal)
from troposphere.rds import DBSubnetGroup

from troposphere.awslambda import Function, Code, MEMORY_VALUES, Permission
from cloudlift.config import DecimalEncoder
from cloudlift.config import get_client_for, get_region_for_environment
from cloudlift.deployment.template_generator import TemplateGenerator
from cloudlift.version import VERSION
from troposphere.sns import Subscription, Topic, SubscriptionResource
from awacs.aws import Allow, Statement, Principal, PolicyDocument
from awacs.sts import AssumeRole


class ClusterTemplateGenerator(TemplateGenerator):
Expand Down Expand Up @@ -305,6 +309,7 @@ def _add_cluster(self):
cluster = Cluster('Cluster', ClusterName=Ref('AWS::StackName'))
self.template.add_resource(cluster)
self._add_ec2_auto_scaling()
self._add_instance_draining(cluster)
self._add_cluster_alarms(cluster)
return cluster

Expand Down Expand Up @@ -541,6 +546,105 @@ def _add_ec2_auto_scaling(self):
)
self.template.add_resource(self.cluster_scaling_policy)

def _add_instance_draining(self, cluster):
self.sns_asg_role = Role(
"SNSASGRole",
AssumeRolePolicyDocument=PolicyDocument(
Statement=[
Statement(
Effect=Allow,
Action=[AssumeRole],
Principal=Principal("Service", ["autoscaling.amazonaws.com"])
)
]
),
ManagedPolicyArns=["arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"]
)
self.template.add_resource(self.sns_asg_role)
self.lambda_execution_role = Role(
"LambdaExecutionRole",
Policies=[Policy(
PolicyName="lambda-inline",
PolicyDocument={
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"autoscaling:CompleteLifecycleAction",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"ecs:ListContainerInstances",
"ecs:DescribeContainerInstances",
"ecs:UpdateContainerInstancesState",
"sns:Publish"
],
"Resource": "*"
}],
}
)],
AssumeRolePolicyDocument=PolicyDocument(
Statement=[
Statement(
Effect=Allow,
Action=[AssumeRole],
Principal=Principal("Service", ["lambda.amazonaws.com"])
)
]
),
ManagedPolicyArns=["arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"]
)
self.template.add_resource(self.lambda_execution_role)
with open (str(pathlib.Path(__file__).parent.absolute())+"/ecs_instance_draining_lambda.py", "r") as ecs_instance_draining_lambda:
lambda_code=ecs_instance_draining_lambda.readlines()
self.lambda_function_for_asg = Function(
"LambdaFunctionForASG",
Handler="index.lambda_handler",
Role=GetAtt(self.lambda_execution_role, "Arn"),
Runtime="python3.6",
MemorySize=128,
Timeout=60,
Code=Code(
ZipFile=Join("", lambda_code)
)
)
self.template.add_resource(self.lambda_function_for_asg)
self.asg_sns_topic = Topic(
"ASGSNSTopic",
TopicName=Join("-", [Ref(cluster),"Topic"]),
Comment thread
praveenraghav01 marked this conversation as resolved.
Outdated
Subscription=[Subscription(
Protocol="lambda",
Endpoint=GetAtt(self.lambda_function_for_asg, "Arn")
)]
)
self.template.add_resource(self.asg_sns_topic)
self.lambda_invoke_permission = Permission(
"LambdaInvokePermission",
FunctionName=Ref(self.lambda_function_for_asg),
Action="lambda:InvokeFunction",
Principal="sns.amazonaws.com",
SourceArn=Ref(self.asg_sns_topic)
)
self.template.add_resource(self.lambda_invoke_permission)
self.lambda_subscription_to_sns_topic = SubscriptionResource(
"LambdaSubscriptionToSNSTopic",
Protocol="lambda",
Endpoint=GetAtt(self.lambda_function_for_asg, "Arn"),
TopicArn=Ref(self.asg_sns_topic)
)
self.template.add_resource(self.lambda_subscription_to_sns_topic)
self.asg_lifecycle_hook=LifecycleHook(
"ASGLifecycleHook",
AutoScalingGroupName=Ref(self.auto_scaling_group),
DefaultResult="ABANDON",
LifecycleHookName=Join("-", [Ref(cluster),"ASG-Hook"]),
LifecycleTransition="autoscaling:EC2_INSTANCE_TERMINATING",
NotificationMetadata=Ref(cluster),
NotificationTargetARN=Ref(self.asg_sns_topic),
RoleARN=GetAtt(self.sns_asg_role, "Arn"),
)
self.template.add_resource(self.asg_lifecycle_hook)

def _add_cluster_parameters(self):
self.template.add_parameter(Parameter(
"Environment",
Expand Down Expand Up @@ -666,7 +770,7 @@ def _add_metadata(self):
'Subnet2',
'NotificationSnsArn'
]
},
}
],
'ParameterLabels': {
'Environment': {
Expand All @@ -684,7 +788,7 @@ def _add_metadata(self):
'default': 'Min. no. of instances in cluster'
},
'NotificationSnsArn': {
'default': 'The SNS topic to which notifactions has to be triggered'
'default': 'The SNS topic to which notifications has to be triggered'
},
'Subnet1': {
'default': 'Enter the ID of the 1st subnet'
Expand All @@ -694,7 +798,7 @@ def _add_metadata(self):
},
'VPC': {
'default': 'Enter the VPC in which you want the environment to be setup'
},
}
}
}
})
61 changes: 61 additions & 0 deletions cloudlift/deployment/ecs_instance_draining_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import time
import boto3
import os

ECS = boto3.client('ecs')
ASG = boto3.client('autoscaling')
SNS = boto3.client('sns')

def find_ecs_instance_info(instance_id,cluster_name):
paginator = ECS.get_paginator('list_container_instances')
for list_resp in paginator.paginate(cluster=cluster_name):
arns = list_resp['containerInstanceArns']
desc_resp = ECS.describe_container_instances(cluster=cluster_name,
containerInstances=arns)
for container_instance in desc_resp['containerInstances']:
if container_instance['ec2InstanceId'] != instance_id:
continue
print('Found instance: id=%s, arn=%s, status=%s, runningTasksCount=%s' %
(instance_id, container_instance['containerInstanceArn'],
container_instance['status'], container_instance['runningTasksCount']))
return (container_instance['containerInstanceArn'],
container_instance['status'], container_instance['runningTasksCount'])
return None, None, 0

def instance_has_running_tasks(instance_id,cluster_name):
(instance_arn, container_status, running_tasks) = find_ecs_instance_info(instance_id,cluster_name)
if instance_arn is None:
print('Could not find instance ID %s. Letting autoscaling kill the instance.' %
(instance_id))
return False
if container_status != 'DRAINING':
print('Setting container instance %s (%s) to DRAINING' %
(instance_id, instance_arn))
ECS.update_container_instances_state(cluster=cluster_name,
containerInstances=[instance_arn],
status='DRAINING')
return running_tasks > 0

def lambda_handler(event, context):
msg = json.loads(event['Records'][0]['Sns']['Message'])
print("Event: ", msg)
if 'LifecycleTransition' not in msg.keys() or \
msg['LifecycleTransition'].find('autoscaling:EC2_INSTANCE_TERMINATING') == -1:
print('Exiting since the lifecycle transition is not EC2_INSTANCE_TERMINATING.')
return
if instance_has_running_tasks(msg['EC2InstanceId'], msg['NotificationMetadata']):
print('Tasks are still running on instance %s; posting msg to SNS topic %s' %
(msg['EC2InstanceId'], event['Records'][0]['Sns']['TopicArn']))
time.sleep(5)
sns_resp = SNS.publish(TopicArn=event['Records'][0]['Sns']['TopicArn'],
Message=json.dumps(msg),
Subject='Publishing SNS msg to invoke Lambda again.')
print('Posted msg %s to SNS topic.' % (sns_resp['MessageId']))
else:
print('No tasks are running on instance %s; setting lifecycle to complete' %
(msg['EC2InstanceId']))
ASG.complete_lifecycle_action(LifecycleHookName=msg['LifecycleHookName'],
AutoScalingGroupName=msg['AutoScalingGroupName'],
LifecycleActionResult='CONTINUE',
InstanceId=msg['EC2InstanceId'])