-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandleS3Event.py
196 lines (146 loc) · 7.14 KB
/
handleS3Event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import boto3 #boto3 to interact with AWS services
import typing #typing for python typing
import botocore #boto3 exceptions
import re #regex for param checking
import logging #logging for cloudwatch
import sys
#Hard Coded In Order To Test
if __name__ == 'AWS_Lambda.handleS3Event':
from AWS_Lambda.utils import *
from AWS_Lambda.constants import *
else:
from utils import * #util functions for lex interactions
from constants import *
### S3
# Module-wide logger shared by every S3 action below. Calling getLogger()
# without a name returns the root logger.
logger = logging.getLogger()
def valid_bucket_name(bucket_name):
    """Check whether a value is an acceptable Amazon S3 bucket name.

    The checks applied are:
      * the value is a string of 3 to 63 characters,
      * it contains only lowercase letters, digits, dots and hyphens,
      * it starts and ends with a lowercase letter or a digit,
      * it does not contain two adjacent dots,
      * it is not formatted like an IPv4 address (for example 192.168.5.4).

    Args:
        bucket_name: Candidate bucket name. Non-string values (including
            None) are treated as invalid instead of raising.

    Returns:
        bool: True when the name passes every check, False otherwise.
    """
    if not isinstance(bucket_name, str):
        return False
    # fullmatch is used instead of match + "$" because "$" also matches just
    # before a trailing newline, which would let "name\n" through.
    if re.fullmatch(r'(?!.*\.\.)[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]', bucket_name) is None:
        return False
    # S3 rejects names that look like an IP address.
    if re.fullmatch(r'\d{1,3}(?:\.\d{1,3}){3}', bucket_name):
        return False
    return True
def s3_create_bucket(s3_client: typing.Any, intent_request: dict) -> tuple:
    """Creating Bucket in Amazon S3

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket

    Args:
        s3_client (typing.Any): A low-level client representing Amazon Simple Storage Service (S3)
        intent_request (dict): Amazon Lex Event Input

    Returns:
        tuple: (Status, The message that should be shown to the user).
        When the bucket name is missing or invalid, the result of
        elicit_slot(...) is returned instead so the user is asked again.
    """
    bucket_name = get_slot(intent_request, "bucketName", True)

    #Parameter Checking For bucket name
    # The emptiness check keeps a missing slot value from reaching the validator.
    if not bucket_name or not valid_bucket_name(bucket_name):
        return elicit_slot(intent_request, "bucketName", "Invalid Bucket Name, Please Enter A Valid Bucket Name.")

    create_kwargs = {"Bucket": bucket_name}
    # Outside us-east-1 S3 requires an explicit LocationConstraint matching the
    # endpoint region, otherwise the call is rejected. The region is read
    # defensively so that client stand-ins without ``meta`` keep working.
    region = getattr(getattr(s3_client, "meta", None), "region_name", None)
    if isinstance(region, str) and region not in ("us-east-1", "aws-global"):
        create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

    try:
        response = s3_client.create_bucket(**create_kwargs)
    except botocore.exceptions.ClientError as client_error:
        return False, "[Error] " + client_error.response["Error"]["Message"]
    except botocore.exceptions.ParamValidationError as param_error:
        return False, f"[Error] {param_error}"
    except Exception as error: #For All Other Errors
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt still
        # propagate; logger.exception records the full traceback.
        logger.exception("Unexpected error while creating bucket %s", bucket_name)
        return False, f"[Error] ({type(error)}) Unknown Errors. Please Retry Later!"

    if response is None:
        return False, "Fail Creating Bucket"
    return True, f"Successfully Created Bucket {bucket_name}"
def s3_put_object(s3_client: typing.Any, intent_request: dict) -> tuple:
    """Putting Object in Amazon S3

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object

    Args:
        s3_client (typing.Any): A low-level client representing Amazon Simple Storage Service (S3)
        intent_request (dict): Amazon Lex Event Input

    Returns:
        tuple: (Status, The message that should be shown to the user).
        When the bucket name is missing or invalid, the result of
        elicit_slot(...) is returned instead so the user is asked again.

    Notes:
        only support text as object right now. The text is stored UTF-8
        encoded, so ASCII-only bodies produce the same bytes as before.
    """
    bucket_name = get_slot(intent_request, "bucketName", True)
    body = get_slot(intent_request, "body")
    key = get_slot(intent_request, "key", True)

    #Parameter Checking For bucket name
    # The emptiness check keeps a missing slot value from reaching the validator.
    if not bucket_name or not valid_bucket_name(bucket_name):
        return elicit_slot(intent_request, "bucketName", "Invalid Bucket Name, Please Enter A Valid Bucket Name.")

    # A missing or non-text body used to surface as an opaque "Unknown Errors"
    # message; report it explicitly instead.
    if not isinstance(body, str):
        return False, "[Error] Missing Or Non-Text Object Body"

    # UTF-8 rather than ASCII so non-English text can be stored.
    payload = body.encode("utf-8")

    try:
        response = s3_client.put_object(Bucket=bucket_name, Body=payload, Key=key)
    except botocore.exceptions.ClientError as client_error:
        return False, "[Error] " + client_error.response["Error"]["Message"]
    except botocore.exceptions.ParamValidationError as param_error:
        return False, f"[Error] {param_error}"
    except Exception as error: #For All Other Errors
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt still
        # propagate; logger.exception records the full traceback.
        logger.exception("Unexpected error while putting %s into bucket %s", key, bucket_name)
        return False, f"[Error] ({type(error)}) Unknown Errors. Please Retry Later!"

    if response is None:
        return False, "Fail Putting Object Into Bucket"
    return True, f"Successfully Put {key} Inside {bucket_name}"
def s3_delete_object(s3_client: typing.Any, intent_request: dict) -> tuple:
    """Deleting Object in Amazon S3

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_object

    Args:
        s3_client (typing.Any): A low-level client representing Amazon Simple Storage Service (S3)
        intent_request (dict): Amazon Lex Event Input

    Returns:
        tuple: (Status, The message that should be shown to the user).
        When the bucket name is missing or invalid, the result of
        elicit_slot(...) is returned instead so the user is asked again.
    """
    bucket_name = get_slot(intent_request, "bucketName", True)
    key = get_slot(intent_request, "key", True)

    #Parameter Checking For bucket name
    # The emptiness check keeps a missing slot value from reaching the validator.
    if not bucket_name or not valid_bucket_name(bucket_name):
        return elicit_slot(intent_request, "bucketName", "Invalid Bucket Name, Please Enter A Valid Bucket Name.")

    try:
        response = s3_client.delete_object(Bucket=bucket_name, Key=key)
    except botocore.exceptions.ClientError as client_error:
        return False, "[Error] " + client_error.response["Error"]["Message"]
    except botocore.exceptions.ParamValidationError as param_error:
        return False, f"[Error] {param_error}"
    except Exception as error: #For All Other Errors
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt still
        # propagate; logger.exception records the full traceback.
        logger.exception("Unexpected error while deleting %s from bucket %s", key, bucket_name)
        return False, f"[Error] ({type(error)}) Unknown Errors. Please Retry Later!"

    if response is None:
        return False, "Fail Deleting Object From Bucket"
    return True, f"Successfully Deleted {key} Inside {bucket_name}"
def s3_delete_bucket(s3_client: typing.Any, intent_request: dict) -> tuple:
    """Deleting Bucket in Amazon S3

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_bucket

    Args:
        s3_client (typing.Any): A low-level client representing Amazon Simple Storage Service (S3)
        intent_request (dict): Amazon Lex Event Input

    Returns:
        tuple: (Status, The message that should be shown to the user).
        When the bucket name is missing or invalid, the result of
        elicit_slot(...) is returned instead so the user is asked again.
    """
    bucket_name = get_slot(intent_request, "bucketName", True)

    #Parameter Checking For bucket name
    # The emptiness check keeps a missing slot value from reaching the validator.
    if not bucket_name or not valid_bucket_name(bucket_name):
        return elicit_slot(intent_request, "bucketName", "Invalid Bucket Name, Please Enter A Valid Bucket Name.")

    try:
        response = s3_client.delete_bucket(Bucket=bucket_name)
    except botocore.exceptions.ClientError as client_error:
        return False, "[Error] " + client_error.response["Error"]["Message"]
    except botocore.exceptions.ParamValidationError as param_error:
        return False, f"[Error] {param_error}"
    except Exception as error: #For All Other Errors
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt still
        # propagate; logger.exception records the full traceback.
        logger.exception("Unexpected error while deleting bucket %s", bucket_name)
        return False, f"[Error] ({type(error)}) Unknown Errors. Please Retry Later!"

    if response is None:
        return False, "Fail Deleting Bucket"
    return True, f"Successfully Deleted Bucket {bucket_name}"
def s3_handler(intent_request: dict, action: str) -> dict:
    """Amazon S3's handler

    Dispatches the requested action to the matching s3_* function and turns
    its (status, message) result into a close response.

    Args:
        intent_request (dict): Amazon Lex Event Input
        action (str): Action user wants to perform. Supported values are
            "CreateBucket", "PutObject", "DeleteObject" and "DeleteBucket".

    Returns:
        dict: Close response ("Fulfilled" on success, "Failed" otherwise).
        If the action function returned something other than a tuple (it
        does so when it re-prompts for the bucket name via elicit_slot),
        that value is returned unchanged.
    """
    operations = {
        "CreateBucket": s3_create_bucket,
        "PutObject": s3_put_object,
        "DeleteObject": s3_delete_object,
        "DeleteBucket": s3_delete_bucket,
    }
    operation = operations.get(action)

    if operation is None:
        # No client is needed just to report an unsupported action.
        response = (False, f"Sorry Action : {action} Not Supported Yet!")
    else:
        response = operation(boto3.client("s3"), intent_request)

    # Non-tuple results are already complete responses; pass them through.
    if not isinstance(response, tuple):
        return response

    fulfillment_state = "Fulfilled" if response[0] else "Failed"
    return close(intent_request, fulfillment_state, [response[1]] + ENDING_PHRASE)