# Author: Nicolas VERDIER
# This file is part of memorpy.
#
# memorpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# memorpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with memorpy. If not, see .
import copy
import struct
# import utils
import platform
import ctypes, re, sys
from ctypes import create_string_buffer, byref, c_int, c_void_p, c_long, c_size_t, c_ssize_t, POINTER, get_errno
import errno
import os
import signal
from .BaseProcess import BaseProcess, ProcessException
from .structures import *
import logging
logger = logging.getLogger('memorpy')
libc=ctypes.CDLL("libc.so.6", use_errno=True)
get_errno_loc = libc.__errno_location
get_errno_loc.restype = POINTER(c_int)
def errcheck(ret, func, args):
if ret == -1:
_errno = get_errno() or errno.EPERM
raise OSError(os.strerror(_errno))
return ret
c_ptrace = libc.ptrace
c_pid_t = ctypes.c_int32 # This assumes pid_t is int32_t
c_ptrace.argtypes = [c_int, c_pid_t, c_void_p, c_void_p]
c_ptrace.restype = c_long
mprotect = libc.mprotect
mprotect.restype = c_int
mprotect.argtypes = [c_void_p, c_size_t, c_int]
LARGE_FILE_SUPPORT=False
try:
c_off64_t=ctypes.c_longlong
lseek64 = libc.lseek64
lseek64.argtypes = [c_int, c_off64_t, c_int]
lseek64.errcheck=errcheck
open64 = libc.open64
open64.restype = c_int
open64.argtypes = [c_void_p, c_int]
open64.errcheck=errcheck
pread64=libc.pread64
pread64.argtypes = [c_int, c_void_p, c_size_t, c_off64_t]
pread64.restype = c_ssize_t
pread64.errcheck=errcheck
c_close=libc.close
c_close.argtypes = [c_int]
c_close.restype = c_int
LARGE_FILE_SUPPORT=True
except:
logger.warning("no Large File Support")
class LinProcess(BaseProcess):
def __init__(self, pid=None, name=None, debug=True, ptrace=None):
""" Create and Open a process object from its pid or from its name """
super(LinProcess, self).__init__()
self.mem_file=None
self.ptrace_started=False
if pid is not None:
self.pid=pid
elif name is not None:
self.pid=LinProcess.pid_from_name(name)
else:
raise ValueError("You need to instanciate process with at least a name or a pid")
if ptrace is None:
if os.getuid()==0:
self.read_ptrace=False # no need to ptrace the process when root to read memory
else:
self.read_ptrace=True
self._open()
def check_ptrace_scope(self):
""" check ptrace scope and raise an exception if privileges are unsufficient
The sysctl settings (writable only with CAP_SYS_PTRACE) are:
0 - classic ptrace permissions: a process can PTRACE_ATTACH to any other
process running under the same uid, as long as it is dumpable (i.e.
did not transition uids, start privileged, or have called
prctl(PR_SET_DUMPABLE...) already). Similarly, PTRACE_TRACEME is
unchanged.
1 - restricted ptrace: a process must have a predefined relationship
with the inferior it wants to call PTRACE_ATTACH on. By default,
this relationship is that of only its descendants when the above
classic criteria is also met. To change the relationship, an
inferior can call prctl(PR_SET_PTRACER, debugger, ...) to declare
an allowed debugger PID to call PTRACE_ATTACH on the inferior.
Using PTRACE_TRACEME is unchanged.
2 - admin-only attach: only processes with CAP_SYS_PTRACE may use ptrace
with PTRACE_ATTACH, or through children calling PTRACE_TRACEME.
3 - no attach: no processes may use ptrace with PTRACE_ATTACH nor via
PTRACE_TRACEME. Once set, this sysctl value cannot be changed.
"""
try:
with open("/proc/sys/kernel/yama/ptrace_scope",'rb') as f:
ptrace_scope=int(f.read().strip())
if ptrace_scope==3:
logger.warning("yama/ptrace_scope == 3 (no attach). :/")
if os.getuid()==0:
return
elif ptrace_scope == 1:
logger.warning("yama/ptrace_scope == 1 (restricted). you can't ptrace other process ... get root")
elif ptrace_scope == 2:
logger.warning("yama/ptrace_scope == 2 (admin-only). Warning: check you have CAP_SYS_PTRACE")
except IOError:
pass
except Exception as e:
logger.warning("Error getting ptrace_scope ?? : %s"%e)
def close(self):
if self.mem_file:
if not LARGE_FILE_SUPPORT:
self.mem_file.close()
else:
c_close(self.mem_file)
self.mem_file=None
if self.ptrace_started:
self.ptrace_detach()
def __del__(self):
self.close()
def _open(self):
self.isProcessOpen = True
self.check_ptrace_scope()
if os.getuid()!=0:
#to raise an exception if ptrace is not allowed
self.ptrace_attach()
self.ptrace_detach()
#open file descriptor
if not LARGE_FILE_SUPPORT:
self.mem_file=open("/proc/" + str(self.pid) + "/mem", 'rb', 0)
else:
path=create_string_buffer(b"/proc/%d/mem" % self.pid)
self.mem_file=open64(byref(path), os.O_RDONLY)
@staticmethod
def list():
processes=[]
for pid in os.listdir("/proc"):
try:
exe=os.readlink("/proc/%s/exe"%pid)
processes.append({"pid":int(pid), "name":exe})
except:
pass
return processes
@staticmethod
def pid_from_name(name):
#quick and dirty, works with all linux not depending on ps output
for pid in os.listdir("/proc"):
try:
int(pid)
except:
continue
pname=""
with open("/proc/%s/cmdline"%pid,'r') as f:
pname=f.read()
if name in pname:
return int(pid)
raise ProcessException("No process with such name: %s"%name)
## Partial interface to ptrace(2), only for PTRACE_ATTACH and PTRACE_DETACH.
def _ptrace(self, attach):
op = ctypes.c_int(PTRACE_ATTACH if attach else PTRACE_DETACH)
c_pid = c_pid_t(self.pid)
null = ctypes.c_void_p()
if not attach:
os.kill(self.pid, signal.SIGSTOP)
os.waitpid(self.pid, 0)
err = c_ptrace(op, c_pid, null, null)
if not attach:
os.kill(self.pid, signal.SIGCONT)
if err != 0:
raise OSError("%s: %s"%(
'PTRACE_ATTACH' if attach else 'PTRACE_DETACH',
errno.errorcode.get(ctypes.get_errno(), 'UNKNOWN')
))
def iter_region(self, start_offset=None, end_offset=None, protec=None, optimizations=None):
"""
optimizations :
i for inode==0 (no file mapping)
s to avoid scanning shared regions
x to avoid scanning x regions
r don't scan ronly regions
"""
with open("/proc/" + str(self.pid) + "/maps", 'r') as maps_file:
for line in maps_file:
m = re.match(r'([0-9A-Fa-f]+)-([0-9A-Fa-f]+)\s+([-rwpsx]+)\s+([0-9A-Fa-f]+)\s+([0-9A-Fa-f]+:[0-9A-Fa-f]+)\s+([0-9]+)\s*(.*)', line)
if not m:
continue
start, end, region_protec, offset, dev, inode, pathname = int(m.group(1), 16), int(m.group(2), 16), m.group(3), m.group(4), m.group(5), int(m.group(6)), m.group(7)
if start_offset is not None:
if start < start_offset:
continue
if end_offset is not None:
if start > end_offset:
continue
chunk=end-start
if 'r' in region_protec: # TODO: handle protec parameter
if optimizations:
if 'i' in optimizations and inode != 0:
continue
if 's' in optimizations and 's' in region_protec:
continue
if 'x' in optimizations and 'x' in region_protec:
continue
if 'r' in optimizations and not 'w' in region_protec:
continue
yield start, chunk
def ptrace_attach(self):
if not self.ptrace_started:
res=self._ptrace(True)
self.ptrace_started=True
return res
def ptrace_detach(self):
if self.ptrace_started:
res=self._ptrace(False)
self.ptrace_started=False
return res
def write_bytes(self, address, data):
if not self.ptrace_started:
self.ptrace_attach()
c_pid = c_pid_t(self.pid)
null = ctypes.c_void_p()
#we can only copy data per range of 4 or 8 bytes
word_size=ctypes.sizeof(ctypes.c_void_p)
#mprotect(address, len(data)+(len(data)%word_size), PROT_WRITE|PROT_READ)
for i in range(0, len(data), word_size):
word=data[i:i+word_size]
if len(word)