import dataclasses
import errno
import logging
import posixpath
import re
import struct
from collections import namedtuple
from datetime import datetime
from pathlib import Path
from typing import List, Mapping, Optional
from cached_property import cached_property
from construct import Array, Container, Int32ul
from parameter_decorators import path_to_str
from rpcclient.darwin.consts import TASK_DYLD_INFO, VM_FLAGS_ANYWHERE, ARMThreadFlavors, x86_THREAD_STATE64
from rpcclient.darwin.structs import ARM_THREAD_STATE64_COUNT, FAT_CIGAM, FAT_MAGIC, LOAD_COMMAND_TYPE, MAXPATHLEN, \
PROC_PIDFDPIPEINFO, PROC_PIDFDSOCKETINFO, PROC_PIDFDVNODEPATHINFO, PROC_PIDLISTFDS, PROC_PIDTASKALLINFO, \
PROX_FDTYPE_KQUEUE, PROX_FDTYPE_PIPE, PROX_FDTYPE_SOCKET, PROX_FDTYPE_VNODE, TASK_DYLD_INFO_COUNT, \
all_image_infos_t, arm_thread_state64_t, dyld_image_info_t, fat_header, mach_header_t, pid_t, pipe_info, \
proc_fdinfo, proc_taskallinfo, procargs2_t, so_family_t, so_kind_t, socket_fdinfo, task_dyld_info_data_t, \
vnode_fdinfowithpath, x86_thread_state64_t
from rpcclient.darwin.symbol import DarwinSymbol
from rpcclient.exceptions import ArgumentError, BadReturnValueError, MissingLibraryError, ProcessSymbolAbsentError, \
RpcClientException, SymbolAbsentError
from rpcclient.processes import Processes
from rpcclient.protocol import arch_t
from rpcclient.structs.consts import RTLD_NOW, SEEK_SET, SIGTERM
from rpcclient.symbol import ADDRESS_SIZE_TO_STRUCT_FORMAT, Symbol
from rpcclient.sysctl import CTL, KERN
_CF_STRING_ARRAY_PREFIX_LEN = len(' "')
_CF_STRING_ARRAY_SUFFIX_LEN = len('",')
_BACKTRACE_FRAME_REGEX = re.compile(r'\[\s*(\d+)\] (0x[0-9a-f]+)\s+\{(.+?) \+ (.+?)\} (.*)')
FdStruct = namedtuple('FdStruct', 'fd struct')
logger = logging.getLogger(__name__)
CHUNK_SIZE = 1024 * 64
APP_SUFFIX = '.app/'
[docs]
@dataclasses.dataclass()
class Fd:
fd: int
[docs]
@dataclasses.dataclass()
class KQueueFd(Fd):
pass
[docs]
@dataclasses.dataclass()
class PipeFd(Fd):
pass
[docs]
@dataclasses.dataclass()
class FileFd(Fd):
path: str
[docs]
@dataclasses.dataclass()
class UnixFd(Fd):
path: str
[docs]
@dataclasses.dataclass()
class SocketFd(Fd):
pass
[docs]
@dataclasses.dataclass()
class Ipv4SocketFd(SocketFd):
local_address: str
local_port: int
remote_address: str
remote_port: int # when remote 0, the socket is for listening
[docs]
@dataclasses.dataclass()
class Ipv6SocketFd(SocketFd):
local_address: str
local_port: int
remote_address: str
remote_port: int # when remote 0, the socket is for listening
[docs]
@dataclasses.dataclass()
class Ipv4TcpFd(Ipv4SocketFd):
pass
[docs]
@dataclasses.dataclass()
class Ipv6TcpFd(Ipv6SocketFd):
pass
[docs]
@dataclasses.dataclass()
class Ipv4UdpFd(Ipv4SocketFd):
pass
[docs]
@dataclasses.dataclass()
class Ipv6UdpFd(Ipv6SocketFd):
pass
Image = namedtuple('Image', 'address path')
SOCKET_TYPE_DATACLASS = {
so_family_t.AF_INET: {
so_kind_t.SOCKINFO_TCP: Ipv4TcpFd,
so_kind_t.SOCKINFO_IN: Ipv4UdpFd,
},
so_family_t.AF_INET6: {
so_kind_t.SOCKINFO_TCP: Ipv6TcpFd,
so_kind_t.SOCKINFO_IN: Ipv6UdpFd,
}
}
[docs]
class Thread:
def __init__(self, client, thread_id: int):
self._client = client
self._thread_id = thread_id
@property
def thread_id(self) -> int:
return self._thread_id
[docs]
def get_state(self):
raise NotImplementedError()
[docs]
def set_state(self, state: Mapping):
raise NotImplementedError()
[docs]
def resume(self):
raise NotImplementedError()
[docs]
def suspend(self):
raise NotImplementedError()
def __repr__(self):
return f'<{self.__class__.__name__} TID:{self._thread_id}>'
[docs]
class IntelThread64(Thread):
[docs]
def get_state(self):
with self._client.safe_malloc(x86_thread_state64_t.sizeof()) as p_state:
with self._client.safe_malloc(x86_thread_state64_t.sizeof()) as p_thread_state_count:
p_thread_state_count[0] = x86_thread_state64_t.sizeof() // Int32ul.sizeof()
if self._client.symbols.thread_get_state(self._thread_id, x86_THREAD_STATE64,
p_state, p_thread_state_count):
raise BadReturnValueError('thread_get_state() failed')
return x86_thread_state64_t.parse_stream(p_state)
[docs]
def set_state(self, state: Mapping):
if self._client.symbols.thread_set_state(self._thread_id, x86_THREAD_STATE64,
x86_thread_state64_t.build(state),
x86_thread_state64_t.sizeof() // Int32ul.sizeof()):
raise BadReturnValueError('thread_set_state() failed')
[docs]
class ArmThread64(Thread):
[docs]
def get_state(self):
with self._client.safe_malloc(arm_thread_state64_t.sizeof()) as p_state:
with self._client.safe_malloc(arm_thread_state64_t.sizeof()) as p_thread_state_count:
p_thread_state_count[0] = ARM_THREAD_STATE64_COUNT
if self._client.symbols.thread_get_state(self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64,
p_state, p_thread_state_count):
raise BadReturnValueError('thread_get_state() failed')
return arm_thread_state64_t.parse_stream(p_state)
[docs]
def set_state(self, state: Mapping):
if self._client.symbols.thread_set_state(self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64,
arm_thread_state64_t.build(state),
ARM_THREAD_STATE64_COUNT):
raise BadReturnValueError('thread_set_state() failed')
[docs]
def suspend(self):
if self._client.symbols.thread_suspend(self._thread_id):
raise BadReturnValueError('thread_suspend() failed')
[docs]
def resume(self):
if self._client.symbols.thread_resume(self._thread_id):
raise BadReturnValueError('thread_resume() failed')
[docs]
@dataclasses.dataclass
class Region:
region_type: str
start: 'ProcessSymbol'
end: int
vsize: str
protection: str
protection_max: str
region_detail: str
@property
def size(self) -> int:
return self.end - self.start
[docs]
@dataclasses.dataclass
class LoadedClass:
name: str
type_name: str
binary_path: str
[docs]
@dataclasses.dataclass
class Frame:
depth: int
address: int
section: str
offset: int
symbol_name: str
def __repr__(self):
return f'<{self.__class__.__name__} [{self.depth:3}] 0x{self.address:x} ({self.section} + 0x{self.offset:x}) ' \
f'{self.symbol_name}>'
[docs]
@dataclasses.dataclass
class Backtrace:
flavor: str
time_start: float
time_end: float
pid: int
thread_id: int
dispatch_queue_serial_num: int
frames: List[Frame]
def __init__(self, vmu_backtrace: DarwinSymbol):
backtrace = vmu_backtrace.objc_call('description').py()
match = re.match(r'VMUBacktrace \(Flavor: (?P<flavor>.+?) Simple Time: (?P<time_start>.+?) - (?P<time_end>.+?) '
r'Process: (?P<pid>\d+) Thread: (?P<thread_id>.+?) Dispatch queue serial num: '
r'(?P<dispatch_queue_serial_num>\d+)\)', backtrace)
self.flavor = match.group('flavor')
self.time_start = float(match.group('time_start'))
self.time_end = float(match.group('time_end'))
self.pid = int(match.group('pid'))
self.thread_id = int(match.group('thread_id'), 16)
self.dispatch_queue_serial_num = int(match.group('dispatch_queue_serial_num'))
self.frames = []
for frame in re.findall(_BACKTRACE_FRAME_REGEX, backtrace):
self.frames.append(Frame(depth=int(frame[0]), address=int(frame[1], 0), section=frame[2],
offset=int(frame[3], 0), symbol_name=frame[4]))
def __repr__(self):
buf = f'<{self.__class__.__name__} PID: {self.pid} TID: {self.thread_id}\n'
for frame in self.frames:
buf += f' {frame}\n'
buf += '>'
return buf
[docs]
class ProcessSymbol(Symbol):
[docs]
@classmethod
def create(cls, value: int, client, process):
symbol = super().create(value, client)
symbol = ProcessSymbol(symbol)
symbol._prepare(client)
symbol.process = process
return symbol
def _clone_from_value(self, value: int):
return self.create(value, self._client, self.process)
[docs]
def peek(self, count: int) -> bytes:
return self.process.peek(self, count)
[docs]
def poke(self, buf: bytes) -> None:
return self.process.poke(self, buf)
[docs]
def peek_str(self, encoding='utf-8') -> str:
""" peek string at given address """
return self.process.peek_str(self, encoding)
@property
def dl_info(self) -> Container:
raise NotImplementedError('dl_info isn\'t implemented for remote process symbols')
@property
def name(self) -> str:
return self.process.get_symbol_name(self)
@property
def filename(self) -> str:
return self.process.get_symbol_image(self.name).path
def __getitem__(self, item) -> 'ProcessSymbol':
fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
addr = self + item * self.item_size
deref = struct.unpack(self._client._endianness + fmt, self.process.peek(addr, self.item_size))[0]
return self.create(deref, self._client, self.process)
def __setitem__(self, item, value):
fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
value = struct.pack(self._client._endianness + fmt, int(value))
self.process.poke(self + item * self.item_size, value)
def __call__(self, *args, **kwargs):
raise RpcClientException('ProcessSymbol is not callable')
[docs]
class Process:
PEEK_STR_CHUNK_SIZE = 0x100
def __init__(self, client, pid: int):
self._client = client
self._pid = pid
if self._client.arch == arch_t.ARCH_ARM64:
self._thread_class = ArmThread64
else:
self._thread_class = IntelThread64
[docs]
def kill(self, sig: int = SIGTERM):
""" kill(pid, sig) at remote. read man for more details. """
return self._client.processes.kill(self._pid, sig)
[docs]
def waitpid(self, flags: int = 0):
""" waitpid(pid, stat_loc, 0) at remote. read man for more details. """
return self._client.processes.waitpid(self._pid, flags)
[docs]
def peek(self, address: int, size: int) -> bytes:
""" peek at memory address """
with self._client.safe_malloc(size) as buf:
with self._client.safe_malloc(8) as p_size:
p_size[0] = size
if self._client.symbols.vm_read_overwrite(self.task, address, size, buf, p_size):
raise BadReturnValueError('vm_read() failed')
return buf.peek(size)
[docs]
def peek_str(self, address: int, encoding='utf-8') -> str:
""" peek string at memory address """
size = self.PEEK_STR_CHUNK_SIZE
buf = b''
while size:
try:
buf += self.peek(address, size)
if b'\x00' in buf:
return buf.split(b'\x00', 1)[0].decode(encoding)
address += size
except BadReturnValueError:
size = size // 2
[docs]
def poke(self, address: int, buf: bytes):
""" poke at memory address """
if self._client.symbols.vm_write(self.task, address, buf, len(buf)):
raise BadReturnValueError('vm_write() failed')
[docs]
def get_symbol_name(self, address: int) -> str:
if self._client.arch != arch_t.ARCH_ARM64:
raise NotImplementedError('implemented only on ARCH_ARM64')
x = self.vmu_object_identifier.objc_call('symbolForAddress:', address, return_raw=True).x
if x[0] == 0 and x[1] == 0:
raise SymbolAbsentError()
return self._client.symbols.CSSymbolGetName(x[0], x[1]).peek_str()
[docs]
def get_symbol_image(self, name: str) -> Image:
for image in self.images:
result = self.get_symbol_address(name, posixpath.basename(image.path))
if result:
return image
raise ProcessSymbolAbsentError()
[docs]
def get_symbol_class_info(self, address: int) -> DarwinSymbol:
return self.vmu_object_identifier.objc_call('classInfoForMemory:length:', address, 8)
[docs]
def get_symbol_address(self, name: str, lib: str = None) -> ProcessSymbol:
if lib is not None:
return ProcessSymbol.create(
self.vmu_object_identifier.objc_call('addressOfSymbol:inLibrary:', name, lib).c_uint64, self._client,
self)
image = self.get_symbol_image(name)
return self.get_symbol_address(name, posixpath.basename(image.path))
@property
def loaded_classes(self):
realized_classes = self.vmu_object_identifier.objc_call('realizedClasses')
for i in range(1, realized_classes.objc_call('count') + 1):
class_info = realized_classes.objc_call('classInfoForIndex:', i)
name = class_info.objc_call('className').py()
type_name = class_info.objc_call('typeName').py()
binary_path = class_info.objc_call('binaryPath').py()
yield LoadedClass(name=name, type_name=type_name, binary_path=binary_path)
@property
def images(self) -> List[Image]:
""" get loaded image list """
result = []
with self._client.safe_malloc(task_dyld_info_data_t.sizeof()) as dyld_info:
with self._client.safe_calloc(8) as count:
count[0] = TASK_DYLD_INFO_COUNT
if self._client.symbols.task_info(self.task, TASK_DYLD_INFO, dyld_info, count):
raise BadReturnValueError('task_info(TASK_DYLD_INFO) failed')
dyld_info_data = task_dyld_info_data_t.parse_stream(dyld_info)
all_image_infos = all_image_infos_t.parse(
self.peek(dyld_info_data.all_image_info_addr, dyld_info_data.all_image_info_size))
buf = self.peek(all_image_infos.infoArray, all_image_infos.infoArrayCount * dyld_image_info_t.sizeof())
for image in Array(all_image_infos.infoArrayCount, dyld_image_info_t).parse(buf):
path = self.peek_str(image.imageFilePath)
result.append(Image(address=self.get_process_symbol(image.imageLoadAddress), path=path))
return result
@property
def app_images(self) -> List[Image]:
return [image for image in self.images if APP_SUFFIX in image.path]
@property
def threads(self) -> List[Thread]:
result = []
with self._client.safe_malloc(8) as threads:
with self._client.safe_malloc(4) as count:
count.item_size = 4
if self._client.symbols.task_threads(self.task, threads, count):
raise BadReturnValueError('task_threads() failed')
for tid in Array(count[0].c_uint32, Int32ul).parse(threads[0].peek(count[0] * 4)):
result.append(self._thread_class(self._client, tid))
return result
@property
def pid(self) -> int:
""" get pid """
return self._pid
@property
def fds(self) -> List[Fd]:
""" get a list of process opened file descriptors """
result = []
for fdstruct in self.fd_structs:
fd = fdstruct.fd
parsed = fdstruct.struct
if fd.proc_fdtype == PROX_FDTYPE_VNODE:
result.append(FileFd(fd=fd.proc_fd, path=parsed.pvip.vip_path))
elif fd.proc_fdtype == PROX_FDTYPE_KQUEUE:
result.append(KQueueFd(fd=fd.proc_fd))
elif fd.proc_fdtype == PROX_FDTYPE_PIPE:
result.append(PipeFd(fd=fd.proc_fd))
elif fd.proc_fdtype == PROX_FDTYPE_SOCKET:
if parsed.psi.soi_kind in (so_kind_t.SOCKINFO_TCP, so_kind_t.SOCKINFO_IN):
correct_class = SOCKET_TYPE_DATACLASS[parsed.psi.soi_family][parsed.psi.soi_kind]
if parsed.psi.soi_kind == so_kind_t.SOCKINFO_TCP:
info = parsed.psi.soi_proto.pri_tcp.tcpsi_ini
else:
info = parsed.psi.soi_proto.pri_in
result.append(correct_class(fd=fd.proc_fd,
local_address=info.insi_laddr.ina_46.i46a_addr4,
local_port=info.insi_lport,
remote_address=info.insi_faddr.ina_46.i46a_addr4,
remote_port=info.insi_fport))
elif parsed.psi.soi_kind == so_kind_t.SOCKINFO_UN:
result.append(UnixFd(fd=fd.proc_fd, path=parsed.psi.soi_proto.pri_un.unsi_addr.ua_sun.sun_path))
return result
@property
def fd_structs(self) -> List[FdStruct]:
""" get a list of process opened file descriptors as raw structs """
result = []
size = self._client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, 0, 0)
vi_size = 8196 # should be enough for all structs
with self._client.safe_malloc(vi_size) as vi_buf:
with self._client.safe_malloc(size) as fdinfo_buf:
size = int(self._client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, fdinfo_buf, size))
if not size:
raise BadReturnValueError('proc_pidinfo(PROC_PIDLISTFDS) failed')
for fd in Array(size // proc_fdinfo.sizeof(), proc_fdinfo).parse(fdinfo_buf.peek(size)):
if fd.proc_fdtype == PROX_FDTYPE_VNODE:
# file
vs = self._client.symbols.proc_pidfdinfo(self.pid, fd.proc_fd, PROC_PIDFDVNODEPATHINFO, vi_buf,
vi_size)
if not vs:
if self._client.errno == errno.EBADF:
# lsof treats this as fine
continue
raise BadReturnValueError(
f'proc_pidinfo(PROC_PIDFDVNODEPATHINFO) failed for fd: {fd.proc_fd} '
f'({self._client.last_error})')
result.append(
FdStruct(fd=fd,
struct=vnode_fdinfowithpath.parse(vi_buf.peek(vnode_fdinfowithpath.sizeof()))))
elif fd.proc_fdtype == PROX_FDTYPE_KQUEUE:
result.append(FdStruct(fd=fd, struct=None))
elif fd.proc_fdtype == PROX_FDTYPE_SOCKET:
# socket
vs = self._client.symbols.proc_pidfdinfo(self.pid, fd.proc_fd, PROC_PIDFDSOCKETINFO, vi_buf,
vi_size)
if not vs:
if self._client.errno == errno.EBADF:
# lsof treats this as fine
continue
raise BadReturnValueError(
f'proc_pidinfo(PROC_PIDFDSOCKETINFO) failed ({self._client.last_error})')
result.append(FdStruct(fd=fd, struct=socket_fdinfo.parse(vi_buf.peek(vi_size))))
elif fd.proc_fdtype == PROX_FDTYPE_PIPE:
# pipe
vs = self._client.symbols.proc_pidfdinfo(self.pid, fd.proc_fd, PROC_PIDFDPIPEINFO, vi_buf,
vi_size)
if not vs:
if self._client.errno == errno.EBADF:
# lsof treats this as fine
continue
raise BadReturnValueError(
f'proc_pidinfo(PROC_PIDFDPIPEINFO) failed ({self._client.last_error})')
result.append(
FdStruct(fd=fd,
struct=pipe_info.parse(vi_buf.peek(pipe_info.sizeof()))))
return result
@property
def task_all_info(self):
""" get a list of process opened file descriptors """
with self._client.safe_malloc(proc_taskallinfo.sizeof()) as pti:
if not self._client.symbols.proc_pidinfo(self.pid, PROC_PIDTASKALLINFO, 0, pti, proc_taskallinfo.sizeof()):
raise BadReturnValueError('proc_pidinfo(PROC_PIDTASKALLINFO) failed')
return proc_taskallinfo.parse_stream(pti)
@property
def backtraces(self) -> List[Backtrace]:
result = []
for bt in self._client.symbols.objc_getClass('VMUSampler').objc_call('sampleAllThreadsOfTask:', self.task).py():
result.append(Backtrace(bt))
return result
@cached_property
def vmu_proc_info(self) -> DarwinSymbol:
return self._client.symbols.objc_getClass('VMUProcInfo').objc_call('alloc').objc_call('initWithTask:',
self.task)
@cached_property
def vmu_region_identifier(self) -> DarwinSymbol:
return self._client.symbols.objc_getClass('VMUVMRegionIdentifier').objc_call('alloc').objc_call('initWithTask:',
self.task)
@cached_property
def vmu_object_identifier(self) -> DarwinSymbol:
return self._client.symbols.objc_getClass('VMUObjectIdentifier').objc_call('alloc').objc_call('initWithTask:',
self.task)
@cached_property
def task(self) -> int:
self_task_port = self._client.symbols.mach_task_self()
if self.pid == self._client.pid:
return self_task_port
with self._client.safe_malloc(8) as p_task:
if self._client.symbols.task_for_pid(self_task_port, self.pid, p_task):
raise BadReturnValueError('task_for_pid() failed')
return p_task[0].c_int64
@cached_property
def path(self) -> Optional[str]:
""" call proc_pidpath(filename, ...) at remote. review xnu header for more details. """
with self._client.safe_malloc(MAXPATHLEN) as path:
path_len = self._client.symbols.proc_pidpath(self.pid, path, MAXPATHLEN)
if not path_len:
return None
return path.peek(path_len).decode()
@cached_property
def basename(self) -> Optional[str]:
path = self.path
if not path:
return None
return Path(path).parts[-1]
@cached_property
def name(self) -> str:
return self.task_all_info.pbsd.pbi_name
@cached_property
def ppid(self) -> int:
return self.task_all_info.pbsd.pbi_ppid
@cached_property
def uid(self) -> int:
return self.task_all_info.pbsd.pbi_uid
@cached_property
def gid(self) -> int:
return self.task_all_info.pbsd.pbi_gid
@cached_property
def ruid(self) -> int:
return self.task_all_info.pbsd.pbi_ruid
@cached_property
def rgid(self) -> int:
return self.task_all_info.pbsd.pbi_rgid
@cached_property
def start_time(self) -> datetime:
if self._client.arch != arch_t.ARCH_ARM64:
raise NotImplementedError('implemented only on ARCH_ARM64')
val = self.vmu_proc_info.objc_call('startTime', return_raw=True)
tv_sec = val.x[0]
tv_nsec = val.x[1]
return datetime.fromtimestamp(tv_sec + (tv_nsec / (10 ** 9)))
@property
def parent(self) -> 'Process':
return Process(self._client, self.ppid)
@property
def environ(self) -> List[str]:
return self.vmu_proc_info.objc_call('envVars').py()
@property
def arguments(self) -> List[str]:
return self.vmu_proc_info.objc_call('arguments').py()
@property
def raw_procargs2(self) -> bytes:
return self._client.sysctl.get(CTL.KERN, KERN.PROCARGS2, self.pid)
@property
def procargs2(self) -> Container:
return procargs2_t.parse(self.raw_procargs2)
@property
def regions(self) -> List[Region]:
result = []
# remove the '()' wrapping the list:
# (
# "item1",
# "item2",
# )
buf = self.vmu_region_identifier.objc_call('regions').cfdesc.split('\n')[1:-1]
for line in buf:
# remove line prefix and suffix and split into words
line = line[_CF_STRING_ARRAY_PREFIX_LEN:-_CF_STRING_ARRAY_SUFFIX_LEN].split()
i = 0
region_type = line[i]
i += 1
while '-' not in line[i]:
# skip till address range
i += 1
address_range = line[i]
i += 1
start, end = address_range.split('-')
start = int(start, 16)
end = int(end, 16)
vsize = None
if 'V=' in line[i]:
vsize = line[i].split('V=', 1)[1].split(']', 1)[0]
while '/' not in line[i]:
i += 1
protection = line[i].split('/')
i += 1
region_detail = None
if len(line) >= i + 1:
region_detail = line[i]
result.append(Region(region_type=region_type, start=self.get_process_symbol(start), end=end, vsize=vsize,
protection=protection[0], protection_max=protection[1], region_detail=region_detail))
return result
[docs]
def get_process_symbol(self, address: int) -> ProcessSymbol:
return ProcessSymbol.create(address, self._client, self)
[docs]
def vm_allocate(self, size: int) -> ProcessSymbol:
with self._client.safe_malloc(8) as out_address:
if self._client.symbols.vm_allocate(self.task, out_address, size, VM_FLAGS_ANYWHERE):
raise BadReturnValueError('vm_allocate() failed')
return self.get_process_symbol(out_address[0])
[docs]
@path_to_str('output_dir')
def dump_app(self, output_dir: str, chunk_size=CHUNK_SIZE) -> None:
"""
Based on:
https://github.com/AloneMonkey/frida-ios-dump/blob/master/dump.js
"""
for image in self.app_images:
relative_name = image.path.split(APP_SUFFIX, 1)[1]
output_file = Path(output_dir).expanduser() / relative_name
output_file.parent.mkdir(exist_ok=True, parents=True)
logger.debug(f'dumping: {output_file}')
with open(output_file, 'wb') as output_file:
macho_in_memory = mach_header_t.parse_stream(image.address)
with self._client.fs.open(image.path, 'r') as fat_file:
# locating the correct MachO offset within the FAT image (if FAT)
magic_in_fs = Int32ul.parse_stream(fat_file)
fat_file.seek(0, SEEK_SET)
if magic_in_fs in (FAT_CIGAM, FAT_MAGIC):
parsed_fat = fat_header.parse_stream(fat_file)
for arch in parsed_fat.archs:
if arch.cputype == macho_in_memory.cputype and arch.cpusubtype == macho_in_memory.cpusubtype:
# correct MachO offset found
file_offset = arch.offset
break
fat_file.seek(file_offset, SEEK_SET)
# perform actual MachO dump
output_file.seek(0, SEEK_SET)
while True:
chunk = fat_file.read(chunk_size)
if len(chunk) == 0:
break
output_file.write(chunk)
offset_cryptid = None
# if image is encrypted, patch its encryption loader command
for load_command in macho_in_memory.load_commands:
if load_command.cmd in (LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO_64,
LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO):
offset_cryptid = load_command.data.cryptid_offset - image.address
crypt_offset = load_command.data.cryptoff
crypt_size = load_command.data.cryptsize
break
if offset_cryptid is not None:
output_file.seek(offset_cryptid, SEEK_SET)
output_file.write(b'\x00' * 4) # cryptid = 0
output_file.seek(crypt_offset, SEEK_SET)
output_file.flush()
output_file.write((image.address + crypt_offset).peek(crypt_size))
def __repr__(self):
return f'<{self.__class__.__name__} PID:{self.pid} PATH:{self.path}>'
[docs]
class DarwinProcesses(Processes):
""" manage processes """
def __init__(self, client):
super().__init__(client)
self._load_symbolication_library()
self._self_process = Process(self._client, self._client.pid)
def _load_symbolication_library(self):
options = [
'/System/Library/PrivateFrameworks/Symbolication.framework/Symbolication'
]
for option in options:
if self._client.dlopen(option, RTLD_NOW):
return
raise MissingLibraryError('Symbolication library isn\'t available')
[docs]
def get_self(self) -> Process:
""" get self process """
return self._self_process
[docs]
def get_by_pid(self, pid: int) -> Process:
""" get process object by pid """
proc_list = self.list()
for p in proc_list:
if p.pid == pid:
return p
raise ArgumentError(f'failed to locate process with pid: {pid}')
[docs]
def get_by_basename(self, name: str) -> Process:
""" get process object by basename """
proc_list = self.list()
for p in proc_list:
if p.basename == name:
return p
raise ArgumentError(f'failed to locate process with name: {name}')
[docs]
def get_by_name(self, name: str) -> Process:
""" get process object by name """
proc_list = self.list()
for p in proc_list:
if p.name == name:
return p
raise ArgumentError(f'failed to locate process with name: {name}')
[docs]
def grep(self, name: str) -> List[Process]:
""" get process list by basename filter """
result = []
proc_list = self.list()
for p in proc_list:
if p.basename and name in p.basename:
result.append(p)
return result
[docs]
def get_processes_by_listening_port(self, port: int) -> List[Process]:
""" get a process object listening on the specified port """
listening_processes = []
for process in self.list():
try:
fds = process.fds
except BadReturnValueError:
# it's possible to get error if new processes have since died or the rpcserver
# doesn't have the required permissions to access all the processes
continue
for fd in fds:
if (isinstance(fd, Ipv4SocketFd) or isinstance(fd, Ipv6SocketFd)) and \
fd.local_port == port and fd.remote_port == 0:
listening_processes.append(process)
return listening_processes
[docs]
def lsof(self) -> Mapping[int, List[Fd]]:
""" get dictionary of pid to its opened fds """
result = {}
for process in self.list():
try:
fds = process.fds
except BadReturnValueError:
# it's possible to get error if new processes have since died or the rpcserver
# doesn't have the required permissions to access all the processes
continue
result[process.pid] = fds
return result
[docs]
@path_to_str('path')
def fuser(self, path: str) -> List[Process]:
"""get a list of all processes have an open hande to the specified path """
result = []
proc_list = self.list()
for process in proc_list:
try:
fds = process.fds
except BadReturnValueError:
# it's possible to get error if new processes have since died or the rpcserver
# doesn't have the required permissions to access all the processes
continue
for fd in fds:
if isinstance(fd, FileFd):
if str(Path(fd.path).absolute()) == str(Path(path).absolute()):
result.append(process)
return result
[docs]
def list(self) -> List[Process]:
""" list all currently running processes """
n = self._client.symbols.proc_listallpids(0, 0)
pid_buf_size = pid_t.sizeof() * n
with self._client.safe_malloc(pid_buf_size) as pid_buf:
pid_buf.item_size = pid_t.sizeof()
n = self._client.symbols.proc_listallpids(pid_buf, pid_buf_size)
result = []
for i in range(n):
pid = int(pid_buf[i])
result.append(Process(self._client, pid))
return result