import datetime
import logging
import re
from typing import List
from cached_property import cached_property
from rpcclient.darwin.cfpreferences import kCFPreferencesAnyHost
from rpcclient.darwin.consts import OsLogLevel
from rpcclient.darwin.processes import Process
from rpcclient.darwin.symbol import DarwinSymbol
from rpcclient.exceptions import BadReturnValueError, HarGlobalNotFoundError, MissingLibraryError
from rpcclient.structs.consts import SIGKILL
MOV_X0_X9 = b'\xE0\x03\x13\xAA'
MOV_X1_0 = b'\x01\x00\x80\xD2'
MOV_W2_0XA = b'\x42\x01\x80\x52'
PATTERN = MOV_X0_X9 + MOV_X1_0 + MOV_W2_0XA
logger = logging.getLogger(__name__)
[docs]
class OsLogPreferencesBase:
def __init__(self, client, obj: DarwinSymbol):
self._client = client
self._object = obj
@property
def name(self) -> str:
return self._object.objc_call('name').py()
@property
def persisted_level(self) -> OsLogLevel:
return OsLogLevel(self._object.objc_call('persistedLevel'))
@persisted_level.setter
def persisted_level(self, value: OsLogLevel) -> None:
return self._object.objc_call('setPersistedLevel:', value)
@property
def default_persisted_level(self) -> OsLogLevel:
return OsLogLevel(self._object.objc_call('defaultPersistedLevel'))
@property
def enabled_level(self) -> OsLogLevel:
return OsLogLevel(self._object.objc_call('enabledLevel'))
@enabled_level.setter
def enabled_level(self, value: OsLogLevel) -> None:
return self._object.objc_call('setEnabledLevel:', value)
@property
def default_enabled_level(self) -> OsLogLevel:
return OsLogLevel(self._object.objc_call('defaultEnabledLevel'))
@property
def _verbosity_description(self) -> str:
return (f'ENABLED_LEVEL:{self.enabled_level.name}({self.enabled_level.value}) '
f'PERSISTED_LEVEL:{self.persisted_level.name}({self.persisted_level.value})')
[docs]
def reset(self) -> None:
self._object.objc_call('reset')
def __repr__(self) -> str:
return f'<{self.__class__.__name__} {self._verbosity_description}>'
[docs]
class OsLogPreferencesCategory(OsLogPreferencesBase):
def __init__(self, client, category: str, subsystem: DarwinSymbol):
obj = client.symbols.objc_getClass('OSLogPreferencesCategory').objc_call('alloc').objc_call(
'initWithName:subsystem:', client.cf(category), subsystem)
super().__init__(client, obj)
@property
def name(self) -> str:
return self._object.objc_call('name').py()
def __repr__(self) -> str:
return f'<{self.__class__.__name__} NAME:{self.name} {self._verbosity_description}>'
[docs]
class OsLogPreferencesSubsystem(OsLogPreferencesBase):
def __init__(self, client, subsystem: str):
obj = client.symbols.objc_getClass('OSLogPreferencesSubsystem').objc_call('alloc').objc_call(
'initWithName:', client.cf(subsystem))
super().__init__(client, obj)
@property
def category_strings(self) -> List[OsLogPreferencesCategory]:
return self._object.objc_call('categories').py()
@property
def categories(self) -> List[OsLogPreferencesCategory]:
result = []
for name in self.category_strings:
result.append(self.get_category(name))
return result
[docs]
def get_category(self, name: str) -> OsLogPreferencesCategory:
return OsLogPreferencesCategory(self._client, name, self._object)
[docs]
class OsLogPreferencesManager(OsLogPreferencesBase):
def __init__(self, client):
obj = client.symbols.objc_getClass('OSLogPreferencesManager').objc_call('sharedManager')
super().__init__(client, obj)
@property
def subsystem_strings(self) -> List[str]:
return self._object.objc_call('subsystems').py()
@property
def subsystems(self) -> List[OsLogPreferencesSubsystem]:
result = []
for name in self.subsystem_strings:
result.append(self.get_subsystem(name))
return result
[docs]
def get_subsystem(self, name: str) -> OsLogPreferencesSubsystem:
return OsLogPreferencesSubsystem(self._client, name)
[docs]
class Syslog:
"""" manage syslog """
def __init__(self, client):
"""
@type client: rpcclient.darwin.client.DarwinClient
"""
self._client = client
self._load_logging_support_library()
self.preferences_manager = OsLogPreferencesManager(self._client)
@cached_property
def _enable_har_global(self) -> int:
r"""
In order to find an unexported global variable, we use a sequence of instructions that we know comes after
the aforementioned variable is accessed.
88 07 2D D0 ADRP X8, #har_global@PAGE
08 41 55 39 LDRB W8, [X8,#har_global@PAGEOFF]
08 03 00 34 CBZ W8, loc_187DA1294
E0 03 13 AA MOV X0, X19 ; __str
01 00 80 D2 MOV X1, #0 ; __endptr
42 01 80 52 MOV W2, #0xA ; __base
After getting the address of the instruction `mov x0,x19`, we need to rewind 2 instructions. Next, we verify
that the above 2 instructions are `adrp` && `ldrb` . Then, extract the page address + page offset and
calculate the affective address.
"""
address = []
regex_hex = r'0x[0-9a-fA-F]+'
self._client.load_framework('CFNetwork')
cfnetwork = [image for image in self._client.images if
image.name == '/System/Library/Frameworks/CFNetwork.framework/CFNetwork']
if len(cfnetwork) < 1:
raise MissingLibraryError()
pattern_addr = self._client.symbols.memmem(cfnetwork[0].base_address, 0xffffffff, PATTERN, len(PATTERN))
pattern_sym = self._client.symbol(pattern_addr)
disass = (pattern_sym - len(PATTERN)).disass(8)
if disass[0].mnemonic != 'adrp' and disass[1].mnemonic != 'ldrb':
raise HarGlobalNotFoundError()
for instruction in disass:
address.append(
int(re.findall(regex_hex, instruction.op_str)[0], 16)
)
return sum(address)
[docs]
def set_harlogger_for_process(self, value: bool, pid: int) -> None:
process = self._client.processes.get_by_pid(pid)
self._set_harlogger_for_process(value, process)
self.set_har_capture_global(True)
[docs]
def set_harlogger_for_all(self, value: bool, expression: str = None) -> None:
for p in self._client.processes.list():
if p.pid == 0:
continue
if expression and expression not in p.basename:
continue
try:
self._set_harlogger_for_process(value, p)
logger.info(f'{"Enabled" if value else "Disabled"} for {p.basename}')
except BadReturnValueError:
logger.error(f'Failed To enabled for {p}')
self.set_har_capture_global(True)
[docs]
def set_unredacted_logs(self, enable: bool = True):
"""
enable/disable unredacted logs (allows seeing the <private> strings)
https://github.com/EthanArbuckle/unredact-private-os_logs
"""
with self._client.preferences.sc.open(
'/Library/Preferences/Logging/com.apple.system.logging.plist') as pref:
pref.set_dict({'Enable-Logging': True, 'Enable-Private-Data': enable})
self._client.processes.get_by_basename('logd').kill(SIGKILL)
[docs]
def set_har_capture_global(self, enable: bool = True):
"""
enable/disable HAR logging
https://github.com/doronz88/harlogger
"""
users = ['mobile', 'root']
if enable:
for user in users:
# settings copied from DiagnosticExtension.appex
self._client.preferences.cf.set('har-capture-global',
datetime.datetime(9999, 12, 31, 23, 59, 59),
'com.apple.CFNetwork', user, hostname=kCFPreferencesAnyHost)
self._client.preferences.cf.set('har-body-size-limit',
0x100000,
'com.apple.CFNetwork', user, hostname=kCFPreferencesAnyHost)
subsystem = self.preferences_manager.get_subsystem('com.apple.CFNetwork')
category = subsystem.get_category('HAR')
category.enabled_level = 2
category.persisted_level = 2
else:
for user in users:
self._client.preferences.cf.set('har-capture-global',
datetime.datetime(1970, 1, 1, 1, 1, 1),
'com.apple.CFNetwork', user, hostname=kCFPreferencesAnyHost)
if self._client.symbols.notify_post('com.apple.CFNetwork.har-capture-update'):
raise BadReturnValueError('notify_post() failed')
def _load_logging_support_library(self) -> None:
self._client.load_framework('LoggingSupport')
def _set_harlogger_for_process(self, value: bool, process: Process) -> None:
process.poke(self._enable_har_global, value.to_bytes(1, 'little'))