1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
# Author: Nicolas VERDIER
# This file is part of memorpy.
#
# memorpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# memorpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with memorpy. If not, see <http://www.gnu.org/licenses/>.
import sys
import string
import re
import logging
import traceback
import binascii
import struct
from .Process import *
from .utils import *
from .Address import Address
from .BaseProcess import ProcessException
from .structures import *
logger = logging.getLogger('memorpy')
REGEX_TYPE=type(re.compile("^plop$"))
class MemWorker(object):
def __init__(self, pid=None, name=None, end_offset = None, start_offset = None, debug=True):
self.process = Process(name=name, pid=pid, debug=debug)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.process.close()
def Address(self, value, default_type = 'uint'):
""" wrapper to instanciate an Address class for the memworker.process"""
return Address(value, process=self.process, default_type=default_type)
def umem_replace(self, regex, replace):
""" like search_replace_mem but works with unicode strings """
regex = re_to_unicode(regex)
replace = replace.encode('utf-16-le')
return self.mem_replace(re.compile(regex, re.UNICODE), replace)
def mem_replace(self, regex, replace):
""" search memory for a pattern and replace all found occurrences """
allWritesSucceed = True
for _, start_offset in self.mem_search(regex, ftype='re'):
if self.process.write_bytes(start_offset, replace) == 1:
logger.debug('Write at offset %s succeeded !' % start_offset)
else:
allWritesSucceed = False
logger.debug('Write at offset %s failed !' % start_offset)
return allWritesSucceed
def umem_search(self, regex):
""" like mem_search but works with unicode strings """
regex = re_to_unicode(regex)
for _, i in self.mem_search(str(regex), ftype='re'):
yield i
def group_search(self, group, start_offset = None, end_offset = None):
regex = ''
for value, type in group:
if type == 'f' or type == 'float':
f = struct.pack('<f', float(value))
regex += '..' + f[2:4]
else:
raise NotImplementedError('unknown type %s' % type)
return self.mem_search(regex, ftype='re', start_offset=start_offset, end_offset=end_offset)
def search_address(self, addr):
a = '%08X' % addr
logger.debug('searching address %s' % a)
regex = ''
for i in range(len(a) - 2, -1, -2):
regex += binascii.unhexlify(a[i:i + 2])
for _, a in self.mem_search(re.escape(regex), ftype='re'):
yield a
def parse_re_function(self, b, value, offset):
for name, regex in value:
for res in regex.finditer(b):
yield name, self.Address(offset+res.start(), 'bytes')
"""
index = b.find(res)
while index != -1:
soffset = offset + index
if soffset not in duplicates_cache:
duplicates_cache.add(soffset)
yield name, self.Address(soffset, 'bytes')
index = b.find(res, index + len(res))
"""
def parse_float_function(self, b, value, offset):
for index in range(0, len(b)):
try:
structtype, structlen = type_unpack('float')
tmpval = struct.unpack(structtype, b[index:index + 4])[0]
if int(value) == int(tmpval):
soffset = offset + index
yield self.Address(soffset, 'float')
except Exception as e:
pass
def parse_named_groups_function(self, b, value, offset=None):
for name, regex in value:
for res in regex.finditer(b):
yield name, res.groupdict()
def parse_groups_function(self, b, value, offset=None):
for name, regex in value:
for res in regex.finditer(b):
yield name, res.groups()
def parse_any_function(self, b, value, offset):
index = b.find(value)
while index != -1:
soffset = offset + index
yield self.Address(soffset, 'bytes')
index = b.find(value, index + 1)
def mem_search(self, value, ftype = 'match', protec = PAGE_READWRITE | PAGE_READONLY, optimizations=None, start_offset = None, end_offset = None):
"""
iterator returning all indexes where the pattern has been found
"""
# pre-compile regex to run faster
if ftype == 're' or ftype == 'groups' or ftype == 'ngroups':
# value should be an array of regex
if type(value) is not list:
value = [value]
tmp = []
for reg in value:
if type(reg) is tuple:
name = reg[0]
if type(reg[1]) != REGEX_TYPE:
regex = re.compile(reg[1], re.IGNORECASE)
else:
regex=reg[1]
elif type(reg) == REGEX_TYPE:
name = ''
regex=reg
else:
name = ''
regex = re.compile(reg, re.IGNORECASE)
tmp.append((name, regex))
value = tmp
elif ftype != 'match' and ftype != 'group' and ftype != 're' and ftype != 'groups' and ftype != 'ngroups' and ftype != 'lambda':
structtype, structlen = type_unpack(ftype)
value = struct.pack(structtype, value)
# different functions avoid if statement before parsing the buffer
if ftype == 're':
func = self.parse_re_function
elif ftype == 'groups':
func = self.parse_groups_function
elif ftype == 'ngroups':
func = self.parse_named_groups_function
elif ftype == 'float':
func = self.parse_float_function
elif ftype == 'lambda': # use a custm function
func = value
else:
func = self.parse_any_function
if not self.process.isProcessOpen:
raise ProcessException("Can't read_bytes, process %s is not open" % (self.process.pid))
for offset, chunk_size in self.process.iter_region(start_offset=start_offset, end_offset=end_offset, protec=protec, optimizations=optimizations):
b = b''
current_offset = offset
chunk_read = 0
chunk_exc = False
while chunk_read < chunk_size:
try:
b += self.process.read_bytes(current_offset, chunk_size)
except IOError as e:
print(traceback.format_exc())
if e.errno == 13:
raise
else:
logger.warning(e)
chunk_exc=True
break
except Exception as e:
print('coucou')
logger.warning(e)
chunk_exc = True
break
finally:
current_offset += chunk_size
chunk_read += chunk_size
if chunk_exc:
continue
if b:
if ftype=="lambda":
for res in func(b.decode('latin'), offset):
yield res
else:
for res in func(b.decode('latin'), value, offset):
yield res
|