+# Author: Nicolas VERDIER
+# This file is part of memorpy.
+# memorpy is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# memorpy is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with memorpy. If not, see <>.
+from ctypes import pointer, sizeof, windll, create_string_buffer, c_ulong, byref, GetLastError, c_bool, WinError
+from .structures import *
+import copy
+import struct
+# import utils
+import platform
+from .BaseProcess import BaseProcess, ProcessException
+psapi = windll.psapi
+kernel32 = windll.kernel32
+advapi32 = windll.advapi32
+if hasattr(kernel32,'IsWow64Process'):
+ IsWow64Process=kernel32.IsWow64Process
+ IsWow64Process.restype = c_bool
+ IsWow64Process.argtypes = [c_void_p, POINTER(c_bool)]
+class WinProcess(BaseProcess):
+ def __init__(self, pid=None, name=None, debug=True):
+ """ Create and Open a process object from its pid or from its name """
+ super(WinProcess, self).__init__()
+ if pid:
+ self._open(int(pid), debug=debug)
+ elif name:
+ self._open_from_name(name, debug=debug)
+ else:
+ raise ValueError("You need to instanciate process with at least a name or a pid")
+ if self.is_64bit():
+ si = self.GetNativeSystemInfo()
+ self.max_addr = si.lpMaximumApplicationAddress
+ else:
+ si = self.GetSystemInfo()
+ self.max_addr = 2147418111
+ self.min_addr = si.lpMinimumApplicationAddress
+ def __del__(self):
+ self.close()
+ def is_64bit(self):
+ if not "64" in platform.machine():
+ return False
+ iswow64 = c_bool(False)
+ if IsWow64Process is None:
+ return False
+ if not IsWow64Process(self.h_process, byref(iswow64)):
+ raise WinError()
+ return not iswow64.value
+ @staticmethod
+ def list():
+ processes=[]
+ arr = c_ulong * 256
+ lpidProcess= arr()
+ cb = sizeof(lpidProcess)
+ cbNeeded = c_ulong()
+ hModule = c_ulong()
+ count = c_ulong()
+ modname = create_string_buffer(100)
+ PROCESS_VM_READ = 0x0010
+ psapi.EnumProcesses(byref(lpidProcess), cb, byref(cbNeeded))
+ nReturned = cbNeeded.value/sizeof(c_ulong())
+ pidProcess = [i for i in lpidProcess][:nReturned]
+ for pid in pidProcess:
+ proc={ "pid": int(pid) }
+ hProcess = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
+ if hProcess:
+ psapi.EnumProcessModules(hProcess, byref(hModule), sizeof(hModule), byref(count))
+ psapi.GetModuleBaseNameA(hProcess, hModule.value, modname, sizeof(modname))
+ proc["name"]=modname.value
+ kernel32.CloseHandle(hProcess)
+ processes.append(proc)
+ return processes
+ @staticmethod
+ def processes_from_name(processName):
+ processes = []
+ for process in WinProcess.list():
+ if processName == process.get("name", None) or (process.get("name","").lower().endswith(".exe") and process.get("name","")[:-4]==processName):
+ processes.append(process)
+ if len(processes) > 0:
+ return processes
+ @staticmethod
+ def name_from_process(dwProcessId):
+ process_list = WinProcess.list()
+ for process in process_list:
+ if == dwProcessId:
+ return process.get("name", None)
+ return False
+ def _open(self, dwProcessId, debug=False):
+ if debug:
+ ppsidOwner = DWORD()
+ ppsidGroup = DWORD()
+ ppDacl = DWORD()
+ ppSacl = DWORD()
+ ppSecurityDescriptor = SECURITY_DESCRIPTOR()
+ process = kernel32.OpenProcess(262144, 0, dwProcessId)
+ advapi32.GetSecurityInfo(kernel32.GetCurrentProcess(), 6, 0, byref(ppsidOwner), byref(ppsidGroup), byref(ppDacl), byref(ppSacl), byref(ppSecurityDescriptor))
+ advapi32.SetSecurityInfo(process, 6, DACL_SECURITY_INFORMATION | UNPROTECTED_DACL_SECURITY_INFORMATION, None, None, ppSecurityDescriptor.dacl,
+ kernel32.CloseHandle(process)
+ self.h_process = kernel32.OpenProcess(2035711, 0, dwProcessId)
+ if self.h_process is not None:
+ self.isProcessOpen = True
+ = dwProcessId
+ return True
+ return False
+ def close(self):
+ if self.h_process is not None:
+ ret = kernel32.CloseHandle(self.h_process) == 1
+ if ret:
+ self.h_process = None
+ = None
+ self.isProcessOpen = False
+ return ret
+ return False
+ def _open_from_name(self, processName, debug=False):
+ processes = self.processes_from_name(processName)
+ if not processes:
+ raise ProcessException("can't get pid from name %s" % processName)
+ elif len(processes)>1:
+ raise ValueError("There is multiple processes with name %s. Please select a process from its pid instead"%processName)
+ if debug:
+ self._open(processes[0]["pid"], debug=True)
+ else:
+ self._open(processes[0]["pid"], debug=False)
+ def GetSystemInfo(self):
+ si = SYSTEM_INFO()
+ kernel32.GetSystemInfo(byref(si))
+ return si
+ def GetNativeSystemInfo(self):
+ si = SYSTEM_INFO()
+ kernel32.GetNativeSystemInfo(byref(si))
+ return si
+ def VirtualQueryEx(self, lpAddress):
+ if not VirtualQueryEx(self.h_process, lpAddress, byref(mbi), sizeof(mbi)):
+ raise ProcessException('Error VirtualQueryEx: 0x%08X' % lpAddress)
+ return mbi
+ def VirtualQueryEx64(self, lpAddress):
+ if not VirtualQueryEx64(self.h_process, lpAddress, byref(mbi), sizeof(mbi)):
+ raise ProcessException('Error VirtualQueryEx: 0x%08X' % lpAddress)
+ return mbi
+ def VirtualProtectEx(self, base_address, size, protection):
+ old_protect = c_ulong(0)
+ if not kernel32.VirtualProtectEx(self.h_process, base_address, size, protection, byref(old_protect)):
+ raise ProcessException('Error: VirtualProtectEx(%08X, %d, %08X)' % (base_address, size, protection))
+ return old_protect.value
+ def iter_region(self, start_offset=None, end_offset=None, protec=None, optimizations=None):
+ offset = start_offset or self.min_addr
+ end_offset = end_offset or self.max_addr
+ while True:
+ if offset >= end_offset:
+ break
+ mbi = self.VirtualQueryEx(offset)
+ offset = mbi.BaseAddress
+ chunk = mbi.RegionSize
+ protect = mbi.Protect
+ state = mbi.State
+ #print "offset: %s, chunk:%s"%(offset, chunk)
+ if state & MEM_FREE or state & MEM_RESERVE:
+ offset += chunk
+ continue
+ if protec:
+ if not protect & protec or protect & PAGE_NOCACHE or protect & PAGE_WRITECOMBINE or protect & PAGE_GUARD:
+ offset += chunk
+ continue
+ yield offset, chunk
+ offset += chunk
+ def write_bytes(self, address, data):
+ address = int(address)
+ if not self.isProcessOpen:
+ raise ProcessException("Can't write_bytes(%s, %s), process %s is not open" % (address, data,
+ buffer = create_string_buffer(data)
+ sizeWriten = c_size_t(0)
+ bufferSize = sizeof(buffer) - 1
+ _address = address
+ _length = bufferSize + 1
+ try:
+ old_protect = self.VirtualProtectEx(_address, _length, PAGE_EXECUTE_READWRITE)
+ except:
+ pass
+ res = kernel32.WriteProcessMemory(self.h_process, address, buffer, bufferSize, byref(sizeWriten))
+ try:
+ self.VirtualProtectEx(_address, _length, old_protect)
+ except:
+ pass
+ return res
+ def read_bytes(self, address, bytes = 4, use_NtWow64ReadVirtualMemory64=False):
+ #print "reading %s bytes from addr %s"%(bytes, address)
+ if use_NtWow64ReadVirtualMemory64:
+ if NtWow64ReadVirtualMemory64 is None:
+ raise WindowsError("NtWow64ReadVirtualMemory64 is not available from a 64bit process")
+ RpM = NtWow64ReadVirtualMemory64
+ else:
+ RpM = ReadProcessMemory
+ address = int(address)
+ buffer = create_string_buffer(bytes)
+ bytesread = c_size_t(0)
+ data = b''
+ length = bytes
+ while length:
+ if RpM(self.h_process, address, buffer, bytes, byref(bytesread)) or (use_NtWow64ReadVirtualMemory64 and GetLastError() == 0):
+ if bytesread.value:
+ data += buffer.raw[:bytesread.value]
+ length -= bytesread.value
+ address += bytesread.value
+ if not len(data):
+ raise ProcessException('Error %s in ReadProcessMemory(%08x, %d, read=%d)' % (GetLastError(),
+ address,
+ length,
+ bytesread.value))
+ return data
+ else:
+ if GetLastError()==299: #only part of ReadProcessMemory has been done, let's return it
+ data += buffer.raw[:bytesread.value]
+ return data
+ raise WinError()
+ # data += buffer.raw[:bytesread.value]
+ # length -= bytesread.value
+ # address += bytesread.value
+ return data
+ def list_modules(self):
+ module_list = []
+ if is not None:
+ hModuleSnap = CreateToolhelp32Snapshot(TH32CS_CLASS.SNAPMODULE,
+ if hModuleSnap is not None:
+ module_entry = MODULEENTRY32()
+ module_entry.dwSize = sizeof(module_entry)
+ success = Module32First(hModuleSnap, byref(module_entry))
+ while success:
+ if module_entry.th32ProcessID ==
+ module_list.append(copy.copy(module_entry))
+ success = Module32Next(hModuleSnap, byref(module_entry))
+ kernel32.CloseHandle(hModuleSnap)
+ return module_list
+ def get_symbolic_name(self, address):
+ for m in self.list_modules():
+ if int(m.modBaseAddr) <= int(address) < int(m.modBaseAddr + m.modBaseSize):
+ return '%s+0x%08X' % (m.szModule, int(address) - m.modBaseAddr)
+ return '0x%08X' % int(address)
+ def hasModule(self, module):
+ if module[-4:] != '.dll':
+ module += '.dll'
+ module_list = self.list_modules()
+ for m in module_list:
+ if module in m.szExePath.split('\\'):
+ return True
+ return False
+ def get_instruction(self, address):
+ """
+ Pydasm disassemble utility function wrapper. Returns the pydasm decoded instruction in self.instruction.
+ """
+ import pydasm
+ try:
+ data = self.read_bytes(int(address), 32)
+ except:
+ return 'Unable to disassemble at %08x' % address
+ return pydasm.get_instruction(data, pydasm.MODE_32)