diff --git a/README.md b/README.md index 54d1d67..24c2c36 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,9 @@ AndroidMemoryTool .This Tool is written in python using ctypes not affective as c. If you find any bug or not working function you can contact me. - * Date : 2022/03/23 + * Date : 2023/07/11 * Author : **__Abdul Moez__** - * Version : 0.4 + * Version : 0.5 * Study : UnderGraduate in GCU Lahore, Pakistan * Repository : https://github.com/Anonym0usWork1221/android-memorytool * Documentation: https://github.com/Anonym0usWork1221/android-memorytool/tree/main/Documentation @@ -30,31 +30,39 @@ If you find any bug or not working function you can contact me. Copyright (c) 2022 AbdulMoez +----------- + # Note - 1. This documentation is for 0.4 version (UPDATED) + 1. This documentation is for 0.5 version (UPDATED) 2. You can find old version on pypi if you want to use them -# Version 0.4 - -> Optimized the code - -> Increase Stability - -> Fixed Known bugs: - -> Fixed Simple Text search returning same values as previous search problem - -> Fixed Increasing Values/address problem in Fast search algorithms - -> Fixed output pattern in raw dump - - -> Added Reset function for queue - -> Added Hex Pattern Finder 87 ?? BB - -> Added Hex search for (Float, Dword, Double) - -> Added support for fork() process (by default takes first pid as parent) - -> Added Manually PID entering support either in int or string (pid=714 or pid="714") - -> Can raw dump from Custom start_address-end_address ("4754D6E6-5754D6E6" or "client.so") - -> Added Known error files and its fixes in (ERRORS.md) file - -> Added Support to dump map file - -> Added UTF-8 and UTF-16E support for lib read/write - -> Added a Detailed Documentation File - -> Added Documentation string stub in AndroidMemoryTool class +----------- +# Version 0.5 + -> ----------------------------------------MOD-LOGS-------------------------------------------------- <- + + -> Fixed the invalid output of hex address in reading process + -> Fixed the read_lib and write_lib hex issue the now pass the hex as a + string in old method the calculation was generating errors due to hex value passed as integer + -> Optimize the code of DataClass.py + -> Fixed Initializers of some classes (e.g: ThreadingController and DataClasses) + -> Added Complete self explaining doc strings to all the classes and functions + -> Rewrite the Mapping class inorder to reduce junk size and correct the mapping of anonymous range. + -> Added new function of find_and_replace_hex_pattern() for search wild card and replace that in hex form + e.g: find_and_replace_hex_pattern(search_pattern='2D??3D', replace_pattern='1D4D2D') + -> Added CLI for command line interface + + -> ----------------------------------------TO-DO---------------------------------------------------- <- + + -> TODO: Add Reverse engineering Support for offline binaries using known disassemblers (capstone, keystone, r2pipe) + -> TODO: Add Assembly support for reading and writing memory at runtime + + -> ----------------------------------------SUGGESTIONS---------------------------------------------- <- + -> SUGGESTIONS: You can leave your suggestions either on my mailbox or discord server. + + +----------- Requirements ----------- @@ -63,21 +71,30 @@ Requirements * Android Requirements -> Rooted Device Needed +----------- + Installation ---------------------------------------- 1. **Simply install it by pip and use it in your project** - ``pip install androidMemoryTool==0.4`` + ``pip install androidMemoryTool==0.5`` 2. **Or by cloning and then run command** ``pip install .`` 3. **Project live at** - https://pypi.org/project/androidMemoryTool/0.4/ + https://pypi.org/project/androidMemoryTool/0.5/ Memory Tool with example which can be found in the `Android-Py-Cheats-Script @ 9d2520e`. +----------- +Video Demo +----------- +[![Video Demo](https://img.youtube.com/vi/Ivyy6GQzm3w/0.jpg)](https://www.youtube.com/watch?v=Ivyy6GQzm3w) + +----------- + ## Documentation * Getting Process ID @@ -173,6 +190,19 @@ for index in range(0, len(found_pattern[0])): print(f"{found_pattern[0][index]}: {found_pattern[2][index]}") print(f"Total Pattern found: {found_pattern[1]}") ``` + +* Find and replace hex Patterns +```python +from androidMemoryTool import AndroidMemoryTool +tool = AndroidMemoryTool(PKG=662, SPEED_MODE=True, WORKERS=55, + pMAP=AndroidMemoryTool.PMAP(ALL=True)) +found_pattern = tool.find_and_replace_hex_pattern("87 ?? 2B", "87 1D 2B") +for index in range(0, len(found_pattern[0])): + # address hex value + print(f"{found_pattern[0][index]}: {found_pattern[2][index]}") +print(f"Total Pattern found and replaced: {found_pattern[1]}") # returns number +``` + * Dump Maps ```python from androidMemoryTool import AndroidMemoryTool @@ -181,12 +211,160 @@ is_dumped = tool.dump_maps(path="./") print(is_dumped) ``` +----------- + +## Android Memory Tool CLI Documentation +``` +-> This Version is come with an exciting feature called CLI (command line interface) means you dont need to execute the code everytime just use the tools cli to do short work. +``` + +The Android Memory Tool CLI is a command-line interface for the Android Memory Tool. It provides various commands to interact with memory in Android applications. + +### Usage +* For Linux: +``` +python3 -m androidMemoryTool [options] +``` +* For Android: +Execute the tool with root privileges using `sudo`: +``` +sudo python3 -m androidMemoryTool [options] +``` + +* If you added the bin path of python libraries to environment variable then you can execute it directly +```` +amt [options] +```` +**and use sudo for android** + +### Available Commands +* `read_value`: Read a value from memory. +* `read_write_value`: Read and write a value in memory. +* `write_lib`: Write a value to a library. +* `read_lib`: Read a value from a library. +* `refiner_address`: Refine a list of addresses. +* `get_module_base_address`: Get the base address of a module. +* `raw_dump`: Dump a library as raw binary. +* `find_hex_pattern`: Find a hexadecimal pattern in memory. +* `find_and_replace_hex_pattern`: Find and replace a hexadecimal pattern in memory. +* `dump_maps`: Dump memory maps. +* `get_pid`: Return the PID of a process. +* `help`: Display help information. + +### Command-line Data Types +Pass them with just name as given below +* `DWORD` +* `FLOAT` +* `DOUBLE` +* `WORD` +* `BYTE` +* `QWORD` +* `XOR` +* `UTF_8` +* `UTF_16LE` + +### Command Details +You can get detailed information about each command and its usage by running: +``` +python3 -m androidMemoryTool help +``` +For example, to get help for the read_value command, run: +``` +python3 -m androidMemoryTool help read_value +``` + +### Examples +* Read a value from memory: +```` +python3 -m androidMemoryTool read_value +```` +Replace , , , , and with the appropriate values. + +* Read and write a value in memory: +```` +python3 -m androidMemoryTool read_write_value +```` +Replace , , , , , and with the appropriate values. + +* Write a value to a library: +```` +python3 -m androidMemoryTool write_lib +```` +Replace , , , , and with the appropriate values. + +* Read a value from a library: +```` +python3 -m androidMemoryTool read_lib [--value ] +```` +Replace , , , , and with the appropriate values. + +* Refine a list of addresses: +```` +python3 -m androidMemoryTool refiner_address +```` +Replace , , , , , and with the appropriate values. + +* Get the base address of a module: +```` +python3 -m androidMemoryTool get_module_base_address +```` +Replace and with the appropriate values. + +* Dump a library as raw binary: +```` +python3 -m androidMemoryTool raw_dump [] +```` +Replace , , and with the appropriate values. The argument is optional and defaults to the current directory. + +* Find a hexadecimal pattern in memory: +```` +python3 -m androidMemoryTool find_hex_pattern +```` +Replace , , , , and with the appropriate values. + +* Find and replace a hexadecimal pattern in memory: +```` +python3 -m androidMemoryTool find_and_replace_hex_pattern +```` +Replace , , , , , and with the appropriate values. + +* Dump memory maps: +```` +python3 -m androidMemoryTool dump_maps [--path ] +```` +Replace and with the appropriate values. The argument is optional and defaults to the current directory. + +* Return the PID of a process: +```` +python3 -m androidMemoryTool get_pid +```` +Replace with the appropriate package name. + +### Version +To get the version of the Android Memory Tool, use the following command: +```` +python3 -m androidMemoryTool -v +```` +### Help +To display general help information or help for a specific command, use the help command: +```` +python3 -m androidMemoryTool help [command] +```` +Replace [command] with the desired command to get help for that command. If no command is provided, general help information will be displayed. + + +----------- + # Detailed Documentation -You can found detailed documentation [here](https://github.com/Anonym0usWork1221/android-memorytool/tree/main/Documentation) +You can find detailed documentation [here](https://github.com/Anonym0usWork1221/android-memorytool/tree/main/Documentation) + +----------- # Errors Some known errors and their solutions can be found [here](https://github.com/Anonym0usWork1221/android-memorytool/blob/main/ERRORS.md) +----------- + Supported Data Types ------------------- @@ -225,12 +403,15 @@ Supported Map Ranges | B_Bad | Bad | Bad Memory (dangerous) | | CODE_SYSTEM | Code system | Code system memory (dangerous) | +----------- + # Contributor +----------- Assistance ---------- @@ -242,6 +423,7 @@ I also created a Discord group: * Server : https://discord.gg/RMNcqzmt9f +----------- Buy Me a coffee -------------- diff --git a/Tests/AndroidMemoryTool-Tests.py b/Tests/AndroidMemoryTool-Tests.py new file mode 100644 index 0000000..df26cfa --- /dev/null +++ b/Tests/AndroidMemoryTool-Tests.py @@ -0,0 +1,74 @@ +# from androidMemoryTool import AndroidMemoryTool + +# initialize tool. +# tool = AndroidMemoryTool(PKG="ac_client", TYPE=AndroidMemoryTool.DataTypes.DWORD, SPEED_MODE=True, WORKERS=55, +# pMAP=AndroidMemoryTool.PMAP(ALL=True)) +# print(tool.get_pid("ac_client")) +# values = tool.read_value(100) +# founded_offsets = values[0] +# +# refined_address = tool.refiner_address(list_address=founded_offsets, value_to_refine=50) +# tool.read_lib('0x0', '0x100') +# tool.write_lib('0x0', '0x0', 10) + +# pid = AndroidMemoryTool.get_pid("714") +# print(pid) + +# if you are reading you will get tuple of two values offset list and total values found + +# values = tool.read_value(100) +# founded_offsets = values[0] +# founded_values = values[1] +# print(founded_values) +# print(founded_offsets) + + +# if you are writing only return total value wrote +# values1 = tool.read_write_value(100, 10) +# print(values1) + + +# if you are reading lib offset only get the value at that place +# values1 = tool.read_lib(0x0, 0x100) +# print(values1) + + +# if you are writing lib offset only get the true/false at that place +# values1 = tool.write_lib(0x0, 0x100, 20) +# print(values1) + +# dump = tool.raw_dump('client.so', '/home/kali/Documents/') +# print(dump) + +# print(tool.read_value("19h")) + +# Manually entered pid +# tool = AndroidMemoryTool(PKG=714, TYPE=AndroidMemoryTool.DataTypes.DWORD, SPEED_MODE=True, WORKERS=55, +# pMAP=AndroidMemoryTool.PMAP(ALL=True)) +# test_value = tool.read_value(42) +# print(test_value) + +# to dump maps +# is_dumped = tool.dump_maps(path="./") +# print(is_dumped) + +# utf8 lib tests +# tool = AndroidMemoryTool(PKG=662, TYPE=AndroidMemoryTool.DataTypes.UTF_8, SPEED_MODE=True, WORKERS=55, +# pMAP=AndroidMemoryTool.PMAP(ALL=True)) +# off = tool.read_value("hi") +# print(off) + +# off = tool.read_lib(0x5590fb61d8e2, 0x0, "hi") +# print(off) + +# Hex pattern +# found_pattern = tool.find_hex_pattern("87 ?? 2B") +# for index in range(0, len(found_pattern[0])): +# print(f"{found_pattern[0][index]}: {found_pattern[2][index]}") +# print(f"Total Pattern found: {found_pattern[1]}") + +# Read Write Hex pattern +# found_pattern = tool.find_and_replace_hex_pattern("87 ?? 2B", "87 1D 2D") +# for index in range(0, len(found_pattern[0])): +# print(f"{found_pattern[0][index]}: {found_pattern[2][index]}") +# print(f"Total Pattern found and replaced: {found_pattern[1]}") diff --git a/Tests/Auto-Installation-Tools.py b/Tests/Auto-Installation-Tools.py new file mode 100644 index 0000000..8b16ff0 --- /dev/null +++ b/Tests/Auto-Installation-Tools.py @@ -0,0 +1,49 @@ + +import platform +import subprocess +import sys + + +def install_radare2(): + current_platform = platform.system() + + if current_platform == 'Linux': + print(f"Platform Detected as {current_platform}: Installing Additional tools as sudo." + f" By using apt-get method.") + subprocess.check_call(['sudo', 'apt-get', 'update']) + subprocess.check_call(['sudo', 'apt-get', 'install', 'radare2']) + + elif current_platform == 'Windows': + install_path = 'C:\\Radare2\\' + print(f"Platform Detected as {current_platform}: Installing Additional tools. Using Default directory as: " + f"{install_path}") + subprocess.check_call(['powershell', 'Invoke-WebRequest', '-Uri', + 'https://github.com/radareorg/radare2/releases/latest/download/radare2_windows.zip', + '-OutFile', 'radare2.zip']) + subprocess.check_call( + ['powershell', 'Expand-Archive', '-Path', 'radare2.zip', '-DestinationPath', install_path]) + subprocess.check_call(['del', 'radare2.zip'], shell=True) + subprocess.check_call(['setx', 'PATH', f'%PATH%;{install_path}']) + + elif current_platform == 'Android': + print(f"Platform Detected as {current_platform}: Installing Additional tools. Assuming the tool is termux.") + subprocess.check_call(['pkg', 'install', 'radare2']) + + else: + print(f"Platform Detected as {current_platform}: Auto-installation of Additional tools is unsupported.") + print("Please install Radare2 manually. And it is to environment variable") + sys.exit(1) + + +try: + import r2pipe +except ImportError: + print("r2pipe not found. Installing Radare2...") + install_radare2() + try: + import r2pipe + except ImportError: + print("Additional tools not found installing automatically...") + sys.exit(1) + +print("Radare2 and r2pipe successfully installed!") diff --git a/Tests/Some-Tools-Test-Usage.py b/Tests/Some-Tools-Test-Usage.py new file mode 100644 index 0000000..5492e41 --- /dev/null +++ b/Tests/Some-Tools-Test-Usage.py @@ -0,0 +1,23 @@ +import r2pipe + + +def disassemble_binary(binary_path): + r2 = r2pipe.open(binary_path) + r2.cmd("aaa") # Analyze all functions + + functions = r2.cmdj("aflj") # Get a list of functions + + pseudocode = "" + for function in functions: + r2.cmd("aaa") + address = function['offset'] + r2.cmd(f"s {address}") + pseudocode += r2.cmd(f"pdc\n") # Generate pseudocode for each function + + r2.quit() + return pseudocode + + +binary_path = "./c" +pseudocode = disassemble_binary(binary_path) +print(pseudocode) diff --git a/amt.py b/amt.py new file mode 100644 index 0000000..4135a74 --- /dev/null +++ b/amt.py @@ -0,0 +1,18 @@ +""" + * date : 2023/07/11 + * Version : 0.5 + * author : Abdul Moez (abdulmoez123456789@gmail.com) + * Study : UnderGraduate in GCU Lahore, Pakistan + * https://github.com/Anonym0usWork1221/android-memorytool + +""" + +# !/usr/bin/env python +from androidMemoryTool import AndroidMemoryToolCLI, AndroidMemoryTool + +def execute_cli(): + cli_tool = AndroidMemoryToolCLI(AndroidMemoryTool) + cli_tool.cli_handler() + +if __name__ == '__main__': + execute_cli() diff --git a/androidMemoryTool/DataClasses.py b/androidMemoryTool/DataClasses.py index 3d0263a..bd3ce47 100644 --- a/androidMemoryTool/DataClasses.py +++ b/androidMemoryTool/DataClasses.py @@ -1,18 +1,18 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool """ +from subprocess import check_output, CalledProcessError +from .mapping import Mapping +from dataclasses import dataclass from struct import pack, unpack -from sys import byteorder -from os import popen from binascii import unhexlify -from dataclasses import dataclass -from .mapping import Mapping +from sys import byteorder class DataClasses: @@ -27,6 +27,10 @@ class DataClasses: @dataclass() class DataTypes: + """ + Data class that defines different data types used in the code. + """ + DWORD: str = "DWORD" FLOAT: str = "FLOAT" DOUBLE: str = "DOUBLE" @@ -39,6 +43,10 @@ class DataTypes: @dataclass() class PMAP: + """ + Data class that defines different options for memory mapping. + """ + ALL: bool = True B_BAD: bool = False C_ALLOC: bool = False @@ -54,83 +62,107 @@ class PMAP: CODE_APP: bool = False V_video: bool = False - def __int__(self): - pass + def __init__(self): + ... @staticmethod def get_pid(pkg: any((str, int))) -> str: - pid_id = None - if isinstance(pkg, int): - pkg = str(pkg) - pid_id = popen(f"ps -q {pkg} -o cmd=") - if pid_id.read() is None: - return "" - return str(pkg) + """ + Retrieves the process ID (PID) for the given package name or PID. + + Args: + pkg: The package name or PID. + + Returns: + The process ID (PID) as a string. + """ + pkg = str(pkg) + # if the pkg is numeric string if pkg.isnumeric(): - pkg = str(pkg) - pid_id = popen(f"ps -q {pkg} -o cmd=") - if pid_id.read() is None: - return "" - return str(pkg) - - try: - pid_id = popen("pidof {}".format(pkg)) - except Exception as e: - print("[*]Exception ", e) - - pid_decode = pid_id.read() - if pid_decode is not None: - process_id = pid_decode.replace("\n", "").split(" ") - if len(process_id) > 1: - print("[*] Multiple PIDS found using first PID: %s" % process_id[0]) - return str(process_id[0]) + try: + pid_id = check_output(["ps", "-q", pkg, "-o", "cmd="]) + return pkg if pid_id.decode().strip() is not None else "" + except CalledProcessError: + print("[-] Processes is not running") else: - return "" + try: + pid_id = check_output(["pidof", "-s", pkg]) # gets the first id if multiple ids found + process_ids = pid_id.decode().strip() + return str(process_ids) + except CalledProcessError: + print("[-] Processes is not running") + + return "" def init_setup(self, PKG: any((str, int)), TYPE=DataTypes.DWORD, SPEED_MODE=False, WORKERS=55) -> None: + """ + Initializes the setup with the specified package, data type, speed mode, and number of workers. + + Args: + PKG: The package name or PID. + TYPE: The data type. + SPEED_MODE: Flag indicating whether speed mode is enabled or not. + WORKERS: The number of workers. + + Returns: + None. + """ + self._DATA_TYPES = TYPE self._PKG_NAME = PKG self._IS_SPEED_MODE = SPEED_MODE self._TOTAL_WORKERS = WORKERS def identify_dict(self, P=PMAP()) -> list: + """ + Identifies the memory addresses based on the given memory mapping options. + + Args: + P: The memory mapping options. + + Returns: + A list of identified memory addresses. + """ + mapping_functions = { + "ALL": Mapping.mapping_all, + "B_BAD": Mapping.mapping_b, + "C_ALLOC": Mapping.mapping_ca, + "C_BSS": Mapping.mapping_cb, + "C_DATA": Mapping.mapping_cd, + "C_HEAP": Mapping.mapping_ch, + "JAVA_HEAP": Mapping.mapping_jh, + "A_ANONYMOUS": Mapping.mapping_a, + "CODE_SYSTEM": Mapping.mapping_xs, + "STACK": Mapping.mapping_s, + "ASHMEM": Mapping.mapping_as, + "J_Java": Mapping.mapping_j, + "CODE_APP": Mapping.mapping_xa, + "V_video": Mapping.mapping_v + } + wanted_ranges = [k for k, v in P.__dict__.items() if v] - address = [] - - for i in wanted_ranges: - if i == "ALL": - address = Mapping.mapping_all(self.get_pid(self._PKG_NAME))["address"] - return address - elif i == "B_BAD": - address.extend(Mapping.mapping_b(self.get_pid(self._PKG_NAME))["address"]) - elif i == "C_ALLOC": - address.extend(Mapping.mapping_ca(self.get_pid(self._PKG_NAME))["address"]) - elif i == "C_BSS": - address.extend(Mapping.mapping_cb(self.get_pid(self._PKG_NAME))["address"]) - elif i == "C_DATA": - address.extend(Mapping.mapping_cd(self.get_pid(self._PKG_NAME))["address"]) - elif i == "C_HEAP": - address.extend(Mapping.mapping_ch(self.get_pid(self._PKG_NAME))["address"]) - elif i == "JAVA_HEAP": - address.extend(Mapping.mapping_jh(self.get_pid(self._PKG_NAME))["address"]) - elif i == "A_ANONYMOUS": - address.extend(Mapping.mapping_a(self.get_pid(self._PKG_NAME))["address"]) - elif i == "CODE_SYSTEM": - address.extend(Mapping.mapping_xs(self.get_pid(self._PKG_NAME))["address"]) - elif i == "STACK": - address.extend(Mapping.mapping_s(self.get_pid(self._PKG_NAME))["address"]) - elif i == "ASHMEM": - address.extend(Mapping.mapping_as(self.get_pid(self._PKG_NAME))["address"]) - elif i == "J_Java": - address.extend(Mapping.mapping_j(self.get_pid(self._PKG_NAME))["address"]) - elif i == "CODE_APP": - address.extend(Mapping.mapping_xa(self.get_pid(self._PKG_NAME))["address"]) - elif i == "V_video": - address.extend(Mapping.mapping_v(self.get_pid(self._PKG_NAME))["address"]) - return address + address_set = set() + + for map_name in wanted_ranges: + if map_name in mapping_functions: + address_set.update(mapping_functions[map_name](self.get_pid(self._PKG_NAME))["address"]) + if map_name == "ALL": + break + + return list(address_set) def data_type_encoding(self, read: any, write=None) -> any: + """ + Encodes the data values based on the specified data type for searching or writing in memory. + + Args: + read: The value to be encoded for searching in memory. + write: The value to be encoded for writing in memory. + + Returns: + The encoded search value and replace value (if provided). + """ if self._DATA_TYPES == "DWORD": if write: if "h" in str(read).lower(): @@ -310,208 +342,141 @@ def data_type_encoding(self, read: any, write=None) -> any: return None def data_type_decoding(self, read: any, write=None) -> any: - if self._DATA_TYPES == "DWORD": - if write: - if byteorder == 'little': - search_value = unpack('i', read) - replace_value = unpack('>i', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('i', read) - return search_value - - elif self._DATA_TYPES == "FLOAT": - if write: - if byteorder == 'little': - search_value = unpack('f', read) - replace_value = unpack('>f', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('f', read) - return search_value - - elif self._DATA_TYPES == "DOUBLE": - if write: - if byteorder == 'little': - search_value = unpack('d', read) - replace_value = unpack('>d', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('d', read) - return search_value - - elif self._DATA_TYPES == "WORD": - if write: - if byteorder == 'little': - search_value = unpack('h', read) - replace_value = unpack('>h', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('i', read) - return search_value - - elif self._DATA_TYPES == "BYTE": - if write: - if byteorder == 'little': - search_value = unpack('b', read) - replace_value = unpack('>b', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('b', read) - return search_value - - elif self._DATA_TYPES == "QWORD": - if write: - if byteorder == 'little': - search_value = unpack('q', read) - replace_value = unpack('>q', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('q', read) - return search_value - - elif self._DATA_TYPES == "XOR": - if write: - if byteorder == 'little': - search_value = unpack('l', read) - replace_value = unpack('>l', write) - - return search_value, replace_value - - else: - if byteorder == 'little': - search_value = unpack('l', read) - return search_value - - elif self._DATA_TYPES == "UTF-8": - if write: + """ + Decodes the data values based on the specified data type after reading from memory. + + Args: + read: The value to be decoded after reading from memory. + write: The value to be decoded after writing to memory. + + Returns: + The decoded search value and replace value (if provided). + """ + + if write: + decode_func = unpack + search_value = replace_value = None + decode_format = "" + if self._DATA_TYPES == "DWORD": + decode_format = 'i' + elif self._DATA_TYPES == "FLOAT": + decode_format = 'f' + elif self._DATA_TYPES == "DOUBLE": + decode_format = 'd' + elif self._DATA_TYPES == "WORD": + decode_format = 'h' + elif self._DATA_TYPES == "BYTE": + decode_format = 'b' + elif self._DATA_TYPES == "QWORD": + decode_format = 'q' + elif self._DATA_TYPES == "XOR": + decode_format = 'l' + elif self._DATA_TYPES == "UTF-8": search_value = read.encode('utf-8') replace_value = write.encode('utf-8') - - return search_value, replace_value - - else: - search_value = read.encode('utf-8') - return search_value - - elif self._DATA_TYPES == "UTF-16LE": - if write: + elif self._DATA_TYPES == "UTF-16LE": search_value = read.encode('utf-16LE') replace_value = write.encode('utf-16LE') - - return search_value, replace_value - else: - search_value = read.encode('utf-16LE') - return search_value - - else: - return None - - def data_type_bytes(self) -> int: - if self._DATA_TYPES == "DWORD": - return 4 - - elif self._DATA_TYPES == "FLOAT": - return 4 + return None - elif self._DATA_TYPES == "DOUBLE": - return 8 - - elif self._DATA_TYPES == "WORD": - return 2 + if search_value is None: + search_value = decode_func(decode_format, read) + if replace_value is None: + replace_value = decode_func(decode_format, write) - elif self._DATA_TYPES == "BYTE": - return 1 + return search_value, replace_value + else: + decode_func = unpack + decode_format = None + search_value = None + + if self._DATA_TYPES == "DWORD": + decode_format = 'i' + elif self._DATA_TYPES == "FLOAT": + decode_format = 'f' + elif self._DATA_TYPES == "DOUBLE": + decode_format = 'd' + elif self._DATA_TYPES == "WORD": + decode_format = 'h' + elif self._DATA_TYPES == "BYTE": + decode_format = 'b' + elif self._DATA_TYPES == "QWORD": + decode_format = 'q' + elif self._DATA_TYPES == "XOR": + decode_format = 'l' + elif self._DATA_TYPES == "UTF-8": + search_value = read.encode('utf-8') + elif self._DATA_TYPES == "UTF-16LE": + search_value = read.encode('utf-16LE') + else: + return None - elif self._DATA_TYPES == "QWORD": - return 8 + if decode_format: + search_value = decode_func(decode_format, read) - elif self._DATA_TYPES == "XOR": - return 4 + return search_value - elif self._DATA_TYPES == "UTF-8": - return 0 + def data_type_bytes(self) -> int: + """ + Returns the number of bytes occupied by the current data type. + + Returns: + The number of bytes occupied by the current data type. + """ + data_type_sizes = { + "DWORD": 4, + "FLOAT": 4, + "DOUBLE": 8, + "WORD": 2, + "BYTE": 1, + "QWORD": 8, + "XOR": 4, + "UTF-8": 0, + "UTF-16LE": 0 + } + + return data_type_sizes.get(self._DATA_TYPES, -1) - elif self._DATA_TYPES == "UTF-16LE": - return 0 + def init_tool(self, pMAP=PMAP()) -> None: + """ + Initializes the memory tool with the specified memory mapping options. - else: - return -1 + Args: + pMAP: The memory mapping options. - def init_tool(self, pMAP=PMAP()) -> None: + Returns: + None. + """ self._MAPS_ADDR = self.identify_dict(pMAP) self._DATA_BYTE = self.data_type_bytes() self._PID = self.get_pid(self._PKG_NAME) def get_variables(self, is_speed=False, is_pkg=False, is_data_type=False, is_workers=False, is_map_addr=False, is_data_byte=False, is_pid=False) -> any: - if is_speed: - return self._IS_SPEED_MODE - elif is_pkg: - return self._PKG_NAME - elif is_workers: - return self._TOTAL_WORKERS - elif is_data_type: - return self._DATA_TYPES - elif is_map_addr: - return self._MAPS_ADDR - elif is_data_byte: - return self._DATA_BYTE - elif is_pid: - return self._PID - else: - return None + """ + Retrieves the specified variable based on the provided flags. + + Args: + is_speed: Flag indicating whether to retrieve the speed mode value. + is_pkg: Flag indicating whether to retrieve the package name value. + is_data_type: Flag indicating whether to retrieve the data type value. + is_workers: Flag indicating whether to retrieve the number of workers value. + is_map_addr: Flag indicating whether to retrieve the memory map addresses value. + is_data_byte: Flag indicating whether to retrieve the data type bytes value. + is_pid: Flag indicating whether to retrieve the process ID (PID) value. + + Returns: + The requested variable value. + """ + variables = { + is_speed: self._IS_SPEED_MODE, + is_pkg: self._PKG_NAME, + is_workers: self._TOTAL_WORKERS, + is_data_type: self._DATA_TYPES, + is_map_addr: self._MAPS_ADDR, + is_data_byte: self._DATA_BYTE, + is_pid: self._PID + } + + return variables.get(True, None) diff --git a/androidMemoryTool/ThreadingController.py b/androidMemoryTool/ThreadingController.py index 790269b..4658b83 100644 --- a/androidMemoryTool/ThreadingController.py +++ b/androidMemoryTool/ThreadingController.py @@ -1,46 +1,92 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool """ -from concurrent.futures.thread import ThreadPoolExecutor - -from .DataClasses import DataClasses +from .additional_features import AdditionalFeatures from .search_and_writers import SearchAndWrite from .search_and_readers import SearchAndRead -from .additional_features import AdditionalFeatures +from .DataClasses import DataClasses +from concurrent.futures import ThreadPoolExecutor class FastSearchAlgo(DataClasses): + """A class that implements fast search algorithms for text, values, and pattern finding.""" + _SearchAndWriteObject = SearchAndWrite() _SearchAndReadObject = SearchAndRead() _AdditionalFeaturesObject = AdditionalFeatures() - def __int__(self): - super(FastSearchAlgo, self).__int__() + def __init__(self): + super(FastSearchAlgo, self).__init__() def fast_search_algorithms_text(self, pid: str, addr: list, read: any, write=None) -> None: + """ + Performs a fast search for text in the specified memory address range of a process. + + Args: + pid (str): The process ID. + addr (list): A list of memory addresses to search in. + read (any): The text to search for. + write (Optional): The replacement text for writing to memory (default: None). + + Returns: + None + """ if write: with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: executor.submit(self._SearchAndWriteObject.speed_search_and_write_text, pid, addr, read, write) - else: with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: executor.submit(self._SearchAndReadObject.speed_search_and_read_text, pid, addr, read) def fast_search_algorithms_value(self, pid: str, addr: list, read: any, buf: int, write=None) -> None: + """ + Performs a fast search for values in the specified memory address range of a process. + + Args: + pid (str): The process ID. + addr (list): A list of memory addresses to search in. + read (any): The value to search for. + buf (int): The buffer size for reading memory. + write (Optional): The replacement value for writing to memory (default: None). + + Returns: + None + """ if write: with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: executor.submit(self._SearchAndWriteObject.speed_search_and_write, pid, addr, buf, read, write) - else: with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: executor.submit(self._SearchAndReadObject.speed_search_and_read, pid, addr, buf, read) - def fast_search_algorithms_pattern_finding(self, pid: str, addr: list, buf: int, hex_pattern: str) -> None: - with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: - executor.submit(self._AdditionalFeaturesObject.speed_find_hexadecimal_pattern, pid, addr, buf, hex_pattern) + def fast_search_algorithms_pattern_finding(self, pid: str, addr: list, buf: int, hex_pattern: str, + replace_pattern: str = None) -> None: + """ + Executes fast search algorithms for finding a hexadecimal pattern in memory addresses. + + Args: + pid (str): The process ID. + addr (list): The list of memory addresses to search. + buf (int): The buffer size for reading memory. + hex_pattern (str): The hexadecimal pattern to search for. + replace_pattern (str, optional): The hexadecimal pattern to replace the found pattern with. + Defaults to None. + + Returns: + None. + """ + + if replace_pattern: + with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: + executor.submit(self._AdditionalFeaturesObject.speed_find_and_replace_hexadecimal_pattern, pid, + addr, buf, hex_pattern, replace_pattern) + else: + with ThreadPoolExecutor(max_workers=super().get_variables(is_workers=True)) as executor: + executor.submit(self._AdditionalFeaturesObject.speed_find_hexadecimal_pattern, pid, + addr, buf, hex_pattern) diff --git a/androidMemoryTool/__init__.py b/androidMemoryTool/__init__.py index 95898bb..3aec002 100644 --- a/androidMemoryTool/__init__.py +++ b/androidMemoryTool/__init__.py @@ -6,4 +6,4 @@ from .DataClasses import DataClasses from .search_and_writers import SearchAndWrite from .search_and_readers import SearchAndRead - +from .cli_for_tool import AndroidMemoryToolCLI diff --git a/androidMemoryTool/__main__.py b/androidMemoryTool/__main__.py new file mode 100644 index 0000000..781a735 --- /dev/null +++ b/androidMemoryTool/__main__.py @@ -0,0 +1,9 @@ +from androidMemoryTool import AndroidMemoryToolCLI, AndroidMemoryTool + +def execute_cli(): + cli_tool = AndroidMemoryToolCLI(AndroidMemoryTool) + cli_tool.cli_handler() + +if __name__ == '__main__': + execute_cli() + diff --git a/androidMemoryTool/additional_features.py b/androidMemoryTool/additional_features.py index c30c763..a76fe25 100644 --- a/androidMemoryTool/additional_features.py +++ b/androidMemoryTool/additional_features.py @@ -1,6 +1,6 @@ """" - * date : 2022/11/19 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool @@ -9,88 +9,245 @@ # ! /usr/bin/env python -import re -import queue from functools import wraps +from queue import Queue +from re import search def store_in_queue(function) -> any: + """ + Decorator that stores the return value of a function in a queue. + + Args: + function: The function to be decorated. + + Returns: + The decorated function. + """ + @wraps(function) def wrapper(self, *args): self._returned_values.put(function(self, *args)) + return wrapper class AdditionalFeatures: - _returned_values = queue.Queue() + """ + Class for additional features of memory manipulation. + + Methods: + get_pattern_finder_values() -> list: + Retrieves the values from the pattern finder operation. + + reset_queue() -> None: + Resets the queue of returned values. + + speed_find_hexadecimal_pattern(pid: str, address_list: list, buf: int, hex_pattern: str) -> list: + Searches for a hexadecimal pattern in memory addresses using speed optimization. + + find_hexadecimal_pattern(pid: str, address_list: list, buf: int, hex_pattern: str) -> list: + Searches for a hexadecimal pattern in memory addresses. + """ + + def __init__(self): + pass + + _returned_values = Queue() def get_pattern_finder_values(self) -> list: + """ + Retrieves the values from the pattern finder operation. + + Returns: + The values from the pattern finder operation. + """ return self._returned_values.get() def reset_queue(self) -> None: + """ + Resets the queue of returned values. + """ self._returned_values.queue.clear() @store_in_queue def speed_find_hexadecimal_pattern(self, pid: str, address_list: list, buf: int, hex_pattern: str) -> list: - mem_file = open(f"/proc/{pid}/mem", "rb+") + """ + Searches for a hexadecimal pattern in memory addresses using speed optimization. + + Args: + pid (str): The process ID. + address_list (list): The list of memory addresses to search. + buf (int): The buffer size for reading memory. + hex_pattern (str): The hexadecimal pattern to search for. + + Returns: + list: A list containing the offsets, total values found, and founded patterns. + """ offsets = [] founded_patterns = [] total_values_found = 0 - for address in address_list: - partitions = address.split("-") - start_addr = int(partitions[0], 16) - end_addr = int(partitions[1], 16) - mem_file.seek(start_addr) - total_bytes = end_addr - start_addr - current_bytes = 0 - try: - while current_bytes < total_bytes + 1: - current_bytes += buf - read_hex = mem_file.read(buf).hex().upper() - if re.search(hex_pattern, str(read_hex)): - founded_patterns.append(read_hex) - - address_list = mem_file.seek(-buf, 1) - mem_file.read(buf) - offsets.append(hex(start_addr + address_list)) - total_values_found += 1 - except Exception as e: - print("[*] Exception ", e) - - mem_file.close() - - return list([offsets, total_values_found, founded_patterns]) + with open(f"/proc/{pid}/mem", "rb+") as mem_file: + for address in address_list: + partitions = address.split("-") + start_addr = int(partitions[0], 16) + end_addr = int(partitions[1], 16) + mem_file.seek(start_addr) + total_bytes = end_addr - start_addr + current_bytes = 0 + try: + while current_bytes < total_bytes + 1: + current_bytes += buf + read_bytes = mem_file.read(buf) + read_hex = read_bytes.hex().upper() + # print(f"{read_hex}\t{hex_pattern}") + if search(hex_pattern, read_hex): + founded_patterns.append(read_hex) + current_pos = mem_file.tell() - buf + mem_file.seek(current_pos) + mem_file.read(buf) + offsets.append(hex(current_pos)) + total_values_found += 1 + except Exception as e: + print("[*] Exception ", e) + + return [offsets, total_values_found, founded_patterns] @staticmethod def find_hexadecimal_pattern(pid: str, address_list: list, buf: int, hex_pattern: str) -> list: - mem_file = open(f"/proc/{pid}/mem", "rb+") + """ + Searches for a hexadecimal pattern in memory addresses. + + Args: + pid (str): The process ID. + address_list (list): The list of memory addresses to search. + buf (int): The buffer size for reading memory. + hex_pattern (str): The hexadecimal pattern to search for. + Returns: + list: A list containing the offsets, total values found, and founded patterns. + """ + + offsets = [] + founded_patterns = [] + total_values_found = 0 + + with open(f"/proc/{pid}/mem", "rb+") as mem_file: + for address in address_list: + partitions = address.split("-") + start_addr = int(partitions[0], 16) + end_addr = int(partitions[1], 16) + mem_file.seek(start_addr) + total_bytes = end_addr - start_addr + current_bytes = 0 + try: + while current_bytes < total_bytes + 1: + current_bytes += buf + read_bytes = mem_file.read(buf) + read_hex = read_bytes.hex().upper() + if search(hex_pattern, read_hex): + founded_patterns.append(read_hex) + current_pos = mem_file.tell() - buf + mem_file.seek(current_pos) + mem_file.read(buf) + offsets.append(hex(current_pos)) + total_values_found += 1 + except Exception as e: + print("[*] Exception ", e) + + return [offsets, total_values_found, founded_patterns] + + @store_in_queue + def speed_find_and_replace_hexadecimal_pattern(self, pid: str, address_list: list, buf: int, + search_pattern: str, replace_pattern: str) -> list: + """ + Searches for a hexadecimal pattern in memory addresses and replaces it with a new pattern using speed optimization. + + Args: + pid (str): The process ID. + address_list (list): The list of memory addresses to search and replace. + buf (int): The buffer size for reading memory. + search_pattern (str): The hexadecimal pattern to search for. + replace_pattern (str): The hexadecimal pattern to replace the found pattern with. + + Returns: + list: A list containing the offsets, total values found, and founded patterns. + """ + + offsets = [] + founded_patterns = [] + total_values_found = 0 + + with open(f"/proc/{pid}/mem", "rb+") as mem_file: + for address in address_list: + partitions = address.split("-") + start_addr = int(partitions[0], 16) + end_addr = int(partitions[1], 16) + mem_file.seek(start_addr) + total_bytes = end_addr - start_addr + current_bytes = 0 + try: + while current_bytes < total_bytes + 1: + current_bytes += buf + read_bytes = mem_file.read(buf) + read_hex = read_bytes.hex().upper() + if search(search_pattern, read_hex): + founded_patterns.append(read_hex) + current_pos = mem_file.tell() - buf + mem_file.seek(current_pos) + byte_array_conversion = bytearray.fromhex(replace_pattern) + mem_file.write(byte_array_conversion) + offsets.append(hex(current_pos)) + total_values_found += 1 + except Exception as e: + print("[*] Exception ", e) + + return [offsets, total_values_found, founded_patterns] + + @staticmethod + def find_and_replace_hexadecimal_pattern(pid: str, address_list: list, buf: int, + search_pattern: str, replace_pattern: str) -> list: + """ + Searches for a hexadecimal pattern in memory addresses and replaces it with a new pattern. + + Args: + pid (str): The process ID. + address_list (list): The list of memory addresses to search and replace. + buf (int): The buffer size for reading memory. + search_pattern (str): The hexadecimal pattern to search for. + replace_pattern (str): The hexadecimal pattern to replace the found pattern with. + + Returns: + list: A list containing the offsets, total values found, and founded patterns. + """ offsets = [] founded_patterns = [] total_values_found = 0 - for address in address_list: - partitions = address.split("-") - start_addr = int(partitions[0], 16) - end_addr = int(partitions[1], 16) - mem_file.seek(start_addr) - total_bytes = end_addr - start_addr - current_bytes = 0 - try: - while current_bytes < total_bytes + 1: - current_bytes += buf - read_hex = mem_file.read(buf).hex().upper() - if re.search(hex_pattern, str(read_hex)): - founded_patterns.append(read_hex) - - address_list = mem_file.seek(-buf, 1) - mem_file.read(buf) - offsets.append(hex(start_addr + address_list)) - total_values_found += 1 - except Exception as e: - print("[*] Exception ", e) - - mem_file.close() - return list([offsets, total_values_found, founded_patterns]) + with open(f"/proc/{pid}/mem", "rb+") as mem_file: + for address in address_list: + partitions = address.split("-") + start_addr = int(partitions[0], 16) + end_addr = int(partitions[1], 16) + mem_file.seek(start_addr) + total_bytes = end_addr - start_addr + current_bytes = 0 + try: + while current_bytes < total_bytes + 1: + current_bytes += buf + read_bytes = mem_file.read(buf) + read_hex = read_bytes.hex().upper() + if search(search_pattern, read_hex): + founded_patterns.append(read_hex) + current_pos = mem_file.tell() - buf + mem_file.seek(current_pos) + byte_array_conversion = bytearray.fromhex(replace_pattern) + mem_file.write(byte_array_conversion) + offsets.append(hex(current_pos)) + total_values_found += 1 + except Exception as e: + print("[*] Exception ", e) + + return [offsets, total_values_found, founded_patterns] diff --git a/androidMemoryTool/androidMemoryTool.py b/androidMemoryTool/androidMemoryTool.py index 57da5d2..9499244 100644 --- a/androidMemoryTool/androidMemoryTool.py +++ b/androidMemoryTool/androidMemoryTool.py @@ -1,6 +1,6 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool @@ -8,25 +8,92 @@ """ # ! /usr/bin/env python -from .libs_read_writers import LibsControllers -from .ThreadingController import FastSearchAlgo from .additional_features import AdditionalFeatures +from .ThreadingController import FastSearchAlgo +from .libs_read_writers import LibsControllers from .mapping import Mapping class AndroidMemoryTool(FastSearchAlgo): + """ + This class represents a tool for performing memory-related operations on an Android device. It inherits from the FastSearchAlgo class. + + Constructor: + + __init__(self, PKG: any((str, int)), TYPE=FastSearchAlgo.DataTypes.DWORD, SPEED_MODE=True, WORKERS=50, + pMAP=FastSearchAlgo.PMAP()) -> None: + Initializes an instance of the AndroidMemoryTool class. It takes the following parameters: + PKG: Package name or ID of the target process. + TYPE: Data type to be used in memory operations (default: FastSearchAlgo.DataTypes.DWORD). + SPEED_MODE: Flag indicating whether speed mode is enabled (default: True). + WORKERS: Number of worker threads to use in search operations (default: 50). + pMAP: Instance of the PMAP class for memory mapping (default: FastSearchAlgo.PMAP()). + + Methods: + + initialize(self, PKG: any((str, int)), TYPE: str, SPEED_MODE: bool, + WORKERS: int, pMAP: FastSearchAlgo.PMAP) -> None: + Initializes the AndroidMemoryTool instance with the specified parameters. + + read_value(self, read: any) -> any: Reads the value at a given memory address. It takes the following parameter: + read: Memory address to read from. + + read_write_value(self, read: any, write: any) -> any: + Reads the value at a given memory address and then writes a new value to that address. + It takes the following parameters: + read: Memory address to read from. + write: Value to write to the memory address. + + write_lib(self, base_address: hex, offset: hex, write_value: any) -> any: + Writes a value to a specific offset in a shared library. It takes the following parameters: + base_address: Base address of the shared library. + offset: Offset within the shared library. + write_value: Value to write to the specified offset. + + read_lib(self, base_address: hex, offset: hex, value: any((str, int, None)) = None) -> any: + Reads a value from a specific offset in a shared library. It takes the following parameters: + base_address: Base address of the shared library. + offset: Offset within the shared library. + value (optional): Value to use for dynamic buffer size calculation (default: None). + + refiner_address(self, list_address: list, value_to_refine: any) -> any: + Refines a list of memory addresses based on a specific value. It takes the following parameters: + list_address: List of memory addresses to refine. + value_to_refine: Value to refine the addresses based on. + + get_module_base_address(pid: str, module_name: str) -> any: + Retrieves the base address of a specific module within a process. It takes the following parameters: + pid: Process ID. + module_name: Name of the module. + + raw_dump(self, lib_name: str, path='./') -> bool: + Dumps the raw binary contents of a shared library. It takes the following parameters: + lib_name: Name of the shared library. + path (optional): Path to save the binary dump (default: './'). + + find_hex_pattern(self, hex_pattern: str) -> any: + Searches for a hexadecimal pattern in the memory of the target process. It takes the following parameter: + hex_pattern: Hexadecimal pattern to search for. + + dump_maps(self, path="./") -> bool: + Dumps the memory mapping information of the target process to a file. It takes the following parameter: + path (optional): Path to save the memory mapping file (default: './'). + + """ + VERSION_CODE = 0.5 + DEVELOPER = "Abdul Moez" _LibControllerObject = LibsControllers() def __init__(self, PKG: any((str, int)), TYPE=FastSearchAlgo.DataTypes.DWORD, SPEED_MODE=True, WORKERS=50, pMAP=FastSearchAlgo.PMAP()) -> None: """ - Initialize the tool to read or write runtime memory. - :rtype: None - :param PKG: Any(PID or PackageName) - :param TYPE: DataType - :param SPEED_MODE: Turn on/off Threading - :param WORKERS: Number of threads - :param pMAP: MAPS + Initializes an instance of the AndroidMemoryTool class. It takes the following parameters: + Args: + PKG: Package name or ID of the target process. + TYPE: Data type to be used in memory operations (default: FastSearchAlgo.DataTypes.DWORD). + SPEED_MODE: Flag indicating whether speed mode is enabled (default: True). + WORKERS: Number of worker threads to use in search operations (default: 50). + pMAP: Instance of the PMAP class for memory mapping (default: FastSearchAlgo.PMAP()). """ super(AndroidMemoryTool, self).__init__() self.initialize(PKG, TYPE, SPEED_MODE, WORKERS, pMAP) @@ -34,24 +101,38 @@ def __init__(self, PKG: any((str, int)), TYPE=FastSearchAlgo.DataTypes.DWORD, SP def initialize(self, PKG: any((str, int)), TYPE: str, SPEED_MODE: bool, WORKERS: int, pMAP: FastSearchAlgo.PMAP) -> None: """ - Initialize the DataClass with the provided Data. - :rtype: None - :param PKG: Any(PID or PackageName) - :param TYPE: DataType - :param SPEED_MODE: Turn on/off Threading - :param WORKERS: Number of threads - :param pMAP: MAPS + Initializes the AndroidMemoryTool object. + + Args: + PKG: The package name or process ID. + TYPE: The data type to search for. Defaults to FastSearchAlgo.DataTypes.DWORD. + SPEED_MODE: Whether to enable speed mode. Defaults to True. + WORKERS: The number of workers to use for searching. Defaults to 50. + pMAP: The memory map to use. Defaults to FastSearchAlgo.PMAP(). """ + super().init_setup(PKG, TYPE, SPEED_MODE, WORKERS) super().init_tool(pMAP) + @staticmethod + def get_version_code(): + return AndroidMemoryTool.VERSION_CODE + + @staticmethod + def get_developer(): + return AndroidMemoryTool.DEVELOPER + def read_value(self, read: any) -> any: """ - The Read Value takes Any (int, string, float) as a parameter and search it in memory and return results. - :param read: Value - :type read: any - :return: Any(tuple, list, None) + Reads the value from the specified memory address. + + Args: + read: The memory address to read from. + + Returns: + The value read from the memory address. """ + # gets if speed mode is on or off speed_mode = super().get_variables(is_speed=True) # gets data type bytes in int @@ -90,11 +171,16 @@ def read_value(self, read: any) -> any: def read_write_value(self, read: any, write: any) -> any: """ - The Write Value takes Any (int, string, float) as a parameter and replace it in memory and return results. - :param read: Value(toFind) - :param write: Value(toWrite) - :return: any(tuple, list, None) + Reads the value from the specified memory address and writes the new value to it. + + Args: + read: The memory address to read from. + write: The new value to write. + + Returns: + The value written to the memory address. """ + speed_mode = super().get_variables(is_speed=True) data_types = super().get_variables(is_data_byte=True) proc_id = super().get_variables(is_pid=True) @@ -128,16 +214,23 @@ def read_write_value(self, read: any, write: any) -> any: value_to_write) return None - def write_lib(self, base_address: hex, offset: hex, write_value: any) -> any: + def write_lib(self, base_address: str, offset: str, write_value: any) -> any: """ - Replace address value. - :param base_address: Hex(BaseAddress) - :param offset: Hex(Offset) - :param write_value: Value(toWrite) - :return: Any + Writes the value to the specified library offset. + + Args: + base_address: The base address of the library. + offset: The offset within the library. + write_value: The value to write. + + Returns: + The value written to the library offset. """ + data_types = super().get_variables(is_data_byte=True) proc_id = super().get_variables(is_pid=True) + base_address = str(base_address) + offset = str(offset) if proc_id == '': print('[*] Pid not found') @@ -148,16 +241,23 @@ def write_lib(self, base_address: hex, offset: hex, write_value: any) -> any: return self._LibControllerObject.write_lib_offsets(proc_id, base_address, offset, value_to_write) return None - def read_lib(self, base_address: hex, offset: hex, value: any((str, int, None)) = None) -> any: + def read_lib(self, base_address: str, offset: str, value: any((str, int, None)) = None) -> any: """ - :param value: Optional(For UTF-8 & UTF-16) - :param base_address: Hex(BaseAddress) - :param offset: Hex(Offset) - :return: any + Reads the value from the specified library offset. + + Args: + base_address: The base address of the library. + offset: The offset within the library. + value: The additional value parameter for certain data types. Defaults to None. + + Returns: + The value read from the library offset. """ data_types = super().get_variables(is_data_byte=True) proc_id = super().get_variables(is_pid=True) + base_address = str(base_address) + offset = str(offset) if proc_id == '': print('[*] Pid not found') @@ -171,7 +271,6 @@ def read_lib(self, base_address: hex, offset: hex, value: any((str, int, None)) if isinstance(value, int): buffer = 18 + (2 * int(value)) - elif str(value).isnumeric(): buffer = 18 + (2 * int(str(value))) else: @@ -189,9 +288,14 @@ def read_lib(self, base_address: hex, offset: hex, value: any((str, int, None)) def refiner_address(self, list_address: list, value_to_refine: any) -> any: """ - :param list_address: list(FoundedAddress) - :param value_to_refine: Value(ChangedValue) - :return: Any + Refines the list of addresses based on the specified value to refine. + + Args: + list_address: The list of addresses to refine. + value_to_refine: The value to refine. + + Returns: + The refined list of addresses. """ proc_id = super().get_variables(is_pid=True) @@ -206,18 +310,22 @@ def refiner_address(self, list_address: list, value_to_refine: any) -> any: if data_types == 0: print("[*] Data type not supported") else: - refined_address = self._LibControllerObject.address_refiners(proc_id, list_address, data_types, - value_to_read) - return refined_address + return self._LibControllerObject.address_refiners(proc_id, list_address, data_types, value_to_read) return None @staticmethod def get_module_base_address(pid: str, module_name: str) -> any: """ - :param pid: Str(PID) - :param module_name: Str(Module) - :return: Any + Gets the base address of the specified module for the given process ID. + + Args: + pid: The process ID. + module_name: The name of the module. + + Returns: + The base address of the module. """ + map_file = open(f"/proc/{pid}/maps", 'r') address = [] if map_file is not None: @@ -233,10 +341,16 @@ def get_module_base_address(pid: str, module_name: str) -> any: def raw_dump(self, lib_name: str, path='./') -> bool: """ - :param lib_name: Str(LibName or Address) - :param path: Str(OutputPath) - :return: bool(Ture, False) + Dumps the specified library to a raw binary file. + + Args: + lib_name: The name of the library. + path: The path to save the dumped file. Defaults to './'. + + Returns: + True if the dump was successful, False otherwise. """ + proc_id = super().get_variables(is_pid=True) if "-" in lib_name: @@ -253,16 +367,67 @@ def raw_dump(self, lib_name: str, path='./') -> bool: .write(binary_string) return True - def find_hex_pattern(self, hex_pattern: str) -> any: + def find_hex_pattern(self, search_pattern: str) -> any: """ - :param hex_pattern: Str(Pattern, eg:88 ?? 97) - :return: Any + Finds the specified hexadecimal pattern in the memory. + + Args: + search_pattern: The hexadecimal pattern to search for. + + Returns: + The addresses where the pattern was found. """ + proc_id = super().get_variables(is_pid=True) maps_addr = super().get_variables(is_map_addr=True) speed_mode = super().get_variables(is_speed=True) - filter_user_data = hex_pattern.replace(" ", "") + filter_user_data = search_pattern.replace(" ", "") + bytes_of_filtered_data = int(len(filter_user_data) / 2) + pattern_of_hex = "" + character_counter_hex = 0 + for char in filter_user_data: + if char == "?": + character_counter_hex += 1 + else: + if character_counter_hex != 0: + pattern_of_hex += f"[A-Fa-f0-9]{{{character_counter_hex}}}" + character_counter_hex = 0 + if char != "?": + pattern_of_hex += char + + if character_counter_hex != 0: + pattern_of_hex += f"[A-Fa-f0-9]{{{character_counter_hex}}}" + if speed_mode: + super().fast_search_algorithms_pattern_finding(proc_id, maps_addr, bytes_of_filtered_data, + pattern_of_hex) + values = self._AdditionalFeaturesObject.get_pattern_finder_values() + self._AdditionalFeaturesObject.reset_queue() + return values + + return AdditionalFeatures.find_hexadecimal_pattern(proc_id, maps_addr, bytes_of_filtered_data, pattern_of_hex) + + def find_and_replace_hex_pattern(self, search_pattern: str, replace_pattern: str) -> any: + """ + Finds and replaces a hexadecimal pattern in memory addresses. + + Args: + search_pattern (str): The pattern to search for. It can contain hexadecimal digits (0-9, A-F) + and '?' as a wildcard for any single hexadecimal digit. + replace_pattern (str): The pattern to replace the found pattern with. + It should be a valid hexadecimal string. + + Returns: + any: The result of the pattern finding and replacement operation. + The format of the result depends on the 'speed_mode' setting. + """ + + proc_id = super().get_variables(is_pid=True) + maps_addr = super().get_variables(is_map_addr=True) + speed_mode = super().get_variables(is_speed=True) + + filter_user_data = search_pattern.replace(" ", "") + replace_pattern = replace_pattern.replace(" ", "") bytes_of_filtered_data = int(len(filter_user_data) / 2) pattern_of_hex = "" character_counter_hex = 0 @@ -281,18 +446,26 @@ def find_hex_pattern(self, hex_pattern: str) -> any: if speed_mode: super().fast_search_algorithms_pattern_finding(proc_id, maps_addr, bytes_of_filtered_data, - pattern_of_hex) + pattern_of_hex, replace_pattern) values = self._AdditionalFeaturesObject.get_pattern_finder_values() self._AdditionalFeaturesObject.reset_queue() return values - return AdditionalFeatures.find_hexadecimal_pattern(proc_id, maps_addr, bytes_of_filtered_data, pattern_of_hex) + return AdditionalFeatures.find_and_replace_hexadecimal_pattern(proc_id, maps_addr, + bytes_of_filtered_data, pattern_of_hex, + replace_pattern) def dump_maps(self, path="./") -> bool: """ - :param path: Str(OutputPath) - :return: bool(True, False) + Dumps the memory maps of the process. + + Args: + path: The path to save the dumped file. Defaults to './'. + + Returns: + True if the dump was successful, False otherwise. """ + proc_id = super().get_variables(is_pid=True) if proc_id: map_file = open(f"/proc/{proc_id}/maps", "r").read() @@ -300,3 +473,4 @@ def dump_maps(self, path="./") -> bool: return True return False + diff --git a/androidMemoryTool/cli_for_tool.py b/androidMemoryTool/cli_for_tool.py new file mode 100644 index 0000000..a98e0c6 --- /dev/null +++ b/androidMemoryTool/cli_for_tool.py @@ -0,0 +1,582 @@ +""" + * date : 2023/07/11 + * Version : 0.5 + * author : Abdul Moez (abdulmoez123456789@gmail.com) + * Study : UnderGraduate in GCU Lahore, Pakistan + * https://github.com/Anonym0usWork1221/android-memorytool + +""" + +# !/usr/bin/env python +import argparse + + +class AndroidMemoryToolCLI: + """Command-line interface for the Android Memory Tool.""" + + def __init__(self, android_tool: __build_class__): + """ + Initialize the AndroidMemoryToolCLI. + + Args: + android_tool: An instance of the AndroidMemoryTool class. + """ + + self._android_memory_tool = android_tool + + def _get_data_type(self, user_type): + """ + Get the data type based on the user input. + + Args: + user_type: The user input for the data type. + + Returns: + The corresponding data type from the AndroidMemoryTool class. + + Raises: + SystemExit: If the user_type is not a valid data type. + """ + + data_types = { + "DWORD": self._android_memory_tool.DataTypes.DWORD, + "FLOAT": self._android_memory_tool.DataTypes.FLOAT, + "DOUBLE": self._android_memory_tool.DataTypes.DOUBLE, + "WORD": self._android_memory_tool.DataTypes.WORD, + "BYTE": self._android_memory_tool.DataTypes.BYTE, + "QWORD": self._android_memory_tool.DataTypes.QWORD, + "XOR": self._android_memory_tool.DataTypes.XOR, + "UTF_8": self._android_memory_tool.DataTypes.UTF_8, + "UTF_16LE": self._android_memory_tool.DataTypes.UTF_16LE + } + if user_type in data_types: + return data_types[user_type] + + print("Unknown DataType: Available data types are:") + print(" DWORD") + print(" FLOAT") + print(" DOUBLE") + print(" WORD") + print(" BYTE") + print(" QWORD") + print(" XOR") + print(" UTF_8") + print(" UTF_16LE") + exit() + + def read_value(self, args): + """ + Read a value from memory. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The read value from memory. + """ + + value_to_read = self.convert_to_appropriate_type(args.read) + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type, args.speed_mode, args.workers) + result = tool.read_value(value_to_read) + if result is not None: + print(f"Read value: {result}") + + def read_write_value(self, args): + """ + Read and write a value in memory. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The read value from memory. + """ + + value_to_write = self.convert_to_appropriate_type(args.write_value) + value_to_read = self.convert_to_appropriate_type(args.read) + + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type, args.speed_mode, args.workers) + result = tool.read_write_value(value_to_read, value_to_write) + if result is not None: + print(f"Read value: {result}") + + @staticmethod + def convert_to_appropriate_type(value): + """ + Convert the input string to the appropriate type (float, int, or str). + + Args: + value (str): The input value to be converted. + + Returns: + The converted value as float, int, or str, depending on the content of the input. + + """ + try: + converted_value = float(value) + if converted_value.is_integer(): + return int(converted_value) + else: + return converted_value + except (ValueError, TypeError): + return value + + def write_lib(self, args): + """ + Write a value to a library. + + Args: + args: The arguments parsed from the command-line. + + Prints: + A message indicating the write operation was successful. + """ + + value_to_write = self.convert_to_appropriate_type(args.write_value) + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type) + result = tool.write_lib(args.base_address, args.offset, value_to_write) + if result is not None: + print("Write successful") + + def read_lib(self, args): + """ + Read a value from a library. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The read value from the library. + """ + + value_to_read = self.convert_to_appropriate_type(args.value) + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type) + result = tool.read_lib(args.base_address, args.offset, value_to_read) + if result is not None: + print(f"Read value: {result}") + + def refiner_address(self, args): + """ + Refine a list of addresses. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The refined addresses. + """ + + value_to_refine = self.convert_to_appropriate_type(args.value_to_refine) + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type, args.speed_mode, args.workers) + result = tool.refiner_address(args.list_address, value_to_refine) + if result is not None: + print(f"Refined addresses: {result}") + + def get_module_base_address(self, args): + """ + Get the base address of a module. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The base address of the module. + """ + + pid = self._android_memory_tool.get_pid(args.pid) + result = self._android_memory_tool.get_module_base_address(pid, args.module_name) + if result is not None: + print(f"Base address: {result}") + + def raw_dump(self, args): + """ + Dump a library as raw binary. + + Args: + args: The arguments parsed from the command-line. + + Prints: + A message indicating the raw dump was successful. + """ + + tool = self._android_memory_tool(args.pkg) + success = tool.raw_dump(args.lib_name, args.path) + if success: + print("Raw dump successful") + + def find_hex_pattern(self, args): + """ + Find a hexadecimal pattern in memory. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The addresses where the pattern was found. + """ + + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type, args.speed_mode, args.workers) + result = tool.find_hex_pattern(args.hex_pattern) + if result is not None: + print(f"Pattern found at addresses: {result}") + + def find_and_replace_hex_pattern(self, args): + """ + Find and replace a hexadecimal pattern in memory. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The addresses where the pattern was replaced. + """ + + data_type = self._get_data_type(args.type) + tool = self._android_memory_tool(args.pkg, data_type, args.speed_mode, args.workers) + result = tool.find_and_replace_hex_pattern(args.search_pattern, args.replace_pattern) + if result is not None: + print(f"Pattern replaced at addresses: {result}") + + def dump_maps(self, args): + """ + Dump memory maps. + + Args: + args: The arguments parsed from the command-line. + + Prints: + A message indicating the memory maps were dumped successfully. + """ + + tool = self._android_memory_tool(args.pkg) + success = tool.dump_maps(args.path) + if success: + print("Maps dumped successfully") + + def get_pid(self, args): + """ + Return the PID of a process. + + Args: + args: The arguments parsed from the command-line. + + Prints: + The PID of the process. + """ + + tool = self._android_memory_tool.get_pid(args.pkg) + if tool: + print(f"Pid of process: {tool}") + + def get_version(self): + """ + Get the version information. + + Prints: + The version and developer information. + """ + + version = self._android_memory_tool.get_version_code() + developer = self._android_memory_tool.get_developer() + print(f"Version: {version}\nDeveloper: {developer}") + + def parse_arguments(self): + """ + Parse the command-line arguments. + + Returns: + The parsed arguments. + """ + + parser = argparse.ArgumentParser(description="Android Memory Tool CLI") + subparsers = parser.add_subparsers(title="subcommands", dest="command") + + # Read value command + read_value_parser = subparsers.add_parser("read_value", help="Read a value from memory") + read_value_parser.add_argument("pkg", type=str, help="Package name or PID") + read_value_parser.add_argument("type", type=str, help="Data type") + read_value_parser.add_argument("speed_mode", type=bool, help="Speed mode") + read_value_parser.add_argument("workers", type=int, help="Number of workers") + read_value_parser.add_argument("read", type=str, help="Value to read") + read_value_parser.set_defaults(func=self.read_value) + + # Read-write value command + read_write_value_parser = subparsers.add_parser("read_write_value", help="Read and write a value in memory") + read_write_value_parser.add_argument("pkg", type=str, help="Package name or PID") + read_write_value_parser.add_argument("type", type=str, help="Data type") + read_write_value_parser.add_argument("speed_mode", type=bool, help="Speed mode") + read_write_value_parser.add_argument("workers", type=int, help="Number of workers") + read_write_value_parser.add_argument("read", type=str, help="Value to read") + read_write_value_parser.add_argument("write", type=str, help="Value to write") + read_write_value_parser.set_defaults(func=self.read_write_value) + + # Write lib command + write_lib_parser = subparsers.add_parser("write_lib", help="Write a value to a library") + write_lib_parser.add_argument("pkg", type=str, help="Package name or PID") + write_lib_parser.add_argument("type", type=str, help="Data type") + write_lib_parser.add_argument("base_address", type=str, help="Base address") + write_lib_parser.add_argument("offset", type=str, help="Offset") + write_lib_parser.add_argument("write_value", type=str, help="Value to write") + write_lib_parser.set_defaults(func=self.write_lib) + + # Read lib command + read_lib_parser = subparsers.add_parser("read_lib", help="Read a value from a library") + read_lib_parser.add_argument("pkg", type=str, help="Package name or PID") + read_lib_parser.add_argument("type", type=str, help="Data type") + read_lib_parser.add_argument("base_address", type=str, help="Base address") + read_lib_parser.add_argument("offset", type=str, help="Offset") + read_lib_parser.add_argument("--value", type=str, help="Value to compare") + read_lib_parser.set_defaults(func=self.read_lib) + + # Refiner address command + refiner_address_parser = subparsers.add_parser("refiner_address", help="Refine a list of addresses") + refiner_address_parser.add_argument("pkg", type=str, help="Package name or PID") + refiner_address_parser.add_argument("type", type=str, help="Data type") + refiner_address_parser.add_argument("speed_mode", type=bool, help="Speed mode") + refiner_address_parser.add_argument("workers", type=int, help="Number of workers") + refiner_address_parser.add_argument("list_address", nargs="+", type=str, help="List of addresses") + refiner_address_parser.add_argument("value_to_refine", type=str, help="Value to refine") + refiner_address_parser.set_defaults(func=self.refiner_address) + + # Get module base address command + get_module_base_address_parser = subparsers.add_parser("get_module_base_address", + help="Get the base address of a module") + get_module_base_address_parser.add_argument("pid", type=str, help="Package name or Process ID") + get_module_base_address_parser.add_argument("module_name", type=str, help="Module name") + get_module_base_address_parser.set_defaults(func=self.get_module_base_address) + + # Raw dump command + raw_dump_parser = subparsers.add_parser("raw_dump", help="Dump a library as raw binary") + raw_dump_parser.add_argument("pkg", type=str, help="Package name or PID") + raw_dump_parser.add_argument("lib_name", type=str, help="Library name") + raw_dump_parser.add_argument("path", type=str, help="Output path (default: current directory)", nargs="?", + default="./") + raw_dump_parser.set_defaults(func=self.raw_dump) + + # Find hex pattern command + find_hex_pattern_parser = subparsers.add_parser("find_hex_pattern", help="Find a hexadecimal pattern in memory") + find_hex_pattern_parser.add_argument("pkg", type=str, help="Package name or PID") + find_hex_pattern_parser.add_argument("type", type=str, help="Data type") + find_hex_pattern_parser.add_argument("speed_mode", type=bool, help="Speed mode") + find_hex_pattern_parser.add_argument("workers", type=int, help="Number of workers") + find_hex_pattern_parser.add_argument("hex_pattern", type=str, help="Hexadecimal pattern to find") + find_hex_pattern_parser.set_defaults(func=self.find_hex_pattern) + + # Find and replace hex pattern command + find_and_replace_hex_pattern_parser = subparsers.add_parser("find_and_replace_hex_pattern", + help="Find and replace a hexadecimal pattern in memory") + find_and_replace_hex_pattern_parser.add_argument("pkg", type=str, help="Package name or PID") + find_and_replace_hex_pattern_parser.add_argument("type", type=str, help="Data type") + find_and_replace_hex_pattern_parser.add_argument("speed_mode", type=bool, help="Speed mode") + find_and_replace_hex_pattern_parser.add_argument("workers", type=int, help="Number of workers") + find_and_replace_hex_pattern_parser.add_argument("search_pattern", type=str, + help="Hexadecimal pattern to search for") + find_and_replace_hex_pattern_parser.add_argument("replace_pattern", type=str, + help="Hexadecimal pattern to replace with") + find_and_replace_hex_pattern_parser.set_defaults(func=self.find_and_replace_hex_pattern) + + # Dump maps command + dump_maps_parser = subparsers.add_parser("dump_maps", help="Dump memory maps") + dump_maps_parser.add_argument("pkg", type=str, help="Package name or PID") + dump_maps_parser.add_argument("--path", type=str, help="Output path (default: current directory)", nargs="?", + default="./") + dump_maps_parser.set_defaults(func=self.dump_maps) + + # get pid command + get_pid_parser = subparsers.add_parser("get_pid", help="Return the pid of process") + get_pid_parser.add_argument("pkg", type=str, help="Package name") + get_pid_parser.set_defaults(func=self.get_pid) + + # Help command + help_parser = subparsers.add_parser("help", help="Display help information") + help_parser.add_argument("command", type=str, help="Command to get help for", nargs="?") + help_parser.set_defaults(func=self.display_help) + + # Get version + parser.add_argument("-v", "--version", help="Return Version of tool", action="store_true") + return parser.parse_args() + + @staticmethod + def display_help(args): + """ + Display help information for a command or general help information. + + Args: + args: The parsed command-line arguments. + + Returns: + None. + """ + + if args.command: + print("Help for command:", args.command) + print("-" * 50) + # Added help information for each command + if args.command == "read_value": + print("Command: read_value") + print("Description: Read a value from memory") + print("Usage: read_value ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Speed mode (True or False)") + print(" Number of workers") + print(" Value to read") + elif args.command == "read_write_value": + print("Command: read_write_value") + print("Description: Read and write a value in memory") + print("Usage: read_write_value ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Speed mode (True or False)") + print(" Number of workers") + print(" Value to read") + print(" Value to write") + elif args.command == "write_lib": + print("Command: write_lib") + print("Description: Write a value to a library") + print("Usage: write_lib ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Base address") + print(" Offset") + print(" Value to write") + elif args.command == "read_lib": + print("Command: read_lib") + print("Description: Read a value from a library") + print("Usage: read_lib [--value ]") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Base address") + print(" Offset") + print(" --value Value to compare (optional)") + elif args.command == "refiner_address": + print("Command: refiner_address") + print("Description: Refine a list of addresses") + print("Usage: refiner_address ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Speed mode (True or False)") + print(" Number of workers") + print(" List of addresses") + print(" Value to refine") + elif args.command == "get_module_base_address": + print("Command: get_module_base_address") + print("Description: Get the base address of a module") + print("Usage: get_module_base_address ") + print("\nParameters:") + print(" Package name or Process ID") + print(" Module name") + elif args.command == "raw_dump": + print("Command: raw_dump") + print("Description: Dump a library as raw binary") + print("Usage: raw_dump [] ") + print("\nParameters:") + print(" Package name or PID") + print(" Library name") + print(" Output path (default: current directory)") + elif args.command == "find_hex_pattern": + print("Command: find_hex_pattern") + print("Description: Find a hexadecimal pattern in memory") + print("Usage: find_hex_pattern ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Speed mode (True or False)") + print(" Number of workers") + print(" Hexadecimal pattern to find") + elif args.command == "find_and_replace_hex_pattern": + print("Command: find_and_replace_hex_pattern") + print("Description: Find and replace a hexadecimal pattern in memory") + print( + "Usage: find_and_replace_hex_pattern ") + print("\nParameters:") + print(" Package name or PID") + print(" Data type") + print(" Speed mode (True or False)") + print(" Number of workers") + print(" Hexadecimal pattern to search for") + print(" Hexadecimal pattern to replace with") + elif args.command == "dump_maps": + print("Command: dump_maps") + print("Description: Dump memory maps") + print("Usage: dump_maps [--path ]") + print("\nParameters:") + print(" Package name or PID") + print(" --path Output path (default: current directory)") + elif args.command == "dump_maps": + print("Command: dump_maps") + print("Description: Dump memory maps") + print("Usage: dump_maps [--path ]") + print("\nParameters:") + print(" Package name or PID") + print(" --path Output path (default: current directory)") + + elif args.command == "get_pid": + print("Command: get_pid") + print("Description: Return pid of process") + print("Usage: get_pid ") + print("\nParameters:") + print(" Package name") + + else: + print("Error: Unknown command:", args.command) + return + else: + print("General help information") + print("-" * 50) + print("Available commands:") + print(" find_hex_pattern Find a hexadecimal pattern in memory") + print(" read_write_value Read and write a value in memory") + print(" raw_dump Dump a library as raw binary") + print(" read_lib Read a value from a library") + print(" write_lib Write a value to a library") + print(" refiner_address Refine a list of addresses") + print(" read_value Read a value from memory") + print(" get_pid Returns pid of process") + print(" dump_maps Dump memory maps") + print(" get_module_base_address Get the base address of a module") + print(" find_and_replace_hex_pattern Find and replace a hexadecimal pattern in memory") + + def cli_handler(self): + """ + Handle the command-line interface (CLI) for the Android Memory Tool. + + This function parses the command-line arguments using the `parse_arguments` method. + It then checks if the `--version` flag is set and calls the `get_version` + method to display the version information. + If no command is provided, it calls the `display_help` method to show general help information. + Otherwise, it calls the appropriate method based on the command specified in the arguments. + + Args: + self: The current instance of the `AndroidMemoryToolCLI` class. + + Returns: + None. + """ + args = self.parse_arguments() + + if args.version: + self.get_version() + elif args.command is None: + self.display_help(args) + else: + args.func(args) + +# def execute_cli(): +# from androidMemoryTool import AndroidMemoryTool +# cli_tool = AndroidMemoryToolCLI(AndroidMemoryTool) +# cli_tool.cli_handler() +# +# if __name__ == '__main__': +# execute_cli() diff --git a/androidMemoryTool/libs_read_writers.py b/androidMemoryTool/libs_read_writers.py index 4d2b018..9d7604f 100644 --- a/androidMemoryTool/libs_read_writers.py +++ b/androidMemoryTool/libs_read_writers.py @@ -1,6 +1,6 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool @@ -9,12 +9,43 @@ # ! /usr/bin/env python - class LibsControllers: + """ + Class for controlling memory operations on a process. + + Methods: + write_lib_offsets(pid: str, base_address: str, offset: str, value: any) -> bool: + Writes a value to a specific memory address in the memory of a process. + + read_lib_offsets(pid: str, base_address: str, offset: str, buf: int) -> any: + Reads the value from a specific memory address in the memory of a process. + + address_refiners(pid: str, list_address: list, buf: int, changed_value: any) -> list: + Refines a list of memory addresses by checking if the value at each address matches a specified value. + + raw_dumper(pid: str, address: list) -> bytes: + Dumps a range of memory addresses from a process. + """ + + def __init__(self): + pass @staticmethod - def write_lib_offsets(pid: str, base_address: hex, offset: hex, value: any) -> bool: - write_address = int(str(base_address), 16) + int(str(offset), 16) + def write_lib_offsets(pid: str, base_address: str, offset: str, value: any) -> bool: + """ + Writes a value to a specific memory address in the memory of a process. + + Args: + pid (str): The process ID. + base_address (str): The base memory address in hexadecimal format. + offset (str): The offset from the base address in hexadecimal format. + value (any): The value to be written to the memory address. + + Returns: + bool: True if the write operation is successful, False otherwise. + """ + + write_address = int(base_address, 16) + int(offset, 16) mem_file = open(f"/proc/{pid}/mem", "rb+") try: mem_file.seek(write_address) @@ -22,12 +53,26 @@ def write_lib_offsets(pid: str, base_address: hex, offset: hex, value: any) -> b mem_file.close() return True except Exception as e: - print("[*] Exception ", e) + print(f"[-] Exception: {e}\n[-] This error mostly occur due to invalid type of parameters," + f" pass the base_address and offset as a hex string e.g: '0x5645c973ad44'") return False @staticmethod - def read_lib_offsets(pid: str, base_address: hex, offset: hex, buf: int) -> any: - read_address = int(str(base_address), 16) + int(str(offset), 16) + def read_lib_offsets(pid: str, base_address: str, offset: str, buf: int) -> any: + """ + Reads the value from a specific memory address in the memory of a process. + + Args: + pid (str): The process ID. + base_address (str): The base memory address in hexadecimal format. + offset (str): The offset from the base address in hexadecimal format. + buf (int): The number of bytes to read from the memory address. + + Returns: + any: The value read from the memory address. + """ + + read_address = int(base_address, 16) + int(offset, 16) mem_file = open(f"/proc/{pid}/mem", "rb+") try: mem_file.seek(read_address) @@ -35,20 +80,44 @@ def read_lib_offsets(pid: str, base_address: hex, offset: hex, buf: int) -> any: mem_file.close() return founded_value_on_address except Exception as e: - print("[*] Exception ", e) + print(f"[-] Exception: {e}\n[-] This error mostly occur due to invalid type of parameters," + f" pass the base_address and offset as a hex string e.g: '0x5645c973ad44'") return None def address_refiners(self, pid: str, list_address: list, buf: int, changed_value: any) -> list: + """ + Refines a list of memory addresses by checking if the value at each address matches a specified value. + + Args: + pid (str): The process ID. + list_address (list): List of memory addresses to be refined. + buf (int): The number of bytes to read from each memory address. + changed_value (any): The value to compare against the read values. + + Returns: + list: List of refined memory addresses where the read value matches the changed value. + """ + refined_address = [] for address in list_address: - read_value = self.read_lib_offsets(pid, 0x0, address, buf) + read_value = self.read_lib_offsets(pid, '0x0', address, buf) if read_value == changed_value: refined_address.append(address) - return refined_address @staticmethod def raw_dumper(pid: str, address: list) -> bytes: + """ + Dumps a range of memory addresses from a process. + + Args: + pid (str): The process ID. + address (list): List of memory address ranges to dump. + + Returns: + bytes: The dumped bytes from the specified memory address range. + """ + start_addr = int(address[0][0], 16) end_addr = int(address[len(address) - 1][1], 16) mem_file = open(f"/proc/{pid}/mem", "rb+") diff --git a/androidMemoryTool/mapping.py b/androidMemoryTool/mapping.py index 066379d..1a289fe 100644 --- a/androidMemoryTool/mapping.py +++ b/androidMemoryTool/mapping.py @@ -1,37 +1,61 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool """ -# Mapping Structure +""" +Mapping Structure: + +The mapping structure provides information about the memory regions allocated to a process, including their +permissions, offsets, devices, inodes, and pathnames. + +Mapping Format: + [address] [perms] [offset] [device] [inode] [pathname] + +Mapping Fields: + - address: The range of memory addresses covered by the mapping. + - perms: The permissions of the memory region (e.g., read, write, execute). + - offset: The offset of the mapping. + - device: The device associated with the mapping. + - inode: The inode of the file associated with the mapping (if applicable). + - pathname: The pathname of the file associated with the mapping (if applicable). + +Mapping Tags: + - O->others:others: Anonymous mappings without specific tags (e.g., --p, anon, system/framework, etc.). + - Jh->:rw-p, anon:dalvik-main space: Dalvik heap mappings in the Java space. + - Xs->system: --xp, lib, .so, system: System libraries and shared objects mappings. + - J->Java: rw-p, javalib: Java library mappings. + - ch -> c++heap: C++ heap mappings. + - ca -> c++ alloc: Anonymous C++ allocations mappings. + - cd -> c++ data: C++ data mappings. + - cb -> c++ bss: C++ BSS (Block Started by Symbol) mappings. + - PS -> PPSSPP: PPSSPP emulator mappings. + - A -> Anonymous: Anonymous mappings with read-write permissions only. + - S -> Stack: Stack mappings. + - As -> Ashmem: Ashmem (Anonymous Shared Memory) mappings. + - V -> Video: Video mappings. + - B -> Bad(dang): Bad or dangerous mappings. + - Xa -> code app:r-xp, lib, .so, .jar: Application code mappings (read-executable). + +Order of Fields in map.txt: + ['address', 'perms', 'offset', 'device', 'inode', 'pathname'] + +Example: + ['76a01ae000-76a01af000', 'r--p', '00000000', '00:00', '26', '[anon:atexit handler]'] + + The mapping above represents an anonymous mapping with read-only permissions, no offset, device '00:00', + inode '26', and no associated file pathname. -''' -O->others:others -> --p, anon, system/framework, etc -Jh->:rw-p, anon:dalvik-main space -Xs->system: --xp, lib, .so, system -J->Java: rw-p, javalib -ch -> c++heap: -ca -> c++ alloc: anon, key -> anon:libc_malloc, rw-p, rwxp * -cd -> c++ data: * -cb -> c++ bss: -PS -> PPSSPP: -A -> Anonymous: just rw-p * -S -> Stack: -As -> Ashmem: -V -> Video: -B -> Bad(dang): -Xa -> code app:r-xp, lib, .so, .jar -''' +""" # order ''' - address perms offset device inode pathname - - 0 1 2 3 4 5 + address perms offset device inode pathname + 0 1 2 3 4 5 ['76a01ae000-76a01af000', 'r--p', '00000000', '00:00', '26', '[anon:atexit handler]'] A -> 4[] @@ -39,284 +63,326 @@ # ! /usr/bin/env python - class Mapping: + """Class for extracting memory mappings of a process using /proc/[pid]/maps file.""" + + def __init__(self): + pass + + @staticmethod + def _read_map_file(pid: str): + """ + Reads the mapping file for the specified process ID (pid). + Args: + pid (str): The process ID. + + Returns: + list: A list of strings representing the lines read from the mapping file. + """ + + with open(f"/proc/{pid}/maps", "r") as map_file: + return map_file.readlines() + + @staticmethod + def _parse_mapping_line(line: str): + """ + Parses a mapping line and extracts relevant information. + Args: + line (str): A line from the mapping file. + + Returns: + tuple: A tuple containing temporary details, region, and permissions extracted from the line. + """ + temp_details = line.split(" ", 5) + region = temp_details[-1] + perm = temp_details[1] + return temp_details, region, perm + + @staticmethod + def _filter_mappings(lines: list, condition): + """ + Filters the mappings based on a given condition. + Args: + lines (list): A list of strings representing the lines read from the mapping file. + condition: A lambda function that takes temporary details, region, and permissions as input + and returns True or False based on the condition. + + Returns: + dict: A dictionary containing filtered mapping details with keys 'address', 'permissions', + and 'allocated'. + """ + + details = {"address": [], "permissions": [], "allocated": []} + for line in lines: + temp_details, region, perm = Mapping._parse_mapping_line(line) + if condition(temp_details, region, perm): + details["address"].append(temp_details[0]) + details["permissions"].append(temp_details[1]) + details["allocated"].append(region) + return details + @staticmethod - # mapping XA def mapping_xa(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_xa = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] - - # Xa -> code app:r-xp, lib, .so, .jar - if ("lib" in region) and ("rw" in perm) and ( - ".so" in region) and ("system" in region): - details_xa["address"].append(temp_details[0]) - details_xa["permissions"].append(temp_details[1]) - details_xa["allocated"].append(region) - - map_file.close() - return details_xa + """ + Returns the mappings with specific conditions related to libraries in the system. + + Args: + pid (str): The process ID. + + Returns: + dict: A dictionary containing mapping details for libraries with specific conditions. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: \ + ("lib" in region) and ("rw" in perm) and (".so" in region) and ("system" in region) + return Mapping._filter_mappings(lines, condition) @staticmethod def mapping_dump_libs(pid: str, lib_name: str) -> list: - map_file = open(f"/proc/{pid}/maps", "r") + """ + Returns the addresses of pages belonging to a specific library in the process. + + Args: + pid (str): The process ID. + lib_name (str): The name of the library. + + Returns: + list: A list of addresses belonging to the specified library. + """ + + lines = Mapping._read_map_file(pid) address = [] - for line in map_file.readlines(): + for line in lines: page = line.split() - module = page[len(page) - 1] + module = page[-1] if lib_name in module: address.append(page[0].split('-')) return address @staticmethod - # mapping CA def mapping_ca(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_ca = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] - - # ca anon, key -> anon:libc_malloc, rw-p, rwxp - if ("anon" in region) and ("rw" in perm) and ("libc_malloc" in region): - details_ca["address"].append(temp_details[0]) - details_ca["permissions"].append(temp_details[1]) - details_ca["allocated"].append(region) - - map_file.close() - return details_ca + """ + Returns the mappings with specific conditions related to libc_malloc and anonymous regions. + + Args: + pid (str): The process ID. + + Returns: + dict: A dictionary containing mapping details for libc_malloc and anonymous regions. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: \ + ("anon" in region) and ("rw" in perm) and ("libc_malloc" in region) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping A def mapping_a(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_a = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] - - # A -> Anonymous: just rw-p - if (len(line) < 46) and ("rw" in perm): - details_a["address"].append(temp_details[0]) - details_a["permissions"].append(temp_details[1]) - details_a["allocated"].append(region) - - map_file.close() - return details_a - - # ----------------------------------------- New maps---------------------------- + """ + Returns the mappings with specific conditions related to regions with fewer than 46 details and + read-write permissions. + + Args: + pid (str): The process ID. + + Returns: + dict: A dictionary containing mapping details for regions with specific conditions. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: (len(temp_details) < 46) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) + @staticmethod - # mapping B = bad def mapping_b(pid: str) -> dict: - """B->""" - - map_file = open(f"/proc/{pid}/maps", "r") - details_o = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions containing 'fonts' in the name + and read-write permissions. - # B + Args: + pid (str): The process ID. - if ("fonts" in region) and ("rw" in perm): - details_o["address"].append(temp_details[0]) - details_o["permissions"].append(temp_details[1]) - details_o["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for regions with specific conditions. + """ - map_file.close() - return details_o + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("fonts" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping CB = c++ bss def mapping_cb(pid: str) -> dict: - """CB->""" - - map_file = open(f"/proc/{pid}/maps", "r") - details_cb = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to the '[anon:.bss]' region and + read-write permissions. - # CB + Args: + pid (str): The process ID. - if ("[anon:.bss]" in region) and ("rw" in perm): - details_cb["address"].append(temp_details[0]) - details_cb["permissions"].append(temp_details[1]) - details_cb["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified region. + """ - map_file.close() - return details_cb + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("[anon:.bss]" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping CD = c++ data def mapping_cd(pid: str) -> dict: - """CD->""" + """ + Returns the mappings with specific conditions related to regions starting with '/data/app/' and + read-write permissions. - map_file = open(f"/proc/{pid}/maps", "r") - details_cd = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + Args: + pid (str): The process ID. - # CD + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ - if ("/data/app/" in region) and ("rw" in perm): - details_cd["address"].append(temp_details[0]) - details_cd["permissions"].append(temp_details[1]) - details_cd["allocated"].append(region) - - map_file.close() - return details_cd + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("/data/app/" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping CH = C++ heap def mapping_ch(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_ch = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to the '[heap]' region and + read-write permissions. + + Args: + pid (str): The process ID. - if ("[heap]" in region) and ("rw" in perm): - details_ch["address"].append(temp_details[0]) - details_ch["permissions"].append(temp_details[1]) - details_ch["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified region. + """ - map_file.close() - return details_ch + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("[heap]" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping JH = Java heap def mapping_jh(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_jh = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions starting with '/dev/ashmem/' and + read-write permissions. - if ("/dev/ashmem/" in region) and ("rw" in perm): - details_jh["address"].append(temp_details[0]) - details_jh["permissions"].append(temp_details[1]) - details_jh["allocated"].append(region) + Args: + pid (str): The process ID. - map_file.close() - return details_jh + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("/dev/ashmem/" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping XS = Code system def mapping_xs(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_xs = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions starting with '/system' and + read-write permissions. + + Args: + pid (str): The process ID. - if ("/system" in region) and ("rw" in perm): - details_xs["address"].append(temp_details[0]) - details_xs["permissions"].append(temp_details[1]) - details_xs["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ - map_file.close() - return details_xs + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("/system" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping S = Stack def mapping_s(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_s = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to the '[stack]' region and + read-write permissions. + + Args: + pid (str): The process ID. - if ("[stack]" in region) and ("rw" in perm): - details_s["address"].append(temp_details[0]) - details_s["permissions"].append(temp_details[1]) - details_s["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified region. + """ - map_file.close() - return details_s + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("[stack]" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping AS = Ashmem def mapping_as(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_as = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions starting with '/dev/ashmem/' and + containing 'dalvik' in the name and read-write permissions. - if ("/dev/ashmem/" in region and "dalvik" in region) and ("rw" in perm): - details_as["address"].append(temp_details[0]) - details_as["permissions"].append(temp_details[1]) - details_as["allocated"].append(region) + Args: + pid (str): The process ID. - map_file.close() - return details_as + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: \ + ("/dev/ashmem/" in region and "dalvik" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping J = Java def mapping_j(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_j = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions containing 'javalib' and + 'dalvik' in the name and read-write permissions. + + Args: + pid (str): The process ID. - if ("javalib" in region and "dalvik" in region) and ("rw" in perm): - details_j["address"].append(temp_details[0]) - details_j["permissions"].append(temp_details[1]) - details_j["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ - map_file.close() - return details_j + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: \ + ("javalib" in region and "dalvik" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping V = video def mapping_v(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_v = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split(" ", 5) - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] + """ + Returns the mappings with specific conditions related to regions containing 'kgsl-3d0' in the name + and read-write permissions. + + Args: + pid (str): The process ID. - if ("kgsl-3d0" in region) and ("rw" in perm): - details_v["address"].append(temp_details[0]) - details_v["permissions"].append(temp_details[1]) - details_v["allocated"].append(region) + Returns: + dict: A dictionary containing mapping details for the specified regions. + """ - map_file.close() - return details_v + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: ("kgsl-3d0" in region) and ("rw" in perm) + return Mapping._filter_mappings(lines, condition) @staticmethod - # mapping ALL def mapping_all(pid: str) -> dict: - map_file = open(f"/proc/{pid}/maps", "r") - details_all = {"address": [], "permissions": [], "allocated": []} - for line in map_file.readlines(): - temp_details = line.split() - region = temp_details[len(temp_details) - 1] - perm = temp_details[1] - - if "rw" in perm: - details_all["address"].append(temp_details[0]) - details_all["permissions"].append(temp_details[1]) - details_all["allocated"].append(region) - - map_file.close() - return details_all + """ + Returns all the mappings with read-write permissions. + + Args: + pid (str): The process ID. + + Returns: + dict: A dictionary containing all mapping details with keys 'address', 'permissions', + and 'allocated'. + """ + + lines = Mapping._read_map_file(pid) + condition = lambda temp_details, region, perm: "rw" in perm + return Mapping._filter_mappings(lines, condition) + + +# if __name__ == '__main__': +# print(Mapping().mapping_a(pid="9628")) diff --git a/androidMemoryTool/search_and_readers.py b/androidMemoryTool/search_and_readers.py index f67ee70..8ddf9fc 100644 --- a/androidMemoryTool/search_and_readers.py +++ b/androidMemoryTool/search_and_readers.py @@ -1,6 +1,6 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool @@ -8,12 +8,22 @@ """ # ! /usr/bin/env python -import re -import queue from functools import wraps +from queue import Queue +from re import finditer def store_in_queue(function) -> any: + """ + Decorator function that stores the return value of the decorated function in a queue. + + Args: + function: The function to be decorated. + + Returns: + any: The return value of the decorated function. + """ + @wraps(function) def wrapper(self, *args): self._returned_v.put(function(self, *args)) @@ -22,16 +32,65 @@ def wrapper(self, *args): class SearchAndRead: - _returned_v = queue.Queue() + """ + Class for searching and reading memory values in a process. + + Methods: + get_readers_values() -> list: + Retrieves the values stored in the queue. + + reset_queue() -> None: + Resets the queue to remove all stored values. + + speed_search_and_read_text(pid: str, address_list: list, read: any) -> tuple[list[str], int]: + Searches and reads text values from memory addresses in a process (optimized). + + speed_search_and_read(pid: str, address_list: list, buf: int, read: any) -> tuple[list[str], int]: + Searches and reads binary values from memory addresses in a process (optimized). + + search_and_read(pid: str, address_list: list, buf: int, read: any) -> list: + Searches and reads binary values from memory addresses in a process. + + search_and_read_text(pid: str, address_list: list, read: any) -> list: + Searches and reads text values from memory addresses in a process. + """ + + def __init__(self): + pass + + _returned_v = Queue() def get_readers_values(self) -> list: + """ + Retrieves the values stored in the queue. + + Returns: + list: List of stored values. + """ + return self._returned_v.get() def reset_queue(self) -> None: + """ + Resets the queue to remove all stored values. + """ + self._returned_v.queue.clear() @store_in_queue def speed_search_and_read_text(self, pid: str, address_list: list, read: any) -> tuple[list[str], int]: + """ + Searches and reads text values from memory addresses in a process (optimized). + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and read. + read (any): The text value to search for. + + Returns: + tuple[list[str], int]: Tuple containing a list of found offsets and the total number of values found. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_found = 0 offsets = [] @@ -43,10 +102,10 @@ def speed_search_and_read_text(self, pid: str, address_list: list, read: any) -> mem_file.seek(start_addr) total_bytes = end_addr - start_addr byte_strings = mem_file.read(total_bytes) - occurrence = [_.start() for _ in re.finditer(read, byte_strings)] + occurrence = [_.start() for _ in finditer(read, byte_strings)] for offset in occurrence: - offsets.append(hex(start_addr + offset)) + offsets.append(hex(offset)) # fixed illegal calculation of hex value total_values_found += 1 mem_file.close() @@ -54,6 +113,19 @@ def speed_search_and_read_text(self, pid: str, address_list: list, read: any) -> @store_in_queue def speed_search_and_read(self, pid: str, address_list: list, buf: int, read: any) -> tuple[list[str], int]: + """ + Searches and reads binary values from memory addresses in a process (optimized). + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and read. + buf (int): The number of bytes to read at once. + read (any): The binary value to search for. + + Returns: + tuple[list[str], int]: Tuple containing a list of found offsets and the total number of values found. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") offsets = [] @@ -73,7 +145,7 @@ def speed_search_and_read(self, pid: str, address_list: list, buf: int, read: an if read == read_byte: address_list = mem_file.seek(-buf, 1) mem_file.read(buf) - offsets.append(hex(start_addr + address_list)) + offsets.append(hex(address_list)) # fixed illegal calculation of hex value total_values_found += 1 except Exception as e: print("[*] Exception ", e) @@ -83,6 +155,19 @@ def speed_search_and_read(self, pid: str, address_list: list, buf: int, read: an @staticmethod def search_and_read(pid: str, address_list: list, buf: int, read: any) -> list: + """ + Searches and reads binary values from memory addresses in a process. + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and read. + buf (int): The number of bytes to read at once. + read (any): The binary value to search for. + + Returns: + list: List containing a list of found offsets and the total number of values found. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") offsets = [] @@ -102,7 +187,7 @@ def search_and_read(pid: str, address_list: list, buf: int, read: any) -> list: if read == read_byte: address_list = mem_file.seek(-buf, 1) mem_file.read(buf) - offsets.append(hex(start_addr + address_list)) + offsets.append(hex(address_list)) # fixed illegal calculation of hex value total_values_found += 1 except Exception as e: print("[*] Exception ", e) @@ -112,6 +197,18 @@ def search_and_read(pid: str, address_list: list, buf: int, read: any) -> list: @staticmethod def search_and_read_text(pid: str, address_list: list, read: any) -> list: + """ + Searches and reads text values from memory addresses in a process. + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and read. + read (any): The text value to search for. + + Returns: + list: List containing a list of found offsets and the total number of values found. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_found = 0 offsets = [] @@ -123,10 +220,10 @@ def search_and_read_text(pid: str, address_list: list, read: any) -> list: mem_file.seek(start_addr) total_bytes = end_addr - start_addr byte_strings = mem_file.read(total_bytes) - occurrence = [_.start() for _ in re.finditer(read, byte_strings)] + occurrence = [_.start() for _ in finditer(read, byte_strings)] for offset in occurrence: - offsets.append(hex(start_addr + offset)) + offsets.append(hex(offset)) # fixed illegal calculation of hex value total_values_found += 1 mem_file.close() diff --git a/androidMemoryTool/search_and_writers.py b/androidMemoryTool/search_and_writers.py index 63303c5..5608609 100644 --- a/androidMemoryTool/search_and_writers.py +++ b/androidMemoryTool/search_and_writers.py @@ -1,6 +1,6 @@ """ - * date : 2022/03/23 - * Version : 0.4 + * date : 2023/07/11 + * Version : 0.5 * author : Abdul Moez (abdulmoez123456789@gmail.com) * Study : UnderGraduate in GCU Lahore, Pakistan * https://github.com/Anonym0usWork1221/android-memorytool @@ -8,12 +8,22 @@ """ # ! /usr/bin/env python -import re -import queue from functools import wraps +from queue import Queue +from re import finditer def store_in_queue(function) -> any: + """ + Decorator function that stores the return value of the decorated function in a queue. + + Args: + function: The function to be decorated. + + Returns: + any: The return value of the decorated function. + """ + @wraps(function) def wrapper(self, *args): self._returned_values.put(function(self, *args)) @@ -21,19 +31,68 @@ def wrapper(self, *args): class SearchAndWrite: - _returned_values = queue.Queue() + """ + Class for searching and writing memory values in a process. + + Methods: + get_writer_values() -> list: + Retrieves the values stored in the queue. + + reset_queue() -> None: + Resets the queue to remove all stored values. + + speed_search_and_write(pid: str, address_list: list, buf: int, read: any, write: any) -> int: + Searches and writes binary values in memory addresses of a process (optimized). + + speed_search_and_write_text(pid: str, address_list: list, read: any, write: any) -> int: + Searches and writes text values in memory addresses of a process (optimized). + + search_and_write(pid: str, address_list: list, buf: int, read: any, write: any) -> int: + Searches and writes binary values in memory addresses of a process. + + search_and_write_text(pid: str, address_list: list, read: any, write: any) -> int: + Searches and writes text values in memory addresses of a process. + """ + + def __init__(self): + pass + + _returned_values = Queue() def get_writer_values(self) -> list: + """ + Retrieves the values stored in the queue. + + Returns: + list: List of stored values. + """ return self._returned_values.get() def reset_queue(self) -> None: + """ + Resets the queue to remove all stored values. + """ self._returned_values.queue.clear() @store_in_queue def speed_search_and_write(self, pid: str, address_list: list, buf: int, read: any, write: any) -> int: + """ + Searches and writes binary values in memory addresses of a process (optimized). + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and write. + buf (int): The number of bytes to read at once. + read (any): The binary value to search for. + write (any): The binary value to write. + + Returns: + int: The total number of values replaced. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_replaced = 0 - + replaced_addresses = [] for address in address_list: partitions = address.split("-") start_addr = int(partitions[0], 16) @@ -47,18 +106,30 @@ def speed_search_and_write(self, pid: str, address_list: list, buf: int, read: a read_byte = mem_file.read(buf) if read == read_byte: addr = mem_file.seek(-buf, 1) - print(addr) mem_file.write(write) + replaced_addresses.append(hex(addr)) total_values_replaced += 1 except Exception as e: print("[*] Exception ", e) mem_file.close() - return total_values_replaced @store_in_queue def speed_search_and_write_text(self, pid: str, address_list: list, read: any, write: any) -> int: + """ + Searches and writes text values in memory addresses of a process (optimized). + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and write. + read (any): The text value to search for. + write (any): The text value to write. + + Returns: + int: The total number of values replaced. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_replaced = 0 for address in address_list: @@ -68,7 +139,7 @@ def speed_search_and_write_text(self, pid: str, address_list: list, read: any, w mem_file.seek(start_addr) total_bytes = end_addr - start_addr byte_strings = mem_file.read(total_bytes) - occurrence = [_.start() for _ in re.finditer(read, byte_strings)] + occurrence = [_.start() for _ in finditer(read, byte_strings)] for offset in occurrence: mem_file.seek(start_addr + offset) @@ -80,6 +151,20 @@ def speed_search_and_write_text(self, pid: str, address_list: list, read: any, w @staticmethod def search_and_write(pid: str, address_list: list, buf: int, read: any, write: any) -> int: + """ + Searches and writes binary values in memory addresses of a process. + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and write. + buf (int): The number of bytes to read at once. + read (any): The binary value to search for. + write (any): The binary value to write. + + Returns: + int: The total number of values replaced. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_replaced = 0 @@ -107,6 +192,19 @@ def search_and_write(pid: str, address_list: list, buf: int, read: any, write: a @staticmethod def search_and_write_text(pid: str, address_list: list, read: any, write: any) -> int: + """ + Searches and writes text values in memory addresses of a process. + + Args: + pid (str): The process ID. + address_list (list): List of memory address ranges to search and write. + read (any): The text value to search for. + write (any): The text value to write. + + Returns: + int: The total number of values replaced. + """ + mem_file = open(f"/proc/{pid}/mem", "rb+") total_values_replaced = 0 for address in address_list: @@ -116,7 +214,7 @@ def search_and_write_text(pid: str, address_list: list, read: any, write: any) - mem_file.seek(start_addr) total_bytes = end_addr - start_addr byte_strings = mem_file.read(total_bytes) - occurrence = [_.start() for _ in re.finditer(read, byte_strings)] + occurrence = [_.start() for _ in finditer(read, byte_strings)] for offset in occurrence: mem_file.seek(start_addr + offset) diff --git a/setup.cfg b/setup.cfg index 33718ca..d8c3467 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,5 @@ [metadata] description-file=README.md license_files=LICENSE.md + + diff --git a/setup.py b/setup.py index 100b030..36ccf77 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='androidMemoryTool', - version='0.4', + version='0.5', packages=["androidMemoryTool"], license="GPL-3.0 license", author='Abdul Moez', @@ -21,6 +21,10 @@ ('', ['LICENSE.md']), ], install_requires=['wheel'], + entry_points={ + 'console_scripts': ['amt=androidMemoryTool.__main__:execute_cli'] + }, + python_requires=">=3.0", classifiers=[ "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", "Operating System :: OS Independent",