Skip to content

webapp: Add typing to routes.py #1601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 84 additions & 73 deletions tools/web-fuzzing-introspection/app/webapp/routes.py
Original file line number Diff line number Diff line change
@@ -19,14 +19,12 @@
import json
import signal

from flask import Blueprint, render_template, request, redirect

#from app.site import models
from . import models
from flask import Blueprint, render_template, request, redirect, BaseResponse
from typing import Dict, List, Tuple, Any, Optional, Callable, Union
from . import models, data_storage

# Use these during testing.
#from app.site import test_data
from . import data_storage

blueprint = Blueprint('site', __name__, template_folder='templates')

@@ -36,23 +34,25 @@
local_oss_fuzz = ''


def get_introspector_report_url_base(project_name, datestr):
def get_introspector_report_url_base(project_name: str, datestr: str) -> str:
base_url = 'https://storage.googleapis.com/oss-fuzz-introspector/{0}/inspector-report/{1}/'
project_url = base_url.format(project_name, datestr.replace("-", ""))
return project_url


def get_introspector_report_url_source_base(project_name, datestr):
def get_introspector_report_url_source_base(project_name: str,
datestr: str) -> str:
return get_introspector_report_url_base(project_name,
datestr) + "source-code"


def get_introspector_url(project_name, datestr):
def get_introspector_url(project_name: str, datestr: str) -> str:
return get_introspector_report_url_base(project_name,
datestr) + "fuzz_report.html"


def get_coverage_report_url(project_name, datestr, language):
def get_coverage_report_url(project_name: str, datestr: str,
language: str) -> str:
if language == 'java' or language == 'python' or language == 'go':
file_report = "index.html"
else:
@@ -63,8 +63,8 @@ def get_coverage_report_url(project_name, datestr, language):
return project_url


def extract_introspector_raw_source_code(project_name, date_str, target_file):

def extract_introspector_raw_source_code(project_name: str, date_str: str,
target_file: str) -> Optional[str]:
if is_local:
src_location = os.path.join(local_oss_fuzz, 'build', 'out',
project_name, 'inspector',
@@ -87,13 +87,14 @@ def extract_introspector_raw_source_code(project_name, date_str, target_file):
return raw_source


def extract_lines_from_source_code(project_name,
date_str,
target_file,
line_begin,
line_end,
print_line_numbers=False,
sanity_check_function_end=False):
def extract_lines_from_source_code(
project_name: str,
date_str: str,
target_file: str,
line_begin: int,
line_end: int,
print_line_numbers: bool = False,
sanity_check_function_end: bool = False) -> Optional[str]:
raw_source = extract_introspector_raw_source_code(project_name, date_str,
target_file)
if raw_source is None:
@@ -147,7 +148,7 @@ def extract_lines_from_source_code(project_name,
return return_source


def get_functions_of_interest(project_name):
def get_functions_of_interest(project_name: str) -> List[models.Function]:
all_functions = data_storage.get_functions()
all_functions = all_functions + data_storage.get_constructors()

@@ -169,7 +170,7 @@ def get_functions_of_interest(project_name):
return sorted_functions_of_interest


def get_frontpage_summary_stats():
def get_frontpage_summary_stats() -> models.DBSummary:
# Get total number of projects
all_projects = data_storage.get_projects()

@@ -204,7 +205,7 @@ def get_frontpage_summary_stats():
return db_summary


def get_project_with_name(project_name):
def get_project_with_name(project_name: str) -> Optional[models.Project]:
all_projects = data_storage.get_projects()
for project in all_projects:
if project.name == project_name:
@@ -214,7 +215,8 @@ def get_project_with_name(project_name):
return None


def get_fuction_with_name(function_name, project_name):
def get_fuction_with_name(function_name: str,
project_name: str) -> models.Function:
all_functions = data_storage.get_functions()
for function in all_functions:
if function.name == function_name and function.project == project_name:
@@ -224,7 +226,8 @@ def get_fuction_with_name(function_name, project_name):
return all_functions[0]


def get_all_related_functions(primary_function):
def get_all_related_functions(
primary_function: models.Function) -> List[models.Function]:
all_functions = data_storage.get_functions()
related_functions = []
for function in all_functions:
@@ -237,7 +240,7 @@ def get_all_related_functions(primary_function):


@blueprint.route('/')
def index():
def index() -> str:
db_summary = get_frontpage_summary_stats()
db_timestamps = data_storage.DB_TIMESTAMPS
print("Length of timestamps: %d" % (len(db_timestamps)))
@@ -272,7 +275,7 @@ def index():


@blueprint.route('/function-profile', methods=['GET'])
def function_profile():
def function_profile() -> str:
function_profile = get_fuction_with_name(
request.args.get('function', 'none'),
request.args.get('project', 'none'))
@@ -285,7 +288,7 @@ def function_profile():


@blueprint.route('/project-profile', methods=['GET'])
def project_profile():
def project_profile() -> Union[str, BaseResponse]:
#print(request.args.get('project', 'none'))

target_project_name = request.args.get('project', 'none')
@@ -418,7 +421,7 @@ def project_profile():


@blueprint.route('/function-search')
def function_search():
def function_search() -> str:
info_msg = None
MAX_MATCHES_TO_DISPLAY = 900
query = request.args.get('q', '')
@@ -466,7 +469,7 @@ def function_search():


@blueprint.route('/projects-overview')
def projects_overview():
def projects_overview() -> str:
# Get statistics of the project
project_statistics = data_storage.PROJECT_TIMESTAMPS
latest_coverage_profiles = dict()
@@ -479,7 +482,8 @@ def projects_overview():
all_projects=latest_coverage_profiles.values())


def oracle_3(all_functions, all_projects):
def oracle_3(all_functions: List[models.Function],
all_projects: List[models.Project]) -> List[models.Function]:
"""Filters fucntions that:
- "have far reach but low coverage and are likely easy to trigger"
@@ -490,7 +494,7 @@ def oracle_3(all_functions, all_projects):
- at least one argument.
"""
functions_of_interest = []
projects_added = dict()
projects_added: Dict[str, List[models.Function]] = dict()

for function in all_functions:
if (function.runtime_code_coverage < 20.0
@@ -533,9 +537,11 @@ def oracle_3(all_functions, all_projects):
return functions_of_interest


def oracle_1(all_functions, all_projects, max_project_count=5):
def oracle_1(all_functions: List[models.Function],
all_projects: List[models.Project],
max_project_count: int = 5) -> List[models.Function]:
tmp_list = []
project_count = dict()
project_count: Dict[str, int] = dict()
for function in all_functions:
interesting_fuzz_keywords = {
'deserialize',
@@ -593,7 +599,7 @@ def oracle_1(all_functions, all_projects, max_project_count=5):
return functions_to_display


def match_easy_fuzz_arguments(function):
def match_easy_fuzz_arguments(function: models.Function) -> bool:
debug_args = function.debug_data.get('args')
if not debug_args:
return False
@@ -611,7 +617,7 @@ def match_easy_fuzz_arguments(function):
return False


def is_static(target_function) -> bool:
def is_static(target_function: models.Function) -> bool:
"""Returns True if a function is determined to be static and False
otherwise, including if undecided."""

@@ -668,12 +674,12 @@ def is_static(target_function) -> bool:
return False


def oracle_2(all_functions,
all_projects,
only_functions_with_xrefs=False,
no_static_functions=False):
def oracle_2(all_functions: List[models.Function],
all_projects: List[models.Project],
only_functions_with_xrefs: bool = False,
no_static_functions: bool = False) -> List[models.Function]:
tmp_list = []
project_count = dict()
project_count: Dict[str, int] = dict()
if len(all_projects) == 1:
project_to_target = all_projects[0]
else:
@@ -730,15 +736,16 @@ def oracle_2(all_functions,


@blueprint.route('/target_oracle')
def target_oracle():
def target_oracle() -> str:
all_projects = data_storage.get_projects()
all_functions = data_storage.get_functions()

functions_to_display = []

total_funcs = set()
oracle_pairs = [(oracle_1, "heuristic 1"), (oracle_2, "heuristic 2"),
(oracle_3, "heuristic 3")]
oracle_pairs: List[Tuple[Callable, str]] = [(oracle_1, "heuristic 1"),
(oracle_2, "heuristic 2"),
(oracle_3, "heuristic 3")]
for oracle, heuristic_name in oracle_pairs:
func_targets = oracle(all_functions, all_projects)
for func in func_targets:
@@ -765,7 +772,7 @@ def target_oracle():


@blueprint.route('/indexing-overview')
def indexing_overview():
def indexing_overview() -> str:
build_status = data_storage.get_build_status()

languages_summarised = dict()
@@ -794,17 +801,17 @@ def indexing_overview():


@blueprint.route('/about')
def about():
def about() -> str:
return render_template('about.html', gtag=gtag)


@blueprint.route('/api')
def api():
def api() -> str:
return render_template('api.html', gtag=gtag)


@blueprint.route('/api/annotated-cfg')
def api_annotated_cfg():
def api_annotated_cfg() -> Dict[str, Any]:
project_name = request.args.get('project', None)
if project_name is None:
return {'result': 'error', 'msg': 'Please provide project name'}
@@ -834,7 +841,7 @@ def api_annotated_cfg():


@blueprint.route('/api/project-summary')
def api_project_summary():
def api_project_summary() -> Dict[str, Any]:
project_name = request.args.get('project', None)
if project_name is None:
return {'result': 'error', 'msg': 'Please provide project name'}
@@ -858,7 +865,7 @@ def api_project_summary():


@blueprint.route('/api/branch-blockers')
def branch_blockers():
def branch_blockers() -> Dict[str, Any]:
project_name = request.args.get('project', None)
if project_name is None:
return {'result': 'error', 'msg': 'Please provide project name'}
@@ -895,7 +902,8 @@ def branch_blockers():
return {'result': 'success', 'project_blockers': project_blockers}


def get_function_from_func_signature(func_signature, project_name):
def get_function_from_func_signature(
func_signature: str, project_name: str) -> Optional[models.Function]:
all_functions = data_storage.get_functions()
for function in all_functions:
if function.project == project_name and function.func_signature == func_signature:
@@ -904,7 +912,7 @@ def get_function_from_func_signature(func_signature, project_name):


@blueprint.route('/api/all-cross-references')
def api_cross_references():
def api_cross_references() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -952,7 +960,7 @@ def api_cross_references():


@blueprint.route('/api/all-functions')
def api_project_all_functions():
def api_project_all_functions() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -970,7 +978,7 @@ def api_project_all_functions():


@blueprint.route('/api/all-jvm-constructors')
def api_project_all_jvm_constructors():
def api_project_all_jvm_constructors() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -986,7 +994,8 @@ def api_project_all_jvm_constructors():
return {'result': 'success', 'functions': list_to_return}


def _convert_function_return_list(project_functions):
def _convert_function_return_list(
project_functions: List[models.Function]) -> List[Dict[str, Any]]:
"""Convert a function list to something we can return"""
list_to_return = []
for function in project_functions:
@@ -1016,7 +1025,7 @@ def _convert_function_return_list(project_functions):


@blueprint.route('/api/project-source-code')
def api_project_source_code():
def api_project_source_code() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -1079,7 +1088,7 @@ def api_project_source_code():


@blueprint.route('/api/type-info')
def api_type_info():
def api_type_info() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -1151,7 +1160,7 @@ def api_function_signature():


@blueprint.route('/api/function-source-code')
def api_function_source_code():
def api_function_source_code() -> Dict[str, Any]:
"""Returns a json representation of all the functions in a given project"""
project_name = request.args.get('project', None)
if project_name is None:
@@ -1214,7 +1223,8 @@ def api_function_source_code():
}


def get_build_status_of_project(project_name):
def get_build_status_of_project(
project_name: str) -> Optional[models.BuildStatus]:
build_status = data_storage.get_build_status()

for bs in build_status:
@@ -1225,9 +1235,9 @@ def get_build_status_of_project(project_name):


@blueprint.route('/api/easy-params-far-reach')
def api_oracle_2():
def api_oracle_2() -> Dict[str, Any]:
"""API for getting fuzz targets with easy fuzzable arguments."""
err_msgs = list()
err_msgs: List[str] = list()
project_name = request.args.get('project', None)
if project_name is None:
return {
@@ -1286,8 +1296,8 @@ def api_oracle_2():


@blueprint.route('/api/far-reach-low-cov-fuzz-keyword')
def api_oracle_1():
err_msgs = list()
def api_oracle_1() -> Dict[str, Any]:
err_msgs: List[str] = list()
project_name = request.args.get('project', None)
if project_name is None:
return {
@@ -1336,7 +1346,7 @@ def api_oracle_1():


@blueprint.route('/api/project-repository')
def project_repository():
def project_repository() -> Dict[str, Any]:
project_name = request.args.get('project', None)
if project_name is None:
return {
@@ -1359,7 +1369,7 @@ def project_repository():


@blueprint.route('/api/far-reach-but-low-coverage')
def far_reach_but_low_coverage():
def far_reach_but_low_coverage() -> Dict[str, Any]:
err_msgs = list()
project_name = request.args.get('project', None)
if project_name is None:
@@ -1518,7 +1528,7 @@ def get_full_recursive_types(debug_type_dictionary, resulting_types,


@blueprint.route('/api/shutdown')
def shutdown():
def shutdown() -> Dict[str, str]:
"""Shuts down the server, only if it's local."""
if is_local or allow_shutdown:
sig = getattr(signal, "SIGKILL", signal.SIGTERM)
@@ -1529,7 +1539,7 @@ def shutdown():


@blueprint.route('/api/all-header-files')
def all_project_header_files():
def all_project_header_files() -> Dict[str, Any]:
project = request.args.get('project', None)
if project is None:
return {
@@ -1548,7 +1558,7 @@ def all_project_header_files():


@blueprint.route('/api/addr-to-recursive-dwarf-info')
def type_at_addr():
def type_at_addr() -> Dict[str, Any]:
# Temporary disabling this API because of size limit.
# @arthurscchan 14/6/2024
return {'result': 'error', 'extended_msgs': ['Temporary disabled']}
@@ -1574,19 +1584,19 @@ def type_at_addr():
with open(type_map, 'r') as f:
type_map_dict = json.load(f)

resulting_types = dict()
resulting_types: Dict[str, Any] = dict()
print("Getting types")
get_full_recursive_types(type_map_dict, resulting_types, addr)

return {'result': 'success', 'dwarf-map': resulting_types}


@blueprint.route('/api/function-target-oracle')
def api_all_interesting_function_targets():
def api_all_interesting_function_targets() -> Dict[str, Any]:
"""Returns a list of function targets based on analysis of all functions in all
OSS-Fuzz projects (assuming they have introspetor builds) using several different
heuristics."""
result_dict = dict()
result_dict: Dict[str, Any] = dict()

# Get the list of all oracles that we have
all_projects = data_storage.get_projects()
@@ -1596,8 +1606,9 @@ def api_all_interesting_function_targets():
functions_to_display = []

total_funcs = set()
oracle_pairs = [(oracle_1, "heuristic 1"), (oracle_2, "heuristic 2"),
(oracle_3, "heuristic 3")]
oracle_pairs: List[Tuple[Callable, str]] = [(oracle_1, "heuristic 1"),
(oracle_2, "heuristic 2"),
(oracle_3, "heuristic 3")]
for oracle, heuristic_name in oracle_pairs:
func_targets = oracle(all_functions, all_projects)
for func in func_targets:
@@ -1635,8 +1646,8 @@ def api_all_interesting_function_targets():


@blueprint.route('/api/sample-cross-references')
def sample_cross_references():
"""Returns a list of strings with functions that call into a given
def sample_cross_references() -> Dict[str, Any]:
"""Returns a list of strings with functions that call into a given
target function."""
project_name = request.args.get('project', None)
if project_name is None: