Skip to content

Commit

Permalink
Fix low-hanging lint issues (angr#3777)
Browse files Browse the repository at this point in the history
* Fix low-hanging lint issues

* Fix some test failures

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix doc warnings

* Fix test_pickle.py

* Fix code-block doc warning

* Try indenting code block

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
twizmwazin and pre-commit-ci[bot] authored Feb 7, 2023
1 parent a5e67a0 commit 5cbc3b0
Show file tree
Hide file tree
Showing 155 changed files with 456 additions and 1,263 deletions.
1 change: 0 additions & 1 deletion angr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
from . import knowledge_plugins
from . import exploration_techniques
from .exploration_techniques import ExplorationTechnique
from . import type_backend
from . import sim_type as types
from .state_hierarchy import StateHierarchy

Expand Down
1 change: 0 additions & 1 deletion angr/analyses/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def register_analysis(cls, name):
from .vfg import VFG
from .boyscout import BoyScout

# from .girlscout import GirlScout
from .backward_slice import BackwardSlice
from .veritesting import Veritesting
from .vsa_ddg import VSA_DDG
Expand Down
1 change: 0 additions & 1 deletion angr/analyses/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

if TYPE_CHECKING:
from ..knowledge_base import KnowledgeBase
import angr
from ..project import Project
from typing_extensions import ParamSpec

Expand Down
1 change: 0 additions & 1 deletion angr/analyses/cfg/cfb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import cffi

import cle
from cle.backends.externs import KernelObject, ExternObject
Expand Down
4 changes: 2 additions & 2 deletions angr/analyses/cfg/cfg_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2208,7 +2208,7 @@ def _graph_traversal_handler(self, g, src, dst, data, blockaddr_to_function, kno
else:
self._addr_to_function(returning_target, blockaddr_to_function, known_functions)

to_outside = not blockaddr_to_function[returning_target] is src_function
to_outside = blockaddr_to_function[returning_target] is not src_function

n = self.model.get_any_node(returning_target)
if n is None:
Expand Down Expand Up @@ -2346,7 +2346,7 @@ def _graph_traversal_handler(self, g, src, dst, data, blockaddr_to_function, kno
if called_function is not None and called_function.returning is False:
return

to_outside = not target_function is src_function
to_outside = target_function is not src_function

confirmed = called_function is None or called_function.returning is True
self.kb.functions._add_fakeret_to(
Expand Down
26 changes: 15 additions & 11 deletions angr/analyses/cfg/cfg_emulated.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,16 @@ def __init__(
jumpkind, or a SimState instance. Unsupported entries in starts will
lead to an AngrCFGError being raised.
:param keep_state: Whether to keep the SimStates for each CFGNode.
:param resolve_indirect_jumps: Whether to enable the indirect jump resolvers for resolving indirect jumps
:param enable_advanced_backward_slicing: Whether to enable an intensive technique for resolving indirect jumps
:param enable_symbolic_back_traversal: Whether to enable an intensive technique for resolving indirect jumps
:param list indirect_jump_resolvers: A custom list of indirect jump resolvers. If this list is None or empty,
default indirect jump resolvers specific to this architecture and binary
types will be loaded.
:param resolve_indirect_jumps: Whether to enable the indirect jump resolvers for resolving indirect
jumps
:param enable_advanced_backward_slicing: Whether to enable an intensive technique for resolving indirect
jumps
:param enable_symbolic_back_traversal: Whether to enable an intensive technique for resolving indirect
jumps
:param list indirect_jump_resolvers: A custom list of indirect jump resolvers. If this list is None or
empty,
default indirect jump resolvers specific to this architecture and
binary types will be loaded.
:param additional_edges: A dict mapping addresses of basic blocks to addresses of
successors to manually include and analyze forward from.
:param bool no_construct: Skip the construction procedure. Only used in unit-testing.
Expand Down Expand Up @@ -1242,7 +1246,7 @@ def _pre_job_handling(self, job): # pylint:disable=arguments-differ

the_jobs = []
if block_id in self._pending_jobs:
the_jobs: "Pendingjob" = self._pending_jobs.pop(block_id)
the_jobs: "PendingJob" = self._pending_jobs.pop(block_id)
for the_job in the_jobs:
self._deregister_analysis_job(the_job.caller_func_addr, the_job)
else:
Expand Down Expand Up @@ -2373,8 +2377,8 @@ def _try_resolving_indirect_jumps(self, sim_successors, cfg_node, func_addr, suc
else:
concrete_target = legit_successor.solver.eval(legit_successor.ip)
if (
not self.project.loader.find_object_containing(concrete_target)
is self.project.loader.main_object
self.project.loader.find_object_containing(concrete_target)
is not self.project.loader.main_object
):
should_resolve = False

Expand Down Expand Up @@ -2846,7 +2850,7 @@ def _search_for_function_hints(self, successor_state):
# Now let's live with this big hack...
try:
const = successor_state.solver.eval_one(data.ast)
except: # pylint: disable=bare-except
except Exception:
continue

if self._is_address_executable(const):
Expand Down Expand Up @@ -3111,7 +3115,7 @@ def _create_new_call_stack(self, addr, all_jobs, job, exit_target, jumpkind):
# although the jumpkind is not Ijk_Call, it may still jump to a new function... let's see
if self.project.is_hooked(exit_target):
hooker = self.project.hooked_by(exit_target)
if not hooker is procedures.stubs.UserHook.UserHook:
if hooker is not procedures.stubs.UserHook.UserHook:
# if it's not a UserHook, it must be a function
# Update the function address of the most recent call stack frame
new_call_stack = job.call_stack_copy()
Expand Down
2 changes: 1 addition & 1 deletion angr/analyses/cfg/cfg_fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -2337,7 +2337,7 @@ def _create_job_call(
else:
callee_function = self.kb.functions.function(addr=new_function_addr, syscall=is_syscall)
if callee_function is not None:
callee_might_return = not (callee_function.returning is False)
callee_might_return = callee_function.returning is not False

if callee_might_return:
func_edges = []
Expand Down
9 changes: 3 additions & 6 deletions angr/analyses/cfg/cfg_fast_soot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
from pysoot.sootir.soot_value import SootLocal
from pysoot.sootir.soot_statement import IfStmt, InvokeStmt, GotoStmt, AssignStmt
from pysoot.sootir.soot_expr import (
SootInterfaceInvokeExpr,
SootSpecialInvokeExpr,
SootStaticInvokeExpr,
SootVirtualInvokeExpr,
SootInvokeExpr,
SootDynamicInvokeExpr,
) # pylint:disable=unused-import
)

PYSOOT_INSTALLED = True
except ImportError:
Expand Down Expand Up @@ -438,7 +434,8 @@ def _scan_soot_block(self, cfg_job, current_func_addr):
# Mark the address as traced
self._traced_addresses.add(addr)

# soot_block is only used once per CFGNode. We should be able to clean up the CFGNode here in order to save memory
# soot_block is only used once per CFGNode. We should be able to clean up the CFGNode here in order to save
# memory
cfg_node.soot_block = None

successors = self._soot_get_successors(addr, current_func_addr, soot_block, cfg_node)
Expand Down
10 changes: 8 additions & 2 deletions angr/analyses/cfg/cfg_job_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ def callsite_repr(self):
return "None"

s = []
format_addr = lambda addr: "None" if addr is None else hex(addr)

def format_addr(addr):
return "None" if addr is None else hex(addr)

for i in range(0, len(self.callsite_tuples), 2):
s.append("@".join(map(format_addr, self.callsite_tuples[i : i + 2])))
return " -> ".join(s)
Expand Down Expand Up @@ -84,7 +87,10 @@ def callsite_repr(self):
return "None"

s = []
format_addr = lambda addr: "None" if addr is None else hex(addr)

def format_addr(addr):
return "None" if addr is None else hex(addr)

for i in range(0, len(self.callsite_tuples), 2):
s.append("@".join(map(format_addr, self.callsite_tuples[i : i + 2])))
return " -> ".join(s)
Expand Down
2 changes: 1 addition & 1 deletion angr/analyses/cfg/indirect_jump_resolvers/jumptable.py
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ def _initial_state(self, block_addr, cfg, func_addr: int):
# FIXME:
# this is a hack: for certain architectures, we do not initialize the base pointer, since the jump table on
# those architectures may use the bp register to store value
if not self.project.arch.name in {"S390X"}:
if self.project.arch.name not in {"S390X"}:
state.regs.bp = state.arch.initial_sp + 0x2000

return state
Expand Down
8 changes: 5 additions & 3 deletions angr/analyses/cfg/indirect_jump_resolvers/mips_elf_fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,11 @@ def _set_gp_load_callback(state, blade, project, gp_offset, gp_value):
state.inspect.make_breakpoint(
"tmp_write",
when=BP_BEFORE,
condition=lambda s, bbl_addr_=block_addr_in_slice, tmp_offset_=tmp_offset: s.scratch.bbl_addr
== bbl_addr_
and s.inspect.tmp_write_num == tmp_offset_,
condition=(
lambda s, bbl_addr_=block_addr_in_slice, tmp_offset_=tmp_offset: s.scratch.bbl_addr
== bbl_addr_
and s.inspect.tmp_write_num == tmp_offset_
),
action=OverwriteTmpValueCallback(gp_value).overwrite_tmp_value,
)
break
Expand Down
4 changes: 2 additions & 2 deletions angr/analyses/cfg_slice_to_sink/graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
def slice_callgraph(callgraph, cfg_slice_to_sink):
"""
Slice a callgraph, keeping only the nodes present in the <CFGSliceToSink> representation, and the transitions for which
a path exists.
Slice a callgraph, keeping only the nodes present in the <CFGSliceToSink> representation, and the transitions for
which a path exists.
*Note* that this function mutates the graph passed as an argument.
Expand Down
2 changes: 1 addition & 1 deletion angr/analyses/code_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, func):

def analyze(self):
for analysis, arches in self.ANALYSES:
if not arches is None and self.project.arch.name not in arches:
if arches is not None and self.project.arch.name not in arches:
continue
tags = analysis()
if tags:
Expand Down
7 changes: 4 additions & 3 deletions angr/analyses/congruency_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

class CongruencyCheck(Analysis):
"""
This is an analysis to ensure that angr executes things identically with different execution backends (i.e., unicorn vs vex).
This is an analysis to ensure that angr executes things identically with different execution backends (i.e., unicorn
vs vex).
"""

def __init__(self, throw=False):
Expand Down Expand Up @@ -346,8 +347,8 @@ def compare_states(self, sl, sr):
# make sure the flags are the same
if sl.arch.name in ("AMD64", "X86", "ARM", "ARMEL", "ARMHF", "AARCH64"):
# pylint: disable=unused-variable
n_bkp = sr.regs.cc_op, sr.regs.cc_dep1, sr.regs.cc_dep2, sr.regs.cc_ndep
u_bkp = sl.regs.cc_op, sl.regs.cc_dep1, sl.regs.cc_dep2, sl.regs.cc_ndep
sr.regs.cc_op, sr.regs.cc_dep1, sr.regs.cc_dep2, sr.regs.cc_ndep # n_bkp
sl.regs.cc_op, sl.regs.cc_dep1, sl.regs.cc_dep2, sl.regs.cc_ndep # u_bkp
if sl.arch.name in ("AMD64", "X86"):
n_flags = sr.regs.eflags.canonicalize(var_map=n_map, counter=n_counter)[-1]
u_flags = sl.regs.eflags.canonicalize(var_map=u_map, counter=u_counter)[-1]
Expand Down
6 changes: 5 additions & 1 deletion angr/analyses/datagraph_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
l = logging.getLogger(name=__name__)


class DataGraphError(Exception):
pass


class DataGraphMeta:
def __init__(self):
self._p = None
Expand Down Expand Up @@ -71,7 +75,7 @@ def _branch(self, live_defs, node, path=""):
l.debug("--> Branch: running block 0x%x" % irsb.addr)
block = self._make_block(irsb, live_defs)
self._imarks.update(block._imarks)
if block.stop == True:
if block.stop is True:
# l.debug(" ### Stopping at block 0x%x" % (irsb.addr))
l.debug(" ### End of path %s" % path)
return irsb.addr
Expand Down
8 changes: 5 additions & 3 deletions angr/analyses/ddg.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __eq__(self, other):
return type(other) is AST and other.op == self.op and other.operands == self.operands

def __repr__(self):
_short_repr = lambda a: a.short_repr
def _short_repr(a):
return a.short_repr

if len(self.operands) == 1:
return f"{self.op}{_short_repr(self.operands[0])}"
Expand Down Expand Up @@ -479,7 +480,8 @@ class DDG(Analysis):
def __init__(self, cfg, start=None, call_depth=None, block_addrs=None):
"""
:param cfg: Control flow graph. Please make sure each node has an associated `state` with it, e.g. by
passing the keep_state=True and state_add_options=angr.options.refs arguments to CFGEmulated.
passing the keep_state=True and state_add_options=angr.options.refs arguments to
CFGEmulated.
:param start: An address, Specifies where we start the generation of this data dependence graph.
:param call_depth: None or integers. A non-negative integer specifies how deep we would like to track in the
call tree. None disables call_depth limit.
Expand Down Expand Up @@ -1510,7 +1512,7 @@ def _build_function_dependency_graphs(self):

if dst.block_addr in block_addr_to_func:
dst_target_func = block_addr_to_func[dst.block_addr]
if not dst_target_func is src_target_func:
if dst_target_func is not src_target_func:
self._function_data_dependencies[dst_target_func].add_edge(src, dst, **data)

#
Expand Down
2 changes: 1 addition & 1 deletion angr/analyses/decompiler/callsite_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _analyze(self):

last_stmt = self.block.statements[-1]

if not type(last_stmt) is Stmt.Call:
if type(last_stmt) is not Stmt.Call:
self.result_block = self.block
return

Expand Down
2 changes: 1 addition & 1 deletion angr/analyses/decompiler/clinic.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ def _convert(self, block_node):
:rtype: ailment.Block
"""

if not type(block_node) is BlockNode:
if type(block_node) is not BlockNode:
return block_node

block = self.project.factory.block(block_node.addr, block_node.size)
Expand Down
7 changes: 5 additions & 2 deletions angr/analyses/decompiler/decompiler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pylint:disable=unused-import
import logging
from collections import defaultdict
from typing import List, Tuple, Optional, Iterable, Union, Type, Set, Dict, Any
from typing import List, Tuple, Optional, Iterable, Union, Type, Set, Dict, Any, TYPE_CHECKING

from cle import SymbolType
import ailment
Expand All @@ -19,6 +19,8 @@
from .decompilation_options import DecompilationOption
from .decompilation_cache import DecompilationCache

if TYPE_CHECKING:
from .peephole_optimizations.base import PeepholeOptimizationStmtBase, PeepholeOptimizationExprBase

l = logging.getLogger(name=__name__)

Expand Down Expand Up @@ -135,7 +137,8 @@ def _decompile(self):
cache.binop_operators = binop_operators

# convert function blocks to AIL blocks
progress_callback = lambda p, **kwargs: self._update_progress(p * (70 - 5) / 100.0 + 5, **kwargs)
def progress_callback(p, **kwargs):
return self._update_progress(p * (70 - 5) / 100.0 + 5, **kwargs)

if self._regen_clinic or old_clinic is None or self.func.prototype is None:
clinic = self.project.analyses.Clinic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ def _ail_handle_Add(self, expr):
return super()._ail_handle_Add(expr)

def _match_signed_division_add_operands(self, op0, op1):
# From: Add((Conv(64->32, ((Load(addr=stack_base+4, size=4, endness=Iend_LE) Mulls 0x55555556<32>) >> 0x20<8>)) >> 0x1f<8>),
# From: Add((Conv(64->32, ((Load(addr=stack_base+4, size=4, endness=Iend_LE) Mulls 0x55555556<32>)
# >> 0x20<8>)) >> 0x1f<8>),
# Conv(64->32, ((Load(addr=stack_base+4, size=4, endness=Iend_LE) Mulls 0x55555556<32>) >> 0x20<8>)))
# To: Load(addr=stack_base+4, size=4, endness=Iend_LE) /s 3

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ailment.expression import Convert, BinaryOp, Const
from ailment.expression import BinaryOp, Const

from .base import PeepholeOptimizationExprBase

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import math

from ailment.expression import Convert, BinaryOp, Const

from .base import PeepholeOptimizationExprBase
Expand Down
12 changes: 6 additions & 6 deletions angr/analyses/decompiler/region_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,8 @@ def _region_out_edges(graph, region, data=False):
return out_edges

def _remove_node(self, graph: networkx.DiGraph, node): # pylint:disable=no-self-use
in_edges = [(src, dst, data) for (src, dst, data) in graph.in_edges(node, data=True) if not src is node]
out_edges = [(src, dst, data) for (src, dst, data) in graph.out_edges(node, data=True) if not dst is node]
in_edges = [(src, dst, data) for (src, dst, data) in graph.in_edges(node, data=True) if src is not node]
out_edges = [(src, dst, data) for (src, dst, data) in graph.out_edges(node, data=True) if dst is not node]

if len(in_edges) <= 1 and len(out_edges) <= 1:
# it forms a region by itself :-)
Expand Down Expand Up @@ -988,8 +988,8 @@ def _merge_nodes(
dst = new_node
graph.add_edge(new_node, dst, **data)

assert not node_a in graph
assert not node_b in graph
assert node_a not in graph
assert node_b not in graph

def _absorb_node(
self, graph: networkx.DiGraph, node_mommy, node_kiddie, force_multinode=False
Expand Down Expand Up @@ -1028,8 +1028,8 @@ def _absorb_node(
dst = new_node
graph.add_edge(new_node, dst, **data)

assert not node_mommy in graph
assert not node_kiddie in graph
assert node_mommy not in graph
assert node_kiddie not in graph

def _ensure_jump_at_loop_exit_ends(self, node: Union[Block, MultiNode]) -> None:
if isinstance(node, Block):
Expand Down
Loading

0 comments on commit 5cbc3b0

Please sign in to comment.