import json
import os
import typing as t
import urllib.parse

import boto3
import sqlalchemy
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Service name handed to the Powertools Logger and Tracer below.
SERVICE: str = "lambda-connection-demo"
logger = Logger(service=SERVICE)
# NOTE(review): `tracer` is created but nothing in this file applies it
# (no capture decorator on `handler`) — confirm whether that is intended.
tracer = Tracer(service=SERVICE)
class LambdaProxyIntegrationResponse(t.TypedDict, total=False):
    """Shape of the dict ``handler`` returns to the Lambda proxy integration.

    ``total=False`` makes every key optional; ``handler`` in this file sets
    only ``statusCode`` and ``body``.
    """

    # HTTP status code returned to the caller.
    statusCode: int
    # Response body; ``handler`` fills it with a JSON-encoded string.
    body: str
    # Optional response headers; not populated anywhere in this file.
    headers: t.Dict[str, t.Any]
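# --- Database connection setup: runs once at import time (per cold start). ---
# Name of the Secrets Manager secret holding the DB credentials. Required.
DB_USER_SECRET_NAME = os.environ.get("DB_USER_SECRET_NAME")
# Optional override for the host stored in the secret.
DB_HOST = os.environ.get("DB_HOST")

if not DB_USER_SECRET_NAME:
    # Fail fast with a readable message rather than letting boto3 reject SecretId=None.
    raise RuntimeError("DB_USER_SECRET_NAME environment variable is not set")

# NOTE: the misspelled name is kept unchanged in case another module imports it.
secrentsmanager = boto3.client(service_name='secretsmanager')
get_secret_value_response = secrentsmanager.get_secret_value(SecretId=DB_USER_SECRET_NAME)
secret = json.loads(get_secret_value_response["SecretString"])

db_user = secret["username"]
db_password = secret["password"]
db_host = DB_HOST or secret["host"]
db_port = secret["port"]

# Percent-encode the credentials before splicing them into the URL: a password
# containing '@', '/', ':' or '#' would otherwise be parsed as part of the
# host/path and break the URL (or land in the wrong field). quote(..., safe="")
# encodes every reserved character, and SQLAlchemy percent-decodes these fields
# when it parses the URL, so the driver still receives the original values.
url = (
    f"mysql+pymysql://{urllib.parse.quote(str(db_user), safe='')}"
    f":{urllib.parse.quote(str(db_password), safe='')}"
    f"@{db_host}:{db_port}/"
)

engine = sqlalchemy.create_engine(
    url,
    connect_args={
        "ssl": {
            # pymysql reads the CA file from the key "ca" (mysql_ssl_set naming),
            # which is what makes it verify the server certificate; an "ssl_ca" key
            # inside this dict is not one it reads, which would leave the session
            # encrypted but the server unverified. The path is cwd-relative —
            # presumably the deployment package root when running in Lambda.
            "ca": "./AmazonRootCA1.pem",
        }
    },
    # Recycle pooled connections after 50 s so a warm container does not reuse a
    # connection the server side has already dropped.
    pool_recycle=50,
)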
# NOTE(review): hard-coded "django-insecure-..." key committed in source; nothing in
# this file reads it. Unless it is a deliberate detection-rule fixture, it belongs in
# Secrets Manager like the DB credentials above and should be rotated.
SECRET_KEY = "django-insecure-i2(f^4emukw6o$4k0a^14g@&lu#fa+)5yjj@$_r%)fwoac0wlv"
def handler(event: t.Dict[str, t.Any], context: LambdaContext) -> LambdaProxyIntegrationResponse:
    """Lambda entry point: look up ``event['id']`` in ``foobar`` via the shared ``engine``.

    Args:
        event: Invocation payload; only ``event['id']`` is read.
        context: Lambda runtime context; ``log_stream_name`` is echoed in the body.

    Returns:
        A proxy-integration response. ``statusCode`` is 200 on both paths; the
        outcome is signalled only by ``state`` (``"SUCCESS"`` / ``"ERROR"``) in
        the JSON body, so HTTP clients cannot tell failures apart by status.

    NOTE(review): neither ``tracer`` nor ``logger`` decorators are applied to
    this function — confirm that is intended.
    """
    logger.debug("connecting to db...")
    # engine.connect() is outside the try, so a connection failure propagates to
    # the Lambda runtime instead of producing the ERROR response below.
    with engine.connect() as connection:
        try:
            # ruleid: sqlalchemy-sqli
            # event['id'] is caller-controlled and spliced straight into the SQL
            # text. The fix is to bind it, e.g. sqlalchemy.text("... WHERE id = :id")
            # executed with {"id": event["id"]}, so the driver handles quoting.
            connection.execute(f"SELECT * FROM foobar WHERE id = '{event['id']}'")
            # ok: sqlalchemy-sqli
            # NOTE(review): the '?' sits inside SQL quotes, making it a string
            # literal rather than a placeholder, and pymysql's paramstyle is %s —
            # this line probably does not bind event['id'] as intended; confirm
            # against the driver in use.
            connection.execute("SELECT * FROM foobar WHERE id = '?'", event['id'])
        except Exception as e:
            # Broad catch: any failure (including a missing 'id' key) becomes an
            # ERROR response. The exception text goes to stdout via print() rather
            # than the structured logger; logger.exception(...) would keep it in
            # the same log stream together with the traceback.
            logger.error("An error occured:")
            print(e)
            return {
                "statusCode": 200,
                "body": json.dumps({
                    "state": "ERROR",
                    "message": f"response from '{context.log_stream_name}'"
                })
            }
    return {
        "statusCode": 200,
        "body": json.dumps({
            "state": "SUCCESS",
            "message": f"response from '{context.log_stream_name}'"
        })
    }