Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code quality improvements #63

Merged
merged 3 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if line.startswith("version = "):
line = f"version = {ref_name}\n"
f.write(line)
except:
except Exception:
pass
# HACK: support [options].develop_requires install development dependencies
if "develop" in sys.argv:
Expand Down
54 changes: 27 additions & 27 deletions src/dumpulator/dumpulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def _find_thread(self, thread_id):
thread = self._minidump.threads.threads[i]
if thread.ThreadId == thread_id:
return thread
raise Exception(f"Thread 0x{thread_id:x} ({thread_id}) not found!")
raise Exception(f"Thread {hex(thread_id)} ({thread_id}) not found!")

def debug(self, message: str):
if self._debug:
Expand Down Expand Up @@ -419,7 +419,7 @@ def _setup_memory(self):
seg: minidump.MinidumpMemorySegment
for seg in self._minidump.memory_segments_64.memory_segments:
emu_addr = seg.start_virtual_address & self.addr_mask
self.debug(f"initialize base: 0x{emu_addr:x}, size: 0x{seg.size:x}")
self.debug(f"initialize base: {hex(emu_addr)}, size: {hex(seg.size)}")
memory.move(seg.start_virtual_address)
assert memory.current_position == seg.start_virtual_address
data = memory.read(seg.size)
Expand Down Expand Up @@ -508,11 +508,11 @@ def _setup_pebteb(self, thread):
if self.wow64:
self.memory.set_region_info(self.peb - PAGE_SIZE, "WoW64 PEB", size=PAGE_SIZE)

self.info(f"TEB: 0x{self.teb:x}, PEB: 0x{self.peb:x}")
self.info(f" ConsoleHandle: 0x{self.console_handle:x}")
self.info(f" StandardInput: 0x{self.stdin_handle:x}")
self.info(f" StandardOutput: 0x{self.stdout_handle:x}")
self.info(f" StandardError: 0x{self.stderr_handle:x}")
self.info(f"TEB: {hex(self.teb)}, PEB: {hex(self.peb)}")
self.info(f" ConsoleHandle: {hex(self.console_handle)}")
self.info(f" StandardInput: {hex(self.stdin_handle)}")
self.info(f" StandardOutput: {hex(self.stdout_handle)}")
self.info(f" StandardError: {hex(self.stderr_handle)}")

process_heaps = []
for i in range(0, min(number_of_heaps, 0x1000)):
Expand Down Expand Up @@ -785,7 +785,7 @@ def _setup_syscalls(self):
syscalls.append((export.address, export.name))
elif export.name == "Wow64Transition":
patch_addr = self.read_ptr(export.address)
self.info(f"Patching Wow64Transition: {export.address:x} -> {patch_addr:x}")
self.info(f"Patching Wow64Transition: {hex(export.address)} -> {hex(patch_addr)}")
# See: https://opcode0x90.wordpress.com/2007/05/18/kifastsystemcall-hook/
# mov edx, esp; sysenter; ret
KiFastSystemCall = b"\x8B\xD4\x0F\x34\x90\x90\xC3"
Expand Down Expand Up @@ -921,7 +921,7 @@ def handle_exception(self):

csp = self.regs.csp - allocation_size
self.write(csp, allocation_size * b"\x69") # fill stuff with 0x69 for debugging
self.info(f"old csp: {self.regs.csp:x}, new csp: {csp:x}")
self.info(f"old csp: {hex(self.regs.csp)}, new csp: {hex(csp)}")
context_size = ctypes.sizeof(context_type)
context = context_type.from_buffer(self.read(csp, context_size))
context.ContextFlags = context_flags
Expand Down Expand Up @@ -1011,12 +1011,12 @@ def start(self, begin, end=0xffffffffffffffff, count=0) -> None:

if self.exception.type == ExceptionType.Terminate:
if self.exit_code is not None:
self.info(f"exit code: {self.exit_code:x}")
self.info(f"exit code: {hex(self.exit_code)}")
break

try:
emu_begin = self.handle_exception()
except:
except Exception:
traceback.print_exc()
self.error(f"exception during exception handling (stack overflow?)")
break
Expand All @@ -1039,12 +1039,12 @@ def start(self, begin, end=0xffffffffffffffff, count=0) -> None:
emu_until = 0xffffffffffffffff
emu_count = self.exception.tb_icount + 1

self.info(f"emu_start({emu_begin:x}, {emu_until:x}, {emu_count})")
self.info(f"emu_start({hex(emu_begin)}, {hex(emu_until)}, {emu_count})")
self.kill_me = None
self._uc.emu_start(emu_begin, until=emu_until, count=emu_count)
self.info(f'emulation finished, cip = {self.regs.cip:x}')
self.info(f'emulation finished, cip = {hex(self.regs.cip)}')
if self.exit_code is not None:
self.info(f"exit code: {self.exit_code:x}")
self.info(f"exit code: {hex(self.exit_code)}")
break
except UcError as err:
if self.kill_me is not None and type(self.kill_me) is not UcError:
Expand All @@ -1053,7 +1053,7 @@ def start(self, begin, end=0xffffffffffffffff, count=0) -> None:
# Handle the exception outside of the except handler
continue
else:
self.error(f'error: {err}, cip = {self.regs.cip:x}')
self.error(f'error: {err}, cip = {hex(self.regs.cip)}')
traceback.print_exc()
break

Expand All @@ -1062,7 +1062,7 @@ def stop(self, exit_code=None) -> None:
self.exit_code = None
if exit_code is not None:
self.exit_code = int(exit_code)
except:
except Exception:
traceback.print_exc()
self.error("Invalid type passed to exit_code!")
self._uc.emu_stop()
Expand Down Expand Up @@ -1204,7 +1204,7 @@ def load_dll(self, file_name: str, file_data: bytes):

def _hook_code_exception(uc: Uc, address, size, dp: Dumpulator):
try:
dp.info(f"exception step: {address:x}[{size}]")
dp.info(f"exception step: {hex(address)}[{size}]")
ex = dp.exception
ex.step_count += 1
if ex.step_count >= ex.tb_icount:
Expand All @@ -1220,7 +1220,7 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):

fetch_accesses = [UC_MEM_FETCH, UC_MEM_FETCH_PROT, UC_MEM_FETCH_UNMAPPED]
if access == UC_MEM_FETCH_UNMAPPED and address >= FORCE_KILL_ADDR - 0x10 and address <= FORCE_KILL_ADDR + 0x10 and dp.kill_me is not None:
dp.error(f"forced exit memory operation {access} of {address:x}[{size:x}] = {value:X}")
dp.error(f"forced exit memory operation {access} of {hex(address)}[{hex(size)}] = {hex(value)}")
return False
if dp.exception.final and access in fetch_accesses:
dp.info(f"fetch from {hex(address)}[{size}] already reported")
Expand All @@ -1245,11 +1245,11 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
final = dp.trace or dp.exception.code_hook_h is not None
info = "final" if final else "initial"
if access == UC_MEM_READ_UNMAPPED:
dp.error(f"{info} unmapped read from {address:x}[{size:x}], cip = {dp.regs.cip:x}, exception: {exception}")
dp.error(f"{info} unmapped read from {hex(address)}[{hex(size)}], cip = {hex(dp.regs.cip)}, exception: {exception}")
elif access == UC_MEM_WRITE_UNMAPPED:
dp.error(f"{info} unmapped write to {address:x}[{size:x}] = {value:x}, cip = {dp.regs.cip:x}")
dp.error(f"{info} unmapped write to {hex(address)}[{hex(size)}] = {hex(value)}, cip = {hex(dp.regs.cip)}")
elif access == UC_MEM_FETCH_UNMAPPED:
dp.error(f"{info} unmapped fetch of {address:x}[{size:x}], cip = {dp.regs.rip:x}, cs = {dp.regs.cs:x}")
dp.error(f"{info} unmapped fetch of {hex(address)}[{hex(size)}], cip = {hex(dp.regs.rip)}, cs = {hex(dp.regs.cs)}")
else:
names = {
UC_MEM_READ: "UC_MEM_READ", # Memory is read from
Expand All @@ -1263,7 +1263,7 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
UC_MEM_FETCH_PROT: "UC_MEM_FETCH_PROT", # Fetch from non-executable, but mapped, memory
UC_MEM_READ_AFTER: "UC_MEM_READ_AFTER", # Memory is read from (successful access)
}
dp.error(f"{info} unsupported access {names.get(access, str(access))} of {address:x}[{size:x}] = {value:X}, cip = {dp.regs.cip:x}")
dp.error(f"{info} unsupported access {names.get(access, str(access))} of {hex(address)}[{hex(size)}] = {hex(value)}, cip = {hex(dp.regs.cip)}")

if final:
# Make sure this is the same exception we expect
Expand Down Expand Up @@ -1360,14 +1360,14 @@ def _hook_code(uc: Uc, address, size, dp: Dumpulator):
elif module:
address_name = " " + module

line = f"0x{address:x}{address_name}|"
line = f"{hex(address)}{address_name}|"
if instr is not None:
line += instr.mnemonic
if instr.op_str:
line += " "
line += instr.op_str
for reg in _get_regs(instr):
line += f"|{reg}=0x{dp.regs.__getattr__(reg):x}"
line += f"|{reg}={hex(dp.regs.__getattr__(reg))}"
if instr.mnemonic in {"syscall", "sysenter"}:
line += f"|sequence_id=[{dp.sequence_id}]"
else:
Expand Down Expand Up @@ -1440,7 +1440,7 @@ def _hook_interrupt(uc: Uc, number, dp: Dumpulator):
description = interrupt_names[number]
else:
description = f"IRQ {number - 32}"
dp.error(f"interrupt {number} ({description}), cip = {dp.regs.cip:x}, cs = {dp.regs.cs:x}")
dp.error(f"interrupt {number} ({description}), cip = {hex(dp.regs.cip)}, cs = {hex(dp.regs.cs)}")

# There should not be an exception active
assert dp.exception.type == ExceptionType.NoException
Expand Down Expand Up @@ -1519,7 +1519,7 @@ def syscall_arg(index):
dp.exception = status
raise dp.raise_kill(UcError(UC_ERR_EXCEPTION)) from None
else:
dp.info(f"status = {status:x}")
dp.info(f"status = {hex(status)}")
dp.regs.cax = status
if dp.x64:
dp.regs.rcx = dp.regs.cip + 2
Expand Down Expand Up @@ -1560,7 +1560,7 @@ def _hook_invalid(uc: Uc, dp: Dumpulator):
if dp.kill_me:
dp.error(f"terminating emulation...")
return False
dp.error(f"invalid instruction at {address:x}")
dp.error(f"invalid instruction at {hex(address)}")
try:
code = dp.read(address, 15)
instr = next(dp.cs.disasm(code, address, 1))
Expand Down
Loading