summaryrefslogtreecommitdiff
path: root/foreign/client_handling/lazagne/config/lib/memorpy/MemWorker.py
blob: 4a971bbefaa2759dfeb3a3e23a4b63fd5b87490e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# Author: Nicolas VERDIER
# This file is part of memorpy.
#
# memorpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# memorpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with memorpy.  If not, see <http://www.gnu.org/licenses/>.
import sys
import string
import re
import logging
import traceback
import binascii
import struct

from .Process import *
from .utils import *
from .Address import Address
from .BaseProcess import ProcessException
from .structures import *

# Logger registered under the 'memorpy' name; used by this module for
# debug and warning output.
logger = logging.getLogger('memorpy')

# Concrete class of a compiled regular expression, used to tell compiled
# patterns apart from plain pattern strings.
REGEX_TYPE = type(re.compile("^plop$"))
class MemWorker(object):

    def __init__(self, pid=None, name=None, end_offset = None, start_offset = None, debug=True):
        """Create the ``Process`` object used by all other methods.

        ``pid``, ``name`` and ``debug`` are forwarded to ``Process``.
        ``end_offset`` and ``start_offset`` are accepted but not used here.
        """
        process_options = {'name': name, 'pid': pid, 'debug': debug}
        self.process = Process(**process_options)

    def __enter__(self):
        """Context-manager entry: the worker itself is the managed object."""
        worker = self
        return worker

    def __exit__(self, type, value, traceback):
        """Context-manager exit: close the underlying process.

        The exception triple is ignored and ``None`` is returned, so an
        exception raised inside the ``with`` block keeps propagating.
        """
        managed_process = self.process
        managed_process.close()

    def Address(self, value, default_type = 'uint'):
        """Wrapper to instantiate an ``Address`` bound to this worker's process."""
        bound_address = Address(value, process=self.process, default_type=default_type)
        return bound_address

    def umem_replace(self, regex, replace):
        """Like ``mem_replace`` but works with unicode strings.

        ``regex`` goes through ``re_to_unicode`` and is compiled with
        ``re.UNICODE``; ``replace`` is encoded as UTF-16-LE before being
        written. Returns the result of ``mem_replace``.
        """
        wide_regex = re_to_unicode(regex)
        payload = replace.encode('utf-16-le')
        pattern = re.compile(wide_regex, re.UNICODE)
        return self.mem_replace(pattern, payload)

    def mem_replace(self, regex, replace):
        """Search memory for ``regex`` and overwrite every match with ``replace``.

        Returns True when every ``write_bytes`` call returned 1 (also when
        nothing matched), False as soon as one write reported anything else.
        """
        every_write_ok = True
        for _name, hit in self.mem_search(regex, ftype='re'):
            written = self.process.write_bytes(hit, replace) == 1
            if not written:
                every_write_ok = False
                logger.debug('Write at offset %s failed !' % hit)
                continue
            logger.debug('Write at offset %s succeeded !' % hit)

        return every_write_ok

    def umem_search(self, regex):
        """Like ``mem_search`` but works with unicode strings.

        ``regex`` goes through ``re_to_unicode`` and is searched with
        ``ftype='re'``; only the location of each hit is yielded (the
        pattern name is dropped).
        """
        wide_pattern = str(re_to_unicode(regex))
        for _name, location in self.mem_search(wide_pattern, ftype='re'):
            yield location

    def group_search(self, group, start_offset = None, end_offset = None):
        """Search memory for a sequence of consecutive float values.

        ``group`` is an iterable of ``(value, type)`` pairs; only the types
        ``'f'`` and ``'float'`` are supported. Each value is packed as a
        little-endian 32-bit float and matched on its two high-order bytes
        only, the two low-order bytes being wildcards.

        ``start_offset`` and ``end_offset`` are forwarded to ``mem_search``,
        whose iterator is returned.

        Raises NotImplementedError for any other type.
        """
        parts = []
        for value, value_type in group:
            if value_type not in ('f', 'float'):
                raise NotImplementedError('unknown type %s' % value_type)
            packed = struct.pack('<f', float(value))
            # mem_search matches against a latin-1 decoded buffer, so the
            # packed bytes are turned into latin-1 text; they are escaped
            # because any byte may be a regex metacharacter.
            high_bytes = packed[2:4].decode('latin-1')
            parts.append('..' + re.escape(high_bytes))

        # Compiled here on purpose: DOTALL lets '.' match the 0x0a byte too,
        # and skipping IGNORECASE prevents case-folding of binary data.
        pattern = re.compile(''.join(parts), re.DOTALL)
        return self.mem_search(pattern, ftype='re', start_offset=start_offset, end_offset=end_offset)

    def search_address(self, addr):
        """Yield every memory location holding ``addr`` in little-endian form.

        ``addr`` is rendered as at least 8 hexadecimal digits (4 bytes);
        larger values use as many whole bytes as they need.
        """
        hex_addr = '%08X' % addr
        logger.debug('searching address %s' % hex_addr)
        # unhexlify needs whole bytes: without padding the leading nibble
        # of an odd-length value would be lost.
        if len(hex_addr) % 2:
            hex_addr = '0' + hex_addr

        little_endian = binascii.unhexlify(hex_addr)[::-1]
        # mem_search matches against a latin-1 decoded buffer. Compiling
        # here (no IGNORECASE) keeps the byte comparison exact.
        pattern = re.compile(re.escape(little_endian.decode('latin-1')))

        for _name, location in self.mem_search(pattern, ftype='re'):
            yield location

    def parse_re_function(self, b, value, offset):
        """Yield ``(name, Address)`` for every regex match found in ``b``.

        ``value`` is an iterable of ``(name, compiled_regex)`` pairs and
        ``offset`` is added to each match start to build an Address of
        type ``'bytes'``.
        """
        for label, pattern in value:
            match_starts = (found.start() for found in pattern.finditer(b))
            for start in match_starts:
                yield label, self.Address(offset + start, 'bytes')

    def parse_float_function(self, b, value, offset):
        """Yield an Address for every position in ``b`` holding a matching float.

        A position matches when the float decoded there has the same
        integer part as ``value`` (both are truncated with ``int``).

        ``b`` may be raw bytes or a latin-1 decoded string. ``value`` may be
        a number or an already packed float. ``offset`` is added to the
        position inside ``b`` to build each Address (type ``'float'``).

        Nothing is yielded when ``value`` has no integer part (NaN,
        infinity, non-numeric text).
        """
        # Loop-invariant: resolve the struct format once, not per byte.
        structtype, structlen = type_unpack('float')
        unpacker = struct.Struct(structtype)

        # struct only reads bytes; a decoded buffer maps back 1:1 via latin-1.
        if not isinstance(b, (bytes, bytearray)):
            b = b.encode('latin-1')
        # A packed value is turned back into the number it represents.
        if isinstance(value, (bytes, bytearray)):
            value = unpacker.unpack(bytes(value))[0]

        try:
            wanted = int(value)
        except (ValueError, OverflowError):
            return

        infinity = float('inf')
        for index in range(len(b) - unpacker.size + 1):
            candidate = unpacker.unpack_from(b, index)[0]
            # NaN and +/-inf have no integer part and can never match.
            if candidate != candidate or abs(candidate) == infinity:
                continue
            if int(candidate) == wanted:
                yield self.Address(offset + index, 'float')

    def parse_named_groups_function(self, b, value, offset=None):
        """Yield ``(name, groupdict)`` for every regex match found in ``b``.

        ``value`` is an iterable of ``(name, compiled_regex)`` pairs;
        ``offset`` is accepted but not used.
        """
        for label, pattern in value:
            matches = pattern.finditer(b)
            for found in matches:
                named_groups = found.groupdict()
                yield label, named_groups

    def parse_groups_function(self, b, value, offset=None):
        """Yield ``(name, groups)`` for every regex match found in ``b``.

        ``value`` is an iterable of ``(name, compiled_regex)`` pairs;
        ``offset`` is accepted but not used.
        """
        for label, pattern in value:
            matches = pattern.finditer(b)
            for found in matches:
                captured = found.groups()
                yield label, captured

    def parse_any_function(self, b, value, offset):
        """Yield an Address for every occurrence of ``value`` inside ``b``.

        Overlapping occurrences are reported, since the search resumes one
        position after each hit. ``offset`` is added to the position inside
        ``b`` to build each Address (type ``'bytes'``).

        ``b`` and ``value`` may each be bytes or text; when they differ,
        ``value`` is converted through latin-1 to the type of ``b``.
        """
        # find() refuses mixed bytes/text operands, so align the needle with
        # the buffer (mem_search decodes the buffer, struct.pack yields bytes).
        if isinstance(value, bytes) and not isinstance(b, bytes):
            value = value.decode('latin-1')
        elif isinstance(b, bytes) and not isinstance(value, bytes) and hasattr(value, 'encode'):
            value = value.encode('latin-1')

        index = b.find(value)
        while index != -1:
            yield self.Address(offset + index, 'bytes')
            index = b.find(value, index + 1)

    def mem_search(self, value, ftype = 'match', protec = PAGE_READWRITE | PAGE_READONLY, optimizations=None, start_offset = None, end_offset = None):
        """Iterator returning every location where ``value`` has been found.

        ``ftype`` selects how ``value`` is interpreted:

        * ``'re'``, ``'groups'``, ``'ngroups'``: ``value`` is a regex, a
          ``(name, regex)`` tuple, or a list of those. Pattern strings are
          compiled with ``re.IGNORECASE``; compiled patterns are used as
          given. Yields ``(name, Address)``, ``(name, groups)`` or
          ``(name, groupdict)`` respectively.
        * ``'float'``: ``value`` is a number, searched by
          ``parse_float_function``.
        * ``'lambda'``: ``value`` is a callable invoked as
          ``value(buffer, offset)``; whatever it iterates over is yielded.
        * ``'match'`` / ``'group'``: ``value`` is searched as-is by
          ``parse_any_function``.
        * anything else: ``ftype`` is resolved by ``type_unpack`` to a struct
          format, ``value`` is packed with it and searched by
          ``parse_any_function``.

        ``protec``, ``optimizations``, ``start_offset`` and ``end_offset``
        are forwarded to ``process.iter_region``.

        Regions that cannot be read are logged and skipped, except for a
        permission error (errno EACCES), which is re-raised.

        Raises ProcessException when the process is not open.
        """
        import errno  # local import: only needed for the EACCES check below

        if ftype in ('re', 'groups', 'ngroups'):
            # pre-compile regex to run faster; value should be a list of regex
            if not isinstance(value, list):
                value = [value]

            compiled = []
            for reg in value:
                if isinstance(reg, tuple):
                    name, regex = reg[0], reg[1]
                else:
                    name, regex = '', reg
                if not isinstance(regex, REGEX_TYPE):
                    regex = re.compile(regex, re.IGNORECASE)
                compiled.append((name, regex))
            value = compiled

        elif ftype not in ('match', 'group', 'lambda', 'float'):
            structtype, structlen = type_unpack(ftype)
            # The buffer is searched as latin-1 text, so the packed bytes
            # are converted the same way to keep both operands comparable.
            value = struct.pack(structtype, value).decode('latin-1')

        # One parser per search type avoids branching inside the scan loop.
        parsers = {
            're': self.parse_re_function,
            'groups': self.parse_groups_function,
            'ngroups': self.parse_named_groups_function,
            'float': self.parse_float_function,
        }
        if ftype == 'lambda':  # use a custom function
            func = value
        else:
            func = parsers.get(ftype, self.parse_any_function)

        if not self.process.isProcessOpen:
            raise ProcessException("Can't read_bytes, process %s is not open" % (self.process.pid))

        for offset, chunk_size in self.process.iter_region(start_offset=start_offset, end_offset=end_offset, protec=protec, optimizations=optimizations):
            try:
                data = self.process.read_bytes(offset, chunk_size)
            except IOError as e:
                logger.debug(traceback.format_exc())
                if e.errno == errno.EACCES:
                    raise
                logger.warning(e)
                continue
            except Exception as e:
                logger.warning(e)
                continue

            if not data:
                continue

            if ftype == 'float':
                # struct unpacking needs the raw bytes, not decoded text.
                buf = data
            else:
                buf = data.decode('latin')

            if ftype == 'lambda':
                for res in func(buf, offset):
                    yield res
            else:
                for res in func(buf, value, offset):
                    yield res