forked from PaloAltoNetworks/Prisma-Enhanced-Remediation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAWS-SSS-008.py
98 lines (73 loc) · 1.96 KB
/
AWS-SSS-008.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Remediate Prisma Policy:
AWS:SSS-008 S3 Bucket has Global ACL Permissions enabled
Description:
Remove Global ACL permissions granted on the S3 Bucket to the All Users and Authenticated Users groups.
Required Permissions:
- s3:GetBucketAcl
- s3:PutBucketAcl
Sample IAM Policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3Permissions",
"Action": [
"s3:GetBucketAcl",
"s3:PutBucketAcl"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
"""
import boto3
from botocore.exceptions import ClientError
def remediate(session, alert, lambda_context):
    """
    Main Function invoked by index_prisma.py

    Fetches the bucket ACL, filters out every grant given to a ``Group``
    grantee whose URI contains ``AllUsers`` or ``AuthenticatedUsers`` and,
    when at least one such grant was found, hands the filtered ACL to
    ``remove_public_acl`` to be written back.

    Args:
        session: object providing ``client('s3', region_name=...)``
            (presumably a boto3 Session - confirm against the caller).
        alert: mapping; ``alert['resource_id']`` is used as the bucket name
            and ``alert['region']`` as the region of the S3 client.
        lambda_context: not used by this function.

    Returns:
        None. A ``ClientError`` from ``get_bucket_acl`` is printed and the
        function returns without modifying anything.
    """
    bucket_name = alert['resource_id']
    region = alert['region']

    s3 = session.client('s3', region_name=region)

    try:
        bucket_acl = s3.get_bucket_acl(Bucket=bucket_name)
    except ClientError as e:
        print(e.response['Error']['Message'])
        return

    grants = bucket_acl['Grants']
    kept_grants = [grant for grant in grants if not _is_global_grant(grant)]

    # Only write the ACL back when something was actually filtered out;
    # an untouched ACL needs no remediation call.
    if len(kept_grants) != len(grants):
        new_bucket_acl = {
            'Owner': bucket_acl['Owner'],
            'Grants': kept_grants,
        }
        remove_public_acl(s3, bucket_name, new_bucket_acl)


def _is_global_grant(grant):
    """Return True if *grant* targets the AllUsers/AuthenticatedUsers group."""
    grantee = grant.get('Grantee', {})

    # Grantees without both a Type and a URI are never treated as global
    # and are therefore kept by the caller.
    if grantee.get('Type') != 'Group' or 'URI' not in grantee:
        return False

    grant_uri = grantee['URI']
    return 'AllUsers' in grant_uri or 'AuthenticatedUsers' in grant_uri
def remove_public_acl(s3, bucket_name, new_bucket_acl):
    """
    Remove S3 Bucket Global ACL Policy

    Writes ``new_bucket_acl`` to the bucket via ``s3.put_bucket_acl``.

    Args:
        s3: client object exposing ``put_bucket_acl``.
        bucket_name: name of the bucket whose ACL is replaced.
        new_bucket_acl: value passed as ``AccessControlPolicy``.

    Returns:
        None. On ``ClientError`` the error message is printed instead of
        raised; on success a confirmation line is printed.
    """
    try:
        s3.put_bucket_acl(
            AccessControlPolicy=new_bucket_acl,
            Bucket=bucket_name,
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        print('Global access removed from S3 bucket {} ACL policy.'.format(bucket_name))