diff options
author | AL-LCL <alvin@alvinhavel.com> | 2023-05-19 11:01:49 +0200 |
---|---|---|
committer | AL-LCL <alvin@alvinhavel.com> | 2023-05-19 11:01:49 +0200 |
commit | 20dbeb2f38684c65ff0a4b99012c161295708e88 (patch) | |
tree | a5b8445f55da2fbbb92443b68e9d7354a290c598 /foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py |
Diffstat (limited to 'foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py')
-rw-r--r-- | foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py b/foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py new file mode 100644 index 0000000..4a971bb --- /dev/null +++ b/foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py @@ -0,0 +1,226 @@ +# Author: Nicolas VERDIER +# This file is part of memorpy. +# +# memorpy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# memorpy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with memorpy. If not, see <http://www.gnu.org/licenses/>. +import sys +import string +import re +import logging +import traceback +import binascii +import struct + +from .Process import * +from .utils import * +from .Address import Address +from .BaseProcess import ProcessException +from .structures import * + +logger = logging.getLogger('memorpy') + +REGEX_TYPE=type(re.compile("^plop$")) +class MemWorker(object): + + def __init__(self, pid=None, name=None, end_offset = None, start_offset = None, debug=True): + self.process = Process(name=name, pid=pid, debug=debug) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.process.close() + + def Address(self, value, default_type = 'uint'): + """ wrapper to instanciate an Address class for the memworker.process""" + return Address(value, process=self.process, default_type=default_type) + + def umem_replace(self, regex, replace): + """ like search_replace_mem but works with unicode strings """ + regex = re_to_unicode(regex) + replace = replace.encode('utf-16-le') + return self.mem_replace(re.compile(regex, re.UNICODE), replace) + + def mem_replace(self, regex, replace): + """ search memory for a pattern and replace all found occurrences """ + allWritesSucceed = True + for _, start_offset in self.mem_search(regex, ftype='re'): + if self.process.write_bytes(start_offset, replace) == 1: + logger.debug('Write at offset %s succeeded !' % start_offset) + else: + allWritesSucceed = False + logger.debug('Write at offset %s failed !' % start_offset) + + return allWritesSucceed + + def umem_search(self, regex): + """ like mem_search but works with unicode strings """ + regex = re_to_unicode(regex) + for _, i in self.mem_search(str(regex), ftype='re'): + yield i + + def group_search(self, group, start_offset = None, end_offset = None): + regex = '' + for value, type in group: + if type == 'f' or type == 'float': + f = struct.pack('<f', float(value)) + regex += '..' + f[2:4] + else: + raise NotImplementedError('unknown type %s' % type) + + return self.mem_search(regex, ftype='re', start_offset=start_offset, end_offset=end_offset) + + def search_address(self, addr): + a = '%08X' % addr + logger.debug('searching address %s' % a) + regex = '' + for i in range(len(a) - 2, -1, -2): + regex += binascii.unhexlify(a[i:i + 2]) + + for _, a in self.mem_search(re.escape(regex), ftype='re'): + yield a + + def parse_re_function(self, b, value, offset): + for name, regex in value: + for res in regex.finditer(b): + yield name, self.Address(offset+res.start(), 'bytes') + """ + index = b.find(res) + while index != -1: + soffset = offset + index + if soffset not in duplicates_cache: + duplicates_cache.add(soffset) + yield name, self.Address(soffset, 'bytes') + index = b.find(res, index + len(res)) + """ + + def parse_float_function(self, b, value, offset): + for index in range(0, len(b)): + try: + structtype, structlen = type_unpack('float') + tmpval = struct.unpack(structtype, b[index:index + 4])[0] + if int(value) == int(tmpval): + soffset = offset + index + yield self.Address(soffset, 'float') + except Exception as e: + pass + + def parse_named_groups_function(self, b, value, offset=None): + for name, regex in value: + for res in regex.finditer(b): + yield name, res.groupdict() + + def parse_groups_function(self, b, value, offset=None): + for name, regex in value: + for res in regex.finditer(b): + yield name, res.groups() + + def parse_any_function(self, b, value, offset): + index = b.find(value) + while index != -1: + soffset = offset + index + yield self.Address(soffset, 'bytes') + index = b.find(value, index + 1) + + def mem_search(self, value, ftype = 'match', protec = PAGE_READWRITE | PAGE_READONLY, optimizations=None, start_offset = None, end_offset = None): + """ + iterator returning all indexes where the pattern has been found + """ + + # pre-compile regex to run faster + if ftype == 're' or ftype == 'groups' or ftype == 'ngroups': + + # value should be an array of regex + if type(value) is not list: + value = [value] + + tmp = [] + for reg in value: + if type(reg) is tuple: + name = reg[0] + if type(reg[1]) != REGEX_TYPE: + regex = re.compile(reg[1], re.IGNORECASE) + else: + regex=reg[1] + elif type(reg) == REGEX_TYPE: + name = '' + regex=reg + else: + name = '' + regex = re.compile(reg, re.IGNORECASE) + + + tmp.append((name, regex)) + value = tmp + + elif ftype != 'match' and ftype != 'group' and ftype != 're' and ftype != 'groups' and ftype != 'ngroups' and ftype != 'lambda': + structtype, structlen = type_unpack(ftype) + value = struct.pack(structtype, value) + + # different functions avoid if statement before parsing the buffer + if ftype == 're': + func = self.parse_re_function + + elif ftype == 'groups': + func = self.parse_groups_function + + elif ftype == 'ngroups': + func = self.parse_named_groups_function + + elif ftype == 'float': + func = self.parse_float_function + elif ftype == 'lambda': # use a custm function + func = value + else: + func = self.parse_any_function + + if not self.process.isProcessOpen: + raise ProcessException("Can't read_bytes, process %s is not open" % (self.process.pid)) + + for offset, chunk_size in self.process.iter_region(start_offset=start_offset, end_offset=end_offset, protec=protec, optimizations=optimizations): + b = b'' + current_offset = offset + chunk_read = 0 + chunk_exc = False + while chunk_read < chunk_size: + try: + b += self.process.read_bytes(current_offset, chunk_size) + except IOError as e: + print(traceback.format_exc()) + if e.errno == 13: + raise + else: + logger.warning(e) + chunk_exc=True + break + except Exception as e: + print('coucou') + logger.warning(e) + chunk_exc = True + break + finally: + current_offset += chunk_size + chunk_read += chunk_size + + if chunk_exc: + continue + + if b: + if ftype=="lambda": + for res in func(b.decode('latin'), offset): + yield res + else: + for res in func(b.decode('latin'), value, offset): + yield res + + |