Source code for rpcclient.symbol

import ctypes
import os
import struct
from contextlib import contextmanager
from typing import List

from capstone import CS_ARCH_ARM64, CS_ARCH_X86, CS_MODE_64, CS_MODE_LITTLE_ENDIAN, Cs, CsInsn
from construct import Container

from rpcclient.protocol import arch_t
from rpcclient.structs.generic import Dl_info

# Maps an item width in bytes to the unsigned ``struct`` format character of
# that width; used when indexing a Symbol to (un)pack a single item.
ADDRESS_SIZE_TO_STRUCT_FORMAT = {
    1: 'B',
    2: 'H',
    4: 'I',
    8: 'Q',
}

# Value stored as ``retval_bit_count`` on every newly prepared Symbol.
RETVAL_BIT_COUNT = 64


class Symbol(int):
    """
    Wrapper for a remote symbol object.

    A Symbol is an ``int`` holding an address, bound to the client it was
    created with. Integer arithmetic on it yields new Symbols bound to the
    same client; indexing, ``peek``/``poke`` and calling are forwarded to that
    client. It also exposes ``read``/``write``/``seek``/``tell``/``close`` so
    it can be handed to Construct as a stream.
    """

    # Methods whose ``__doc__`` is copied from the client's method of the same
    # name whenever a Symbol is prepared (see ``_prepare``).
    PROXY_METHODS = ['peek', 'poke']

    @classmethod
    def create(cls, value: int, client):
        """
        Create a Symbol object.

        :param value: Symbol address.
        :param rpcclient.darwin_client.Client client: client.
        :return: Symbol object.
        :rtype: Symbol
        :raises TypeError: if ``value`` is not an ``int``.
        """
        if not isinstance(value, int):
            raise TypeError(f'Symbol value must be an int, got {type(value).__name__}')

        # Wrap into the unsigned 64-bit range so negative values become
        # their two's-complement address.
        value &= 0xFFFFFFFFFFFFFFFF

        symbol = cls(value)
        symbol._prepare(client)
        return symbol

    def _clone_from_value(self, value: int):
        """ Create a new Symbol for ``value`` bound to the same client as this one. """
        return self.create(value, self._client)

    def _clone_if_int(self, value):
        """
        Wrap the result of an arithmetic operation.

        Integer results become Symbols on the same client. Anything else
        (e.g. the float produced by true division or by a float operand) is
        returned untouched, since it cannot be an address.
        """
        if isinstance(value, int):
            return self._clone_from_value(value)
        return value

    def _prepare(self, client):
        """ Initialise the per-instance state of a freshly constructed Symbol. """
        # NOTE(review): retval_bit_count / is_retval_signed are not read in this
        # class; presumably the client uses them to interpret call results.
        self.retval_bit_count = RETVAL_BIT_COUNT
        self.is_retval_signed = True
        # Width in bytes of one element for ``symbol[i]`` access.
        self.item_size = 8

        # private members
        self._client = client
        # Stream position relative to this address (Construct compliance).
        self._offset = 0

        # Expose the client's documentation on the proxying methods.
        for method_name in Symbol.PROXY_METHODS:
            getattr(self.__class__, method_name).__doc__ = \
                getattr(client, method_name).__doc__

    @contextmanager
    def change_item_size(self, new_item_size: int):
        """
        Temporarily change item size

        :param new_item_size: Temporary item size
        """
        save_item_size = self.item_size
        self.item_size = new_item_size
        try:
            yield
        finally:
            # Restore even if the managed body raised.
            self.item_size = save_item_size

    # No docstring here on purpose: ``_prepare`` installs the client's one.
    def peek(self, count):
        return self._client.peek(self, count)

    # No docstring here on purpose: ``_prepare`` installs the client's one.
    def poke(self, buf):
        return self._client.poke(self, buf)

    def peek_str(self, encoding='utf-8') -> str:
        """
        peek string at given address

        The length is obtained by calling ``strlen`` on this address through
        the client, then that many bytes are peeked and decoded.

        :param encoding: Encoding used to decode the peeked bytes.
        """
        return self.peek(self._client.symbols.strlen(self)).decode(encoding)

    def close(self):
        """ Construct compliance. """
        pass

    def seek(self, offset, whence):
        """
        Construct compliance.

        :param offset: Delta for ``os.SEEK_CUR``; absolute address for ``os.SEEK_SET``.
        :param whence: ``os.SEEK_CUR`` or ``os.SEEK_SET``.
        :raises IOError: for any other ``whence``.
        """
        if whence == os.SEEK_CUR:
            self._offset += offset
        elif whence == os.SEEK_SET:
            # Keep the position as a plain relative integer; subtracting from
            # ``self`` directly would build a whole new client-bound Symbol.
            self._offset = offset - int(self)
        else:
            raise IOError('Unsupported whence')

    def read(self, count: int):
        """ Construct compliance. Peek ``count`` bytes at the current position and advance it. """
        val = (self + self._offset).peek(count)
        self._offset += count
        return val

    def write(self, buf):
        """ Construct compliance. Poke ``buf`` at the current position and advance it. """
        val = (self + self._offset).poke(buf)
        self._offset += len(buf)
        return val

    def tell(self):
        """ Construct compliance. Return the current absolute position. """
        return self + self._offset

    def disass(self, size=40) -> 'List[CsInsn]':
        """ peek disassembled lines of 'size' bytes """
        if self._client.arch == arch_t.ARCH_ARM64:
            disassembler = Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        else:
            # assume x86_64 by default
            disassembler = Cs(CS_ARCH_X86, CS_MODE_LITTLE_ENDIAN | CS_MODE_64)
        # The second argument passes this Symbol's address to capstone.
        return list(disassembler.disasm(self.peek(size), self))

    @property
    def c_int64(self) -> int:
        """ cast to c_int64 """
        return ctypes.c_int64(self).value

    @property
    def c_uint64(self) -> int:
        """ cast to c_uint64 """
        return ctypes.c_uint64(self).value

    @property
    def c_int32(self) -> int:
        """ cast to c_int32 """
        return ctypes.c_int32(self).value

    @property
    def c_uint32(self) -> int:
        """ cast to c_uint32 """
        return ctypes.c_uint32(self).value

    @property
    def c_int16(self) -> int:
        """ cast to c_int16 """
        return ctypes.c_int16(self).value

    @property
    def c_uint16(self) -> int:
        """ cast to c_uint16 """
        return ctypes.c_uint16(self).value

    @property
    def c_bool(self) -> bool:
        """ cast to c_bool """
        return ctypes.c_bool(self).value

    @property
    def dl_info(self) -> 'Container':
        """
        Parsed ``Dl_info`` for this address.

        Calls ``dladdr`` through the client on a temporary buffer and parses
        that buffer; raises through the client's ``raise_errno_exception``
        when ``dladdr`` returns 0.
        """
        info_struct = Dl_info(self._client)
        sizeof = info_struct.sizeof()
        with self._client.safe_malloc(sizeof) as info:
            if 0 == self._client.symbols.dladdr(self, info):
                self._client.raise_errno_exception(f'failed to extract info for: {self}')
            # Parse before leaving the ``with`` block, while ``info`` is still allocated.
            return info_struct.parse_stream(info)

    @property
    def name(self) -> str:
        """ ``dli_sname`` field of this address' ``dl_info``. """
        return self.dl_info.dli_sname

    @property
    def filename(self) -> str:
        """ ``dli_fname`` field of this address' ``dl_info``. """
        return self.dl_info.dli_fname

    # Arithmetic: integer results stay Symbols on the same client, any other
    # result type is returned as the plain Python value.

    def __add__(self, other):
        return self._clone_if_int(int(self) + other)

    def __radd__(self, other):
        return self.__add__(other)

    def __sub__(self, other):
        return self._clone_if_int(int(self) - other)

    def __rsub__(self, other):
        return self._clone_if_int(other - int(self))

    def __mul__(self, other):
        return self._clone_if_int(int(self) * other)

    def __rmul__(self, other):
        return self.__mul__(other)

    def __truediv__(self, other):
        # True division yields a float, which is not an address: return it as-is.
        return self._clone_if_int(int(self) / other)

    def __floordiv__(self, other):
        return self._clone_if_int(int(self) // other)

    def __mod__(self, other):
        return self._clone_if_int(int(self) % other)

    def __and__(self, other):
        return self._clone_if_int(int(self) & other)

    def __or__(self, other):
        return self._clone_if_int(int(self) | other)

    def __xor__(self, other):
        return self._clone_if_int(int(self) ^ other)

    def __getitem__(self, item):
        """ Read element ``item`` (``item_size`` bytes wide) and return it as a Symbol. """
        fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
        addr = self + item * self.item_size
        raw = self._client.peek(addr, self.item_size)
        return self._clone_from_value(struct.unpack(self._client._endianness + fmt, raw)[0])

    def __setitem__(self, item, value):
        """ Write ``value`` as element ``item`` (``item_size`` bytes wide). """
        fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
        value = struct.pack(self._client._endianness + fmt, int(value))
        self._client.poke(self + item * self.item_size, value)

    def __repr__(self):
        return f'<{type(self).__name__}: {hex(self)}>'

    def __str__(self):
        return hex(self)

    def __call__(self, *args, **kwargs):
        """ Call this address through the client, forwarding the arguments. """
        return self._client.call(self, args, **kwargs)