Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aikido_zen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def protect(mode="daemon", token=""):

import aikido_zen.sinks.builtins
import aikido_zen.sinks.os
import aikido_zen.sinks.pathlib
import aikido_zen.sinks.shutil
import aikido_zen.sinks.io
import aikido_zen.sinks.http_client
Expand Down
29 changes: 29 additions & 0 deletions aikido_zen/sinks/pathlib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Sink module for python's `pathlib`
"""

import aikido_zen.vulnerabilities as vulns
from aikido_zen.helpers.get_argument import get_argument
from aikido_zen.helpers.register_call import register_call
from aikido_zen.sinks import before, patch_function, on_import


@before
def _pathlib_truediv_patch(func, instance, args, kwargs):
path = get_argument(args, kwargs, 0, "key")
op = "pathlib.PurePath.__truediv__"
register_call(op, "fs_op")

vulns.run_vulnerability_scan(kind="path_traversal", op=op, args=(path,))


@on_import("pathlib")
def patch(m):
"""
patching module pathlib
- patches PurePath.__truediv__ : Path() / Path() -> join operation
"""

# PurePath() / "my/path/test.txt"
# This is accomplished by overloading the __truediv__ function on the Path class
patch_function(m, "PurePath.__truediv__", _pathlib_truediv_patch)
66 changes: 65 additions & 1 deletion aikido_zen/sinks/tests/os_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
import pytest
from pathlib import Path, PurePath
from unittest.mock import patch
import aikido_zen.sinks.os
import aikido_zen

aikido_zen.protect()
from aikido_zen.context import Context
from aikido_zen.errors import AikidoPathTraversal
from aikido_zen.sinks.tests.clickhouse_driver_test import set_blocking_to_true

kind = "path_traversal"


def set_context(param):
wsgi_request = {
"REQUEST_METHOD": "GET",
"HTTP_HEADER_1": "header 1 value",
"HTTP_HEADER_2": "Header 2 value",
"RANDOM_VALUE": "Random value",
"HTTP_COOKIE": "sessionId=abc123xyz456;",
"wsgi.url_scheme": "http",
"HTTP_HOST": "localhost:8080",
"PATH_INFO": "/hello",
"QUERY_STRING": "user=JohnDoe&age=30&age=35",
"CONTENT_TYPE": "application/json",
"REMOTE_ADDR": "198.51.100.23",
}
context = Context(
req=wsgi_request,
body={
"param": param,
},
source="flask",
)
context.set_as_current_context()


@pytest.fixture(autouse=True)
def set_blocking_to_true(monkeypatch):
monkeypatch.setenv("AIKIDO_BLOCK", "1")


def test_ospath_commands():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
Expand Down Expand Up @@ -39,6 +73,36 @@ def test_ospath_commands():
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_os_create_path_with_multiple_slashes():
import os

file_path = "////etc/passwd"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_os_create_path_with_multiple_double_slashes():
import os

file_path = "////etc//passwd"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_os_path_traversal_with_multiple_slashes():
import os

file_path = "home///..////..////my_secret.txt"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_ospath_command_absolute_path():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
Expand Down
14 changes: 10 additions & 4 deletions aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@

def starts_with_unsafe_path(file_path, user_input):
"""Check if the file path starts with any dangerous paths and the user input."""
lower_case_path = file_path.lower()
lower_case_user_input = user_input.lower()
path_parsed = ensure_one_leading_slash(file_path.lower())
input_parsed = ensure_one_leading_slash(user_input.lower())

for dangerous_start in dangerous_path_starts:
if lower_case_path.startswith(dangerous_start) and lower_case_path.startswith(
lower_case_user_input
if path_parsed.startswith(dangerous_start) and path_parsed.startswith(
input_parsed
):
return True

return False


def ensure_one_leading_slash(path: str) -> str:
if path.startswith("/"):
return "/" + path.lstrip("/")
return path