diff --git a/patcherex/techniques/indirectcfi.py b/patcherex/techniques/indirectcfi.py index e1d1844..bb3d1be 100644 --- a/patcherex/techniques/indirectcfi.py +++ b/patcherex/techniques/indirectcfi.py @@ -48,7 +48,7 @@ def is_mainbin_call(self,addr,ff): # but we do not apply indirectcfi if we find an allocate of executable memory return True - baddr = self.patcher.cfg.get_any_node(addr,anyaddr=True).addr + baddr = self.patcher.cfg.model.get_any_node(addr,anyaddr=True).addr if baddr == None: return False call_sites = ff.get_call_sites() diff --git a/patcherex/techniques/malloc_ext_patcher.py b/patcherex/techniques/malloc_ext_patcher.py index d32698e..59a142e 100644 --- a/patcherex/techniques/malloc_ext_patcher.py +++ b/patcherex/techniques/malloc_ext_patcher.py @@ -70,7 +70,7 @@ def get_reg_free_map(self): # map all basic block addresses in the function to which regs are read or written reg_free_map = dict() reg_not_free_map = dict() - for n in self.patcher.cfg.nodes(): + for n in self.patcher.cfg.model.nodes(): assert n.addr not in reg_free_map #no duplicated nodes assert n.addr != 0 #no weird nodes @@ -118,7 +118,7 @@ def is_reg_free(self,addr,reg,ignore_current_bb,debug=False): return False def is_last_returning_block(self,node): - node = self.patcher.cfg.get_any_node(node.addr) + node = self.patcher.cfg.model.get_any_node(node.addr) try: function = self.patcher.cfg.functions[node.function_address] except KeyError: @@ -129,7 +129,7 @@ def is_last_returning_block(self,node): return False def last_block_to_return_locations(self,addr): - node = self.patcher.cfg.get_any_node(addr) + node = self.patcher.cfg.model.get_any_node(addr) if node is None: return [] function = self.patcher.cfg.functions[node.function_address] @@ -138,14 +138,14 @@ def last_block_to_return_locations(self,addr): return_locations = [] for site in self.inv_callsites[function.addr]: - node = self.patcher.cfg.get_any_node(site) - nlist = self.patcher.cfg.get_successors_and_jumpkind(node, excluding_fakeret=False) + node = self.patcher.cfg.model.get_any_node(site) + nlist = self.patcher.cfg.model.get_successors_and_jumpkind(node, excluding_fakeret=False) return_locations.extend([n[0] for n in nlist if n[1]=='Ijk_FakeRet']) return return_locations def get_all_succ(self,addr): cfg = self.patcher.cfg - all_nodes = cfg.get_all_nodes(addr) + all_nodes = cfg.model.get_all_nodes(addr) if len(all_nodes) != 1: raise CfgError() n = all_nodes[0] @@ -155,7 +155,7 @@ def get_all_succ(self,addr): return [n.addr for n in self.last_block_to_return_locations(addr)], False all_succ = set() - for s, jk in cfg.get_successors_and_jumpkind(n): + for s, jk in cfg.model.get_successors_and_jumpkind(n): if not jk.startswith("Ijk_Sys"): all_succ.add(s.addr) # a syscall writes in eax, I do not handle it explicitly diff --git a/patcherex/techniques/noflagprintf.py b/patcherex/techniques/noflagprintf.py index 6f5a963..5b0f710 100644 --- a/patcherex/techniques/noflagprintf.py +++ b/patcherex/techniques/noflagprintf.py @@ -60,7 +60,8 @@ def hash_str(tstr): def ro_segments(self): if self._ro_segments is None: self._ro_segments = tuple( - seg for seg in self.patcher.project.loader.main_object.segments if seg.is_readable and not seg.is_writable + seg for seg in self.patcher.project.loader.main_object.segments + if seg.is_readable and not seg.is_writable ) return self._ro_segments @@ -79,7 +80,8 @@ def get_patches(self): continue fmt_arg_pos = PRINTF_VARIANTS[func_name] - callers = set.union(set(), *(cfg.get_predecessors(node) for node in cfg.get_all_nodes(func.addr))) + callers = set.union(set(), *(cfg.model.get_predecessors(node) + for node in cfg.model.get_all_nodes(func.addr))) handled_addrs = set() func_to_cfg = {} diff --git a/patcherex/techniques/simple_ptr_enc.py b/patcherex/techniques/simple_ptr_enc.py index 0a81452..25d579c 100644 --- a/patcherex/techniques/simple_ptr_enc.py +++ b/patcherex/techniques/simple_ptr_enc.py @@ -1172,7 +1172,7 @@ def _generate_syscall_patches(self, cfg, syscall_name, argument_indices_in, argu if syscall is None: return patches - predecessors = cfg.get_any_node(syscall.addr).predecessors + predecessors = cfg.model.get_any_node(syscall.addr).predecessors for pred in predecessors: # it must ends with int 80h last_instr_addr = pred.instruction_addrs[-1] diff --git a/patcherex/techniques/stackretencryption.py b/patcherex/techniques/stackretencryption.py index f949380..f5ace60 100644 --- a/patcherex/techniques/stackretencryption.py +++ b/patcherex/techniques/stackretencryption.py @@ -176,7 +176,7 @@ def add_stackretencryption_to_function(self,start,ends): tailp = [] for i,e in enumerate(ends): - bb_addr = self.patcher.cfg.get_any_node(e,anyaddr=True).addr + bb_addr = self.patcher.cfg.model.get_any_node(e,anyaddr=True).addr code = self.add_patch_at_bb(bb_addr,is_tail=True) tailp.append(InsertCodePatch(e,code,name="stackretencryption_tail_%d_%d_%#x"%(self.npatch,i,start),priority=100)) @@ -216,7 +216,7 @@ def function_to_patch_locations(self,ff): return None, None def is_last_returning_block(self,node): - node = self.patcher.cfg.get_any_node(node.addr) + node = self.patcher.cfg.model.get_any_node(node.addr) try: function = self.patcher.cfg.functions[node.function_address] except KeyError: @@ -227,7 +227,7 @@ def is_last_returning_block(self,node): return False def last_block_to_return_locations(self,addr): - node = self.patcher.cfg.get_any_node(addr) + node = self.patcher.cfg.model.get_any_node(addr) if node == None: return [] function = self.patcher.cfg.functions[node.function_address] @@ -236,8 +236,8 @@ def last_block_to_return_locations(self,addr): return_locations = [] for site in self.inv_callsites[function.addr]: - node = self.patcher.cfg.get_any_node(site) - nlist = self.patcher.cfg.get_successors_and_jumpkind(node, excluding_fakeret=False) + node = self.patcher.cfg.model.get_any_node(site) + nlist = self.patcher.cfg.model.get_successors_and_jumpkind(node, excluding_fakeret=False) return_locations.extend([n[0] for n in nlist if n[1]=='Ijk_FakeRet']) return return_locations @@ -264,7 +264,7 @@ def get_reg_free_map(self): # map all basic block addresses in the function to which regs are read or written reg_free_map = dict() reg_not_free_map = dict() - for n in self.patcher.cfg.nodes(): + for n in self.patcher.cfg.model.nodes(): if self.patcher.project.is_hooked(n.addr): continue @@ -302,7 +302,7 @@ def get_reg_free_map(self): def get_all_succ(self,addr): cfg = self.patcher.cfg - all_nodes = cfg.get_all_nodes(addr) + all_nodes = cfg.model.get_all_nodes(addr) if len(all_nodes) != 1: raise CfgError() n = all_nodes[0] @@ -312,7 +312,7 @@ def get_all_succ(self,addr): return [n.addr for n in self.last_block_to_return_locations(addr)], False all_succ = set() - for s, jk in cfg.get_successors_and_jumpkind(n): + for s, jk in cfg.model.get_successors_and_jumpkind(n): if not jk.startswith("Ijk_Sys"): all_succ.add(s.addr) # a syscall writes in eax, I do not handle it explicitly @@ -370,7 +370,8 @@ def _func_is_safe(self, ident, func): return True # skip functions that have enough predecessors - if len(self.patcher.cfg.get_predecessors(self.patcher.cfg.get_any_node(func.addr))) > self.safe_calls_limit: + predecessors = self.patcher.cfg.model.get_predecessors(self.patcher.cfg.model.get_any_node(func.addr)) + if len(predecessors) > self.safe_calls_limit: return True is_safe = True diff --git a/patcherex/techniques/transmitprotection.py b/patcherex/techniques/transmitprotection.py index ae5b896..df53968 100644 --- a/patcherex/techniques/transmitprotection.py +++ b/patcherex/techniques/transmitprotection.py @@ -139,7 +139,7 @@ def get_patches(self): l.warning("Found %d transmit_wrapper... better not to touch anything"%len(transmit_wrapper)) return [] transmit_wrapper = transmit_wrapper[0] - victim_node = cfg.get_any_node(transmit_wrapper.addr) + victim_node = cfg.model.get_any_node(transmit_wrapper.addr) victim_addr = int(victim_node.instruction_addrs[-1]) patches.extend(self.compute_patches(victim_addr)) diff --git a/patcherex/techniques/uninitialized_patcher.py b/patcherex/techniques/uninitialized_patcher.py index e6a7ab8..8da7539 100644 --- a/patcherex/techniques/uninitialized_patcher.py +++ b/patcherex/techniques/uninitialized_patcher.py @@ -72,7 +72,7 @@ def get_reg_free_map(self): # map all basic block addresses in the function to which regs are read or written reg_free_map = dict() reg_not_free_map = dict() - for n in self.patcher.cfg.nodes(): + for n in self.patcher.cfg.model.nodes(): if self.patcher.project.is_hooked(n.addr): continue @@ -113,7 +113,8 @@ def _should_skip(self, ff): return True if cfg_utils.is_floatingpoint_function(self.patcher, ff): return True - all_pred_addrs = set(x.addr for x in self.patcher.cfg.get_predecessors(self.patcher.cfg.get_any_node(ff.addr))) + predecessors = self.patcher.cfg.model.get_predecessors(self.patcher.cfg.model.get_any_node(ff.addr)) + all_pred_addrs = set(x.addr for x in predecessors) if len(all_pred_addrs) > 5: return True @@ -143,7 +144,7 @@ def is_reg_free(self,addr,reg,ignore_current_bb,debug=False): return False def is_last_returning_block(self,node): - node = self.patcher.cfg.get_any_node(node.addr) + node = self.patcher.cfg.model.get_any_node(node.addr) try: function = self.patcher.cfg.functions[node.function_address] except KeyError: @@ -154,7 +155,7 @@ def is_last_returning_block(self,node): return False def last_block_to_return_locations(self,addr): - node = self.patcher.cfg.get_any_node(addr) + node = self.patcher.cfg.model.get_any_node(addr) if node is None: return [] function = self.patcher.cfg.functions[node.function_address] @@ -163,14 +164,14 @@ def last_block_to_return_locations(self,addr): return_locations = [] for site in self.inv_callsites[function.addr]: - node = self.patcher.cfg.get_any_node(site) - nlist = self.patcher.cfg.get_successors_and_jumpkind(node, excluding_fakeret=False) + node = self.patcher.cfg.model.get_any_node(site) + nlist = self.patcher.cfg.model.get_successors_and_jumpkind(node, excluding_fakeret=False) return_locations.extend([n[0] for n in nlist if n[1]=='Ijk_FakeRet']) return return_locations def get_all_succ(self,addr): cfg = self.patcher.cfg - all_nodes = cfg.get_all_nodes(addr) + all_nodes = cfg.model.get_all_nodes(addr) if len(all_nodes) != 1: raise CfgError() n = all_nodes[0] @@ -180,7 +181,7 @@ def get_all_succ(self,addr): return [n.addr for n in self.last_block_to_return_locations(addr)], False all_succ = set() - for s, jk in cfg.get_successors_and_jumpkind(n): + for s, jk in cfg.model.get_successors_and_jumpkind(n): if not jk.startswith("Ijk_Sys"): all_succ.add(s.addr) # a syscall writes in eax, I do not handle it explicitly @@ -278,7 +279,7 @@ def _handle_func(self, ff): bl, seen, written = to_process.pop() seen.add(bl) - cfg_node = self.patcher.cfg.get_any_node(bl.addr) + cfg_node = self.patcher.cfg.model.get_any_node(bl.addr) if not cfg_node: continue insts = cfg_node.instruction_addrs diff --git a/tests/test_cfg.py b/tests/test_cfg.py index 8438f31..f636859 100755 --- a/tests/test_cfg.py +++ b/tests/test_cfg.py @@ -59,7 +59,7 @@ def last_block_to_callers(addr,cfg,inv_callsites): return_locations = [] for site in inv_callsites[function.addr]: node = cfg.model.get_any_node(site) - nlist = cfg.get_successors_and_jumpkind(node, excluding_fakeret=False) + nlist = cfg.model.get_successors_and_jumpkind(node, excluding_fakeret=False) return_locations.extend([n[0] for n in nlist if n[1]=='Ijk_FakeRet']) return return_locations diff --git a/tests/test_techniques.py b/tests/test_techniques.py index 7c55bc4..038a7fb 100755 --- a/tests/test_techniques.py +++ b/tests/test_techniques.py @@ -1324,6 +1324,8 @@ def test_countdown_1(BackendClass, data_fallback, try_pdf_removal): pipe = subprocess.PIPE p = subprocess.Popen([tmp_file], stdin=pipe, stdout=pipe, stderr=pipe) res = p.communicate(b"foo\ntest\n") + print(f'got: {res[0]}') + print(f'expected: {expected_output}') assert expected_output == res[0]