diff --git a/aikido_zen/__init__.py b/aikido_zen/__init__.py index 2458a16b3..440288e88 100644 --- a/aikido_zen/__init__.py +++ b/aikido_zen/__init__.py @@ -71,6 +71,7 @@ def protect(mode="daemon", token=""): import aikido_zen.sinks.builtins import aikido_zen.sinks.os + import aikido_zen.sinks.pathlib import aikido_zen.sinks.shutil import aikido_zen.sinks.io import aikido_zen.sinks.http_client diff --git a/aikido_zen/sinks/pathlib.py b/aikido_zen/sinks/pathlib.py new file mode 100644 index 000000000..d3874ae36 --- /dev/null +++ b/aikido_zen/sinks/pathlib.py @@ -0,0 +1,29 @@ +""" +Sink module for python's `pathlib` +""" + +import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.helpers.register_call import register_call +from aikido_zen.sinks import before, patch_function, on_import + + +@before +def _pathlib_truediv_patch(func, instance, args, kwargs): + path = get_argument(args, kwargs, 0, "key") + op = "pathlib.PurePath.__truediv__" + register_call(op, "fs_op") + + vulns.run_vulnerability_scan(kind="path_traversal", op=op, args=(path,)) + + +@on_import("pathlib") +def patch(m): + """ + patching module pathlib + - patches PurePath.__truediv__ : Path() / Path() -> join operation + """ + + # PurePath() / "my/path/test.txt" + # This is accomplished by overloading the __truediv__ function on the Path class + patch_function(m, "PurePath.__truediv__", _pathlib_truediv_patch) diff --git a/aikido_zen/sinks/tests/os_test.py b/aikido_zen/sinks/tests/os_test.py index fc55f66fc..387d4d00b 100644 --- a/aikido_zen/sinks/tests/os_test.py +++ b/aikido_zen/sinks/tests/os_test.py @@ -1,11 +1,45 @@ import pytest from pathlib import Path, PurePath from unittest.mock import patch -import aikido_zen.sinks.os +import aikido_zen + +aikido_zen.protect() +from aikido_zen.context import Context +from aikido_zen.errors import AikidoPathTraversal +from aikido_zen.sinks.tests.clickhouse_driver_test import set_blocking_to_true kind = "path_traversal" +def set_context(param): + wsgi_request = { + "REQUEST_METHOD": "GET", + "HTTP_HEADER_1": "header 1 value", + "HTTP_HEADER_2": "Header 2 value", + "RANDOM_VALUE": "Random value", + "HTTP_COOKIE": "sessionId=abc123xyz456;", + "wsgi.url_scheme": "http", + "HTTP_HOST": "localhost:8080", + "PATH_INFO": "/hello", + "QUERY_STRING": "user=JohnDoe&age=30&age=35", + "CONTENT_TYPE": "application/json", + "REMOTE_ADDR": "198.51.100.23", + } + context = Context( + req=wsgi_request, + body={ + "param": param, + }, + source="flask", + ) + context.set_as_current_context() + + +@pytest.fixture(autouse=True) +def set_blocking_to_true(monkeypatch): + monkeypatch.setenv("AIKIDO_BLOCK", "1") + + def test_ospath_commands(): with patch( "aikido_zen.vulnerabilities.run_vulnerability_scan" @@ -39,6 +73,36 @@ def test_ospath_commands(): mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) +def test_os_create_path_with_multiple_slashes(): + import os + + file_path = "////etc/passwd" + set_context(file_path) + with pytest.raises(AikidoPathTraversal): + full_path = Path("flaskr/resources/blogs/") / file_path + open(full_path, "r").close() + + +def test_os_create_path_with_multiple_double_slashes(): + import os + + file_path = "////etc//passwd" + set_context(file_path) + with pytest.raises(AikidoPathTraversal): + full_path = Path("flaskr/resources/blogs/") / file_path + open(full_path, "r").close() + + +def test_os_path_traversal_with_multiple_slashes(): + import os + + file_path = "home///..////..////my_secret.txt" + set_context(file_path) + with pytest.raises(AikidoPathTraversal): + full_path = Path("flaskr/resources/blogs/") / file_path + open(full_path, "r").close() + + def test_ospath_command_absolute_path(): with patch( "aikido_zen.vulnerabilities.run_vulnerability_scan" diff --git a/aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py b/aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py index 541ee763e..7e6868cd5 100644 --- a/aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py +++ b/aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py @@ -28,13 +28,19 @@ def starts_with_unsafe_path(file_path, user_input): """Check if the file path starts with any dangerous paths and the user input.""" - lower_case_path = file_path.lower() - lower_case_user_input = user_input.lower() + path_parsed = ensure_one_leading_slash(file_path.lower()) + input_parsed = ensure_one_leading_slash(user_input.lower()) for dangerous_start in dangerous_path_starts: - if lower_case_path.startswith(dangerous_start) and lower_case_path.startswith( - lower_case_user_input + if path_parsed.startswith(dangerous_start) and path_parsed.startswith( + input_parsed ): return True return False + + +def ensure_one_leading_slash(path: str) -> str: + if path.startswith("/"): + return "/" + path.lstrip("/") + return path