Jak*_*ams 5 winapi ctypes python-2.7
到现在为止,我有这段代码(我知道它很丑,但这不是现在的重点)
我无法弄清楚如何发出以下系统调用并构建正确的结构来访问另一个进程的 PEB。
我想做以下事情:
HANDLE pHandle = OpenProcess(...)
NTSTATUS status = NtQueryInformationProcess(pHandle, 0, peb, peb_len, 0)
代码:
from ctypes import *
from ctypes.wintypes import *
from _multiprocessing import win32
import argparse
class UNICODE_STRING(Structure):
    """Counted UTF-16 string descriptor used by the NT native API.

    ``Length`` and ``MaximumLength`` are byte counts.  ``Buffer`` holds an
    address.  It is declared as ``c_void_p`` so that reading the field
    returns the raw address (or ``None`` for NULL): a ``c_wchar_p`` field
    would make ctypes dereference the address in the *current* process on
    every access, which is wrong for a structure copied out of another
    process.
    """
    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", c_void_p)
    ]
class RTL_USER_PROCESS_PARAMETERS(Structure):
    """Leading part of the NT process-parameters block (image path, command line)."""
    _fields_ = [
        ("Reserved1", BYTE*16),
        # winternl.h declares this member as PVOID Reserved2[10]; sizing it
        # in bytes would place both string descriptors at the wrong offset.
        ("Reserved2", LPVOID*10),
        ("ImagePathName", UNICODE_STRING),
        ("CommandLine", UNICODE_STRING)
    ]
class PEB(Structure):
    """Process Environment Block, following the layout published in winternl.h."""
    _fields_ = [
        ("Reserved1", BYTE*2),
        ("BeingDebugged", BYTE),
        ("Reserved2", BYTE),
        # winternl.h has PVOID Reserved3[2] here; with a single pointer the
        # Ldr and ProcessParameters members end up one slot too early.
        ("Reserved3", LPVOID*2),
        ("Ldr", LPVOID),
        ("ProcessParameters", POINTER(RTL_USER_PROCESS_PARAMETERS)),
        ("Reserved4", BYTE*104),
        ("Reserved5", LPVOID*52),
        ("PostProcessInitRoutine", LPVOID),
        ("Reserved6", BYTE*128),
        ("Reserved7", LPVOID),
        ("SessionId", ULONG)
    ]
class PROCESS_BASIC_INFORMATION(Structure):
    """Result buffer of NtQueryInformationProcess(ProcessBasicInformation)."""
    _fields_ = [
        ("Reserved1", LPVOID),
        # Address of the PEB inside the *queried* process; never dereference
        # it locally, copy it with ReadProcessMemory instead.
        ("PebBaseAddress", POINTER(PEB)),
        ("Reserved2", LPVOID*2),
        # ULONG_PTR in winternl.h: a pointer-sized integer holding the PID,
        # not a pointer, so reading the field yields a plain int.
        ("UniqueProcessId", c_size_t),
        ("Reserved3", LPVOID)
    ]
def main():
    """Print the image path and command line of the process given by PID.

    The PID is taken from the command line.  The process is opened with
    query and read rights only, its PEB is located with
    NtQueryInformationProcess, and the PEB, the process parameters and the
    two string buffers are copied with ReadProcessMemory.

    Raises:
        OSError: if a Win32 call fails (built with WinError) or if
            NtQueryInformationProcess returns a negative NTSTATUS.

    NOTE(review): the result is only meaningful when the structure
    definitions in this file match the winternl.h layout for the bitness
    of both this interpreter and the target process.
    """
    # Command Line Arguments Parsing
    parser = argparse.ArgumentParser()
    parser.add_argument('pid', metavar='<process id>', type=int, help='shows basic info about the process')
    parser.add_argument('-dS', metavar='dump strings', help='dump all used strings to txt file')
    parser.add_argument('-dD', metavar='dump dll', help='dump all loaded dlls to txt file')
    args = parser.parse_args()
    var_pid = args.pid

    # Ask only for the rights that are used; PROCESS_ALL_ACCESS is refused
    # for many processes that can still be queried and read.
    PROCESS_VM_READ = 0x0010
    PROCESS_QUERY_INFORMATION = 0x0400

    # Private library objects, so the prototypes set below do not leak into
    # other users of the shared windll cache.
    kernel32 = WinDLL('kernel32', use_last_error=True)
    ntdll = WinDLL('ntdll')

    # Without prototypes ctypes passes and returns C ints, which truncates
    # handles and addresses in a 64-bit interpreter.
    kernel32.OpenProcess.restype = HANDLE
    kernel32.OpenProcess.argtypes = (DWORD, BOOL, DWORD)
    kernel32.ReadProcessMemory.restype = BOOL
    kernel32.ReadProcessMemory.argtypes = (HANDLE, LPCVOID, LPVOID, c_size_t, POINTER(c_size_t))
    kernel32.CloseHandle.restype = BOOL
    kernel32.CloseHandle.argtypes = (HANDLE,)
    ntdll.NtQueryInformationProcess.restype = LONG
    ntdll.NtQueryInformationProcess.argtypes = (HANDLE, ULONG, LPVOID, ULONG, POINTER(ULONG))

    # OpenProcess
    pHandle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, var_pid)
    if not pHandle:
        raise WinError(get_last_error())

    def read_remote(address, buf):
        # Fill the ctypes object `buf` from `address` in the target process.
        bytesRead = c_size_t()
        if not kernel32.ReadProcessMemory(pHandle, address, byref(buf), sizeof(buf), byref(bytesRead)):
            raise WinError(get_last_error())
        return buf

    def read_unicode_string(ustr):
        # Take the raw pointer value straight from the structure memory so
        # this works whatever ctypes type the Buffer field is declared as
        # (reading a c_wchar_p field would dereference it locally).
        address = c_void_p.from_buffer(ustr, UNICODE_STRING.Buffer.offset).value
        if not address or not ustr.Length:
            return u''
        # Length is a byte count and excludes any terminating NUL.
        text = create_unicode_buffer(ustr.Length // sizeof(c_wchar))
        return read_remote(address, text)[:]

    try:
        # NtQueryInformationProcess (class 0 = ProcessBasicInformation)
        pbi = PROCESS_BASIC_INFORMATION()
        status = ntdll.NtQueryInformationProcess(pHandle, 0, byref(pbi), sizeof(pbi), None)
        if status < 0:
            raise OSError('NtQueryInformationProcess failed: NTSTATUS 0x%08x' % (status & 0xFFFFFFFF))
        # ReadProcessMemory: PEB -> process parameters -> string buffers
        pPEB = read_remote(pbi.PebBaseAddress, PEB())
        pRTL = read_remote(pPEB.ProcessParameters, RTL_USER_PROCESS_PARAMETERS())
        print(read_unicode_string(pRTL.ImagePathName))
        print(read_unicode_string(pRTL.CommandLine))
    finally:
        # Always release the handle, including when a read fails.
        kernel32.CloseHandle(pHandle)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
**编辑:**我设法获得了 PEB 并遍历了其中的结构,但我无法从它们的缓冲区中读取 UNICODE 字符串。例如,我想读取命令行参数。
值得重复的是,这NtQueryInformationProcess是一个本机系统调用,在 Windows 编程中不鼓励这样做。Microsoft 没有提供 ntdll.dll 的导入库,因此动态调用其导出函数的唯一方法是通过GetProcAddress. 这当然就是 ctypes 的工作原理,因此从 Python 调用本机 NTAPI 函数并不困难。问题在于缺乏官方支持和文档,并且 NT 数据结构、API 和可用信息类都可能发生变化。
另请注意,ProcessBasicInformation当从 64 位进程调用时,查询将检索 64 位 PEB 的地址。因此,当从 64 位进程查询 WOW64 32 位进程时,您只会看到本机 64 位模块 ntdll.dll、wow64.dll、wow64win.dll 和 wow64cpu.dll。这里的答案提供了一种技术,通过使用从 64 位 TEB 到 32 位 TEB 的魔术偏移量来查找 32 位 PEB 的地址,32 位 TEB 有一个指向 32 位 PEB 的指针。但当然,这个实现细节可以随时更改,从而破坏依赖于它的代码。
以下示例包含对具有相同体系结构(即本机 64 位或 WOW64 32 位)的给定进程查询并使用 ProcessBasicInformation 所需的 ctypes 定义。它包含一个 ProcessInformation 类,该类演示用法,并提供进程 ID、会话 ID、映像路径、命令行和已加载模块路径等属性。
该示例使用 ctypes._Pointer 的子类 RemotePointer 以及 RPOINTER 工厂函数。此类重写了 __getitem__,以便解引用位于另一个进程地址空间中的指针值。索引键是 (index, handle[, size]) 形式的元组。可选的 size 参数(以字节为单位)对于带长度的字符串很有用,例如 NTAPI 的 UNICODE_STRING:ustr.Buffer[0, hProcess, ustr.Length]。不支持以空字符结尾的字符串,因为 ReadProcessMemory 需要一个大小确定的缓冲区。
遍历加载器数据的逻辑位于私有方法 _modules_iter 中,该方法沿内存顺序链表遍历已加载的模块。请注意,InMemoryOrderModuleList 链接到 LDR_DATA_TABLE_ENTRY 结构的 InMemoryOrderLinks 字段,列表中的每个链接也都是如此。因此模块迭代器必须用该字段的偏移量来调整每个条目的基地址。在 C API 中,这是通过 CONTAINING_RECORD 宏完成的。
如果没有提供进程 ID 或句柄,ProcessInformation 构造函数默认查询当前进程。如果调用返回的状态是错误或警告(即负的 NTSTATUS),则调用 NtError 以获取 OSError 的实例(在 Python 3.3 之前为 WindowsError)。
我还有一个更完善的 NtError 版本(此处未包含),它以 ntdll.dll 作为源模块调用 FormatMessage 来获取格式化的错误消息。如有需要,我可以更新答案以包含该版本。
该示例在 Windows 7 和 10 中使用 32 位和 64 位版本的 Python 2.7 和 3.5 进行了测试。对于远程进程测试,子进程模块用于启动第二个 Python 实例。事件句柄被传递给子进程以进行同步。如果父进程没有等待子进程完成加载并设置事件,那么子进程的加载器数据在读取时可能没有完全初始化。
import ctypes
from ctypes import wintypes
# Private library objects: prototypes assigned to their functions further
# down do not affect the shared ctypes.windll instances.
ntdll = ctypes.WinDLL('ntdll')
# use_last_error=True lets ctypes.get_last_error() report the Win32 error
# code of a failed kernel32 call.
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# WINAPI Definitions
# Access rights requested from OpenProcess.
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
# Win32 error codes (ERROR_PARTIAL_COPY is not referenced by the code
# shown here).
ERROR_INVALID_HANDLE = 0x0006
ERROR_PARTIAL_COPY = 0x012B
PULONG = ctypes.POINTER(wintypes.ULONG)
# Pointer-sized value; as an LPVOID field it reads back as an int, or None
# when zero.
ULONG_PTR = wintypes.LPVOID
SIZE_T = ctypes.c_size_t
def _check_bool(result, func, args):
    """ctypes ``errcheck`` hook for functions that return zero/NULL on failure.

    Returns ``args`` unchanged when ``result`` is truthy; otherwise raises
    the exception built by ``ctypes.WinError`` from the last error code
    recorded by ctypes.
    """
    if result:
        return args
    raise ctypes.WinError(ctypes.get_last_error())
# kernel32 prototypes.  Functions given the _check_bool errcheck raise
# instead of returning a zero/NULL result.
kernel32.ReadProcessMemory.errcheck = _check_bool
kernel32.ReadProcessMemory.argtypes = (
    wintypes.HANDLE,  # _In_ hProcess
    wintypes.LPCVOID,  # _In_ lpBaseAddress
    wintypes.LPVOID,  # _Out_ lpBuffer
    SIZE_T,  # _In_ nSize
    ctypes.POINTER(SIZE_T))  # _Out_ lpNumberOfBytesRead
kernel32.CloseHandle.errcheck = _check_bool
kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)
# HANDLE restype keeps the full pointer-sized value instead of a C int.
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetCurrentProcess.argtypes = ()
kernel32.OpenProcess.errcheck = _check_bool
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.OpenProcess.argtypes = (
    wintypes.DWORD,  # _In_ dwDesiredAccess
    wintypes.BOOL,  # _In_ bInheritHandle
    wintypes.DWORD)  # _In_ dwProcessId
class RemotePointer(ctypes._Pointer):
    """Pointer whose target lives in another process's address space.

    The stored value is only an address.  Indexing copies the target out
    of the owning process with ReadProcessMemory, so the key must carry a
    process handle: ``ptr[index, handle]`` or ``ptr[index, handle, size]``.
    """

    def __getitem__(self, key):
        """Read element ``index`` of the remote array this pointer refers to.

        Args:
            key: ``(index, handle)`` or ``(index, handle, size)``.  ``size``
                is a byte count and is only honoured for simple element
                types: the read then returns a counted string (text for
                WCHAR elements, bytes otherwise) cut at the first NUL.

        Returns:
            The ``.value`` of the buffer for simple element types,
            otherwise a local copy of the element as a ctypes object.

        Raises:
            KeyError: ``key`` is not a tuple.
            TypeError: ``index`` is a slice.
            ValueError: the pointer is NULL.
            OSError: ReadProcessMemory failed (raised by its errcheck).
        """
        # TODO: slicing
        if not isinstance(key, tuple):
            raise KeyError('must be (index, handle[, size])')
        size = None
        if len(key) > 2:
            index, handle, size = key
        else:
            index, handle = key
        if isinstance(index, slice):
            raise TypeError('slicing is not supported')
        dtype = self._type_
        simple = issubclass(dtype, ctypes._SimpleCData)
        if simple and size is not None:
            if dtype._type_ == wintypes.WCHAR._type_:
                count = size // ctypes.sizeof(wintypes.WCHAR)
                buf = (wintypes.WCHAR * count)()
            else:
                buf = (ctypes.c_char * size)()
            if ctypes.sizeof(buf) == 0:
                # Empty counted string: nothing to read, and the remote
                # buffer pointer of an empty string may legitimately be NULL.
                return buf.value
        else:
            buf = dtype()
        base = ctypes.c_void_p.from_buffer(self).value
        if base is None:
            # Same error ctypes raises when a local NULL pointer is indexed.
            raise ValueError('NULL pointer access')
        address = base + ctypes.sizeof(dtype) * index
        nread = ctypes.c_size_t()
        kernel32.ReadProcessMemory(handle,
                                   address,
                                   ctypes.byref(buf),
                                   ctypes.sizeof(buf),
                                   ctypes.byref(nread))
        if simple:
            return buf.value
        return buf

    def __setitem__(self, key, value):
        """Reject writes; remote pointers are read only."""
        # TODO: kernel32.WriteProcessMemory
        raise TypeError('remote pointers are read only')

    @property
    def contents(self):
        """Not available: dereferencing needs a process handle, use indexing."""
        # a handle is required
        raise NotImplementedError
# Maps an element type to the RemotePointer subclass created for it.
_remote_pointer_cache = {}


def RPOINTER(dtype):
    """Return the RemotePointer subclass for ``dtype``, creating it once.

    The class is named ``RP_<dtype name>`` and cached, so repeated calls
    with the same element type return the same class object.
    """
    try:
        return _remote_pointer_cache[dtype]
    except KeyError:
        pass
    ptype = type('RP_%s' % dtype.__name__,
                 (RemotePointer,),
                 {'_type_': dtype})
    _remote_pointer_cache[dtype] = ptype
    return ptype
# NTAPI Definitions
# Signed type: callers in this file treat a negative status as failure.
NTSTATUS = wintypes.LONG
PVOID = wintypes.LPVOID
# Remote pointer to UTF-16 text (the Buffer member of UNICODE_STRING).
RPWSTR = RPOINTER(wintypes.WCHAR)
PROCESSINFOCLASS = wintypes.ULONG
# PROCESSINFOCLASS values; only ProcessBasicInformation is used by the
# code shown here.
ProcessBasicInformation = 0
ProcessDebugPort = 7
ProcessWow64Information = 26
ProcessImageFileName = 27
ProcessBreakOnTermination = 29
# NTSTATUS codes as plain Python integers.  Round-tripping the literal
# through NTSTATUS(...).value yields the same signed form that a function
# with restype NTSTATUS returns.  Every constant must end in .value: a bare
# NTSTATUS(...) object cannot be used in integer arithmetic or comparisons.
STATUS_UNSUCCESSFUL = NTSTATUS(0xC0000001).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
STATUS_INVALID_HANDLE = NTSTATUS(0xC0000008).value
STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value
class UNICODE_STRING(ctypes.Structure):
    """Counted UTF-16 string descriptor.

    ``Buffer`` is a remote pointer; callers in this file read it with
    ``ustr.Buffer[0, handle, ustr.Length]``, i.e. ``Length`` is passed as
    the byte size of the text.
    """
    _fields_ = (('Length', wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer', RPWSTR))
class LIST_ENTRY(ctypes.Structure):
    """Doubly linked list node whose links are remote pointers."""
    pass
# The fields refer to the pointer type of the class itself, so they can
# only be assigned after the class object exists.
RPLIST_ENTRY = RPOINTER(LIST_ENTRY)
LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
                       ('Blink', RPLIST_ENTRY))
class LDR_DATA_TABLE_ENTRY(ctypes.Structure):
    """Loader record for one module.

    Records are chained through ``InMemoryOrderLinks``; ``FullDllName``
    describes the module path in the owning process.
    NOTE(review): layout presumably follows winternl.h - confirm.
    """
    _fields_ = (('Reserved1', PVOID * 2),
                ('InMemoryOrderLinks', LIST_ENTRY),
                ('Reserved2', PVOID * 2),
                ('DllBase', PVOID),
                ('EntryPoint', PVOID),
                ('Reserved3', PVOID),
                ('FullDllName', UNICODE_STRING),
                ('Reserved4', wintypes.BYTE * 8),
                ('Reserved5', PVOID * 3),
                ('CheckSum', PVOID),
                ('TimeDateStamp', wintypes.ULONG))
RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)
class PEB_LDR_DATA(ctypes.Structure):
    """Loader data referenced by ``PEB.Ldr``; holds the head of the module list."""
    _fields_ = (('Reserved1', wintypes.BYTE * 8),
                ('Reserved2', PVOID * 3),
                ('InMemoryOrderModuleList', LIST_ENTRY))
RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)
class RTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
    """Process parameters referenced by ``PEB.ProcessParameters``.

    Only the image path and command line descriptors are named; the
    leading members are reserved padding.
    """
    _fields_ = (('Reserved1', wintypes.BYTE * 16),
                ('Reserved2', PVOID * 10),
                ('ImagePathName', UNICODE_STRING),
                ('CommandLine', UNICODE_STRING))
RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
# Callback pointer kept as an opaque address; it is never called here.
PPS_POST_PROCESS_INIT_ROUTINE = PVOID
class PEB(ctypes.Structure):
    """Process Environment Block.

    ``Ldr`` and ``ProcessParameters`` are remote pointers that must be
    read through a process handle.
    NOTE(review): the reserved-array sizes presumably follow winternl.h
    for both 32-bit and 64-bit builds - confirm.
    """
    _fields_ = (('Reserved1', wintypes.BYTE * 2),
                ('BeingDebugged', wintypes.BYTE),
                ('Reserved2', wintypes.BYTE * 1),
                ('Reserved3', PVOID * 2),
                ('Ldr', RPPEB_LDR_DATA),
                ('ProcessParameters', RPRTL_USER_PROCESS_PARAMETERS),
                ('Reserved4', wintypes.BYTE * 104),
                ('Reserved5', PVOID * 52),
                ('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
                ('Reserved6', wintypes.BYTE * 128),
                ('Reserved7', PVOID * 1),
                ('SessionId', wintypes.ULONG))
RPPEB = RPOINTER(PEB)
class PROCESS_BASIC_INFORMATION(ctypes.Structure):
    """Buffer filled by NtQueryInformationProcess(ProcessBasicInformation).

    ``PebBaseAddress`` is a remote pointer to the PEB of the queried
    process; ``UniqueProcessId`` is read by this file as the process ID.
    """
    _fields_ = (('Reserved1', PVOID),
                ('PebBaseAddress', RPPEB),
                ('Reserved2', PVOID * 2),
                ('UniqueProcessId', ULONG_PTR),
                ('Reserved3', PVOID))
def NtError(status):
    """Build (without raising) an exception that describes an NTSTATUS code.

    Args:
        status: the status as a Python int (signed or unsigned form) or as
            a ctypes integer object such as ``NTSTATUS(0xC0000001)``.

    Returns:
        ``WindowsError(status, descr)`` before Python 3.3, otherwise
        ``OSError(None, descr, None, status)``.  ``descr`` has the form
        ``'NTSTATUS(0xc0000001) [Error]'``.
    """
    import sys
    # Unwrap ctypes integers so the arithmetic below works on a plain int.
    status = getattr(status, 'value', status)
    # Width 10 = '0x' prefix + 8 hex digits, so small codes are padded too.
    descr = 'NTSTATUS(%#010x) ' % (status % 2**32,)
    # The two most significant bits of an NTSTATUS encode its severity.
    severity = status & 0xC0000000
    if severity == 0xC0000000:
        descr += '[Error]'
    elif severity == 0x80000000:
        descr += '[Warning]'
    elif severity == 0x40000000:
        descr += '[Information]'
    else:
        descr += '[Success]'
    if sys.version_info[:2] < (3, 3):
        return WindowsError(status, descr)
    return OSError(None, descr, None, status)
# Prototype of the native call.  The result is returned as a signed
# NTSTATUS and is checked by the caller; no errcheck is installed.
NtQueryInformationProcess = ntdll.NtQueryInformationProcess
NtQueryInformationProcess.restype = NTSTATUS
NtQueryInformationProcess.argtypes = (
    wintypes.HANDLE,  # _In_ ProcessHandle
    PROCESSINFOCLASS,  # _In_ ProcessInformationClass
    PVOID,  # _Out_ ProcessInformation
    wintypes.ULONG,  # _In_ ProcessInformationLength
    PULONG)  # _Out_opt_ ReturnLength
class ProcessInformation(object):
    """Basic information about a process, read from its PEB.

    The PEB, process parameters and loader data are copied once, when the
    object is created.  String properties read the remote text each time
    they are accessed; the module list is read on first use and cached.

    A handle opened by the constructor is released by ``close()``, on exit
    from a ``with`` block, or at the latest when the object is finalized.
    A handle passed in by the caller is never closed.
    """
    _close_handle = False
    _closed = False
    _module_names = None

    def __init__(self, process_id=None, handle=None):
        """Query a process.

        Args:
            process_id: PID to open when no handle is given.  When a
                handle is given as well, the PID is only verified.
            handle: an existing process handle with query and VM-read
                access.  With neither argument the current process is used.

        Raises:
            OSError: OpenProcess or a remote read failed, the query
                returned a negative NTSTATUS, or the queried PID differs
                from ``process_id``.
        """
        if process_id is None and handle is None:
            handle = kernel32.GetCurrentProcess()
        elif handle is None:
            handle = kernel32.OpenProcess(PROCESS_VM_READ |
                                          PROCESS_QUERY_INFORMATION,
                                          False, process_id)
            self._close_handle = True
        self._handle = handle
        self._query_info()
        if process_id is not None and self._process_id != process_id:
            # NtError does integer arithmetic on its argument, so hand it a
            # plain int even if the constant is a ctypes integer object.
            status = getattr(STATUS_UNSUCCESSFUL, 'value',
                             STATUS_UNSUCCESSFUL)
            raise NtError(status)

    def close(self, CloseHandle=kernel32.CloseHandle):
        """Close the process handle if this object opened it; idempotent."""
        if not self._close_handle or self._closed:
            return
        try:
            CloseHandle(self._handle)
        except WindowsError as e:
            # A handle that is already invalid needs no further cleanup.
            if e.winerror != ERROR_INVALID_HANDLE:
                raise
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    # CloseHandle is bound as a default argument so that it is still
    # reachable if module globals are torn down at interpreter exit.
    def __del__(self, CloseHandle=kernel32.CloseHandle):
        self.close(CloseHandle)

    def _query_info(self):
        """Fetch the process ID and local copies of PEB, parameters, loader data."""
        info = PROCESS_BASIC_INFORMATION()
        handle = self._handle
        status = NtQueryInformationProcess(handle,
                                           ProcessBasicInformation,
                                           ctypes.byref(info),
                                           ctypes.sizeof(info),
                                           None)
        if status < 0:
            raise NtError(status)
        self._process_id = info.UniqueProcessId
        self._peb = peb = info.PebBaseAddress[0, handle]
        self._params = peb.ProcessParameters[0, handle]
        self._ldr = peb.Ldr[0, handle]

    def _modules_iter(self):
        """Yield a local LDR_DATA_TABLE_ENTRY copy per module, in memory order."""
        # Remote address of the list head inside PEB_LDR_DATA; a link that
        # points back at it means the circular list has been fully walked.
        headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
                    PEB_LDR_DATA.InMemoryOrderModuleList.offset)
        offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
        pentry = self._ldr.InMemoryOrderModuleList.Flink
        while pentry:
            pentry_void = PVOID.from_buffer_copy(pentry)
            if pentry_void.value == headaddr:
                break
            # Each link points at the InMemoryOrderLinks member of a
            # record, so step back to the start of the record.
            pentry_void.value -= offset
            pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
            mod = pmod[0, self._handle]
            yield mod
            pentry = LIST_ENTRY.from_buffer(mod, offset).Flink

    def update_module_names(self):
        """Re-read the full path of every loaded module into the cache."""
        handle = self._handle
        self._module_names = [
            m.FullDllName.Buffer[0, handle, m.FullDllName.Length]
            for m in self._modules_iter()]

    @property
    def module_names(self):
        """List of module paths; read on first access, then cached."""
        if self._module_names is None:
            self.update_module_names()
        return self._module_names

    @property
    def process_id(self):
        """Process ID reported by the query."""
        return self._process_id

    @property
    def session_id(self):
        """Session ID stored in the copied PEB."""
        return self._peb.SessionId

    @property
    def image_path(self):
        """Image path text, read from the process on each access."""
        ustr = self._params.ImagePathName
        return ustr.Buffer[0, self._handle, ustr.Length]

    @property
    def command_line(self):
        """Command line text, read from the process on each access."""
        ustr = self._params.CommandLine
        return ustr.Buffer[0, self._handle, ustr.Length]
Run Code Online (Sandbox Code Playgroud)
例子:
if __name__ == '__main__':
    # Self-test: query the current process, then a child Python process
    # addressed first by handle and then by PID.
    import os
    import sys
    import subprocess
    import textwrap

    WAIT_OBJECT_0 = 0

    # Declare prototypes so the event handle is not truncated to a C int
    # and failures can be detected.
    kernel32.CreateEventW.restype = wintypes.HANDLE
    kernel32.CreateEventW.argtypes = (
        wintypes.LPVOID,   # _In_opt_ lpEventAttributes
        wintypes.BOOL,     # _In_ bManualReset
        wintypes.BOOL,     # _In_ bInitialState
        wintypes.LPCWSTR)  # _In_opt_ lpName
    kernel32.WaitForSingleObject.restype = wintypes.DWORD
    kernel32.WaitForSingleObject.argtypes = (
        wintypes.HANDLE,  # _In_ hHandle
        wintypes.DWORD)   # _In_ dwMilliseconds

    class SECURITY_ATTRIBUTES(ctypes.Structure):
        """SECURITY_ATTRIBUTES whose nLength is filled in automatically."""
        _fields_ = (('nLength', wintypes.DWORD),
                    ('lpSecurityDescriptor', wintypes.LPVOID),
                    ('bInheritHandle', wintypes.BOOL))

        def __init__(self, *args, **kwds):
            super(SECURITY_ATTRIBUTES, self).__init__(*args, **kwds)
            self.nLength = ctypes.sizeof(self)

    def test_remote(use_pid=True, show_modules=False):
        """Start a child Python process and check what is read back from it.

        The child inherits an event handle and sets it once it is running,
        so its loader data is initialized before it is queried.
        """
        sa = SECURITY_ATTRIBUTES(bInheritHandle=True)
        hEvent = kernel32.CreateEventW(ctypes.byref(sa), 0, 0, None)
        if not hEvent:
            raise ctypes.WinError(ctypes.get_last_error())
        try:
            script = textwrap.dedent(r"""
                import sys
                import ctypes
                kernel32 = ctypes.WinDLL('kernel32')
                kernel32.SetEvent(%d)
                sys.stdin.read()""").strip() % hEvent
            cmd = '"%s" -c "%s"' % (sys.executable, script)
            # close_fds=False so the child inherits the event handle.
            proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                    close_fds=False)
            try:
                result = kernel32.WaitForSingleObject(hEvent, 5000)
                if result != WAIT_OBJECT_0:
                    # Querying a child that is not fully loaded would read
                    # incomplete loader data.
                    raise RuntimeError('child process did not signal '
                                       'readiness (wait result %r)'
                                       % (result,))
                if use_pid:
                    pi = ProcessInformation(proc.pid)
                else:
                    pi = ProcessInformation(handle=int(proc._handle))
                assert pi.process_id == proc.pid
                assert pi.image_path == sys.executable
                assert pi.command_line == cmd
                assert pi.module_names[0] == sys.executable
                if show_modules:
                    print('\n'.join(pi.module_names))
            finally:
                proc.terminate()
                proc.wait()  # reap the child so its handle is released
        finally:
            kernel32.CloseHandle(hEvent)

    print('Test 1: current process')
    pi = ProcessInformation()
    assert os.getpid() == pi.process_id
    assert pi.image_path == pi.module_names[0]
    print('Test 2: remote process (Handle)')
    test_remote(use_pid=False)
    print('Test 3: remote process (PID)')
    test_remote(show_modules=True)
Run Code Online (Sandbox Code Playgroud)
Windows 10 中的输出,使用 64 位 Python 3.5:
import ctypes
from ctypes import wintypes
# Private library objects: prototypes assigned to their functions further
# down do not affect the shared ctypes.windll instances.
ntdll = ctypes.WinDLL('ntdll')
# use_last_error=True lets ctypes.get_last_error() report the Win32 error
# code of a failed kernel32 call.
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# WINAPI Definitions
# Access rights requested from OpenProcess.
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
# Win32 error codes (ERROR_PARTIAL_COPY is not referenced by the code
# shown here).
ERROR_INVALID_HANDLE = 0x0006
ERROR_PARTIAL_COPY = 0x012B
PULONG = ctypes.POINTER(wintypes.ULONG)
# Pointer-sized value; as an LPVOID field it reads back as an int, or None
# when zero.
ULONG_PTR = wintypes.LPVOID
SIZE_T = ctypes.c_size_t
def _check_bool(result, func, args):
    """ctypes ``errcheck`` hook for functions that return zero/NULL on failure.

    Returns ``args`` unchanged when ``result`` is truthy; otherwise raises
    the exception built by ``ctypes.WinError`` from the last error code
    recorded by ctypes.
    """
    if result:
        return args
    raise ctypes.WinError(ctypes.get_last_error())
# kernel32 prototypes.  Functions given the _check_bool errcheck raise
# instead of returning a zero/NULL result.
kernel32.ReadProcessMemory.errcheck = _check_bool
kernel32.ReadProcessMemory.argtypes = (
    wintypes.HANDLE,  # _In_ hProcess
    wintypes.LPCVOID,  # _In_ lpBaseAddress
    wintypes.LPVOID,  # _Out_ lpBuffer
    SIZE_T,  # _In_ nSize
    ctypes.POINTER(SIZE_T))  # _Out_ lpNumberOfBytesRead
kernel32.CloseHandle.errcheck = _check_bool
kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)
# HANDLE restype keeps the full pointer-sized value instead of a C int.
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetCurrentProcess.argtypes = ()
kernel32.OpenProcess.errcheck = _check_bool
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.OpenProcess.argtypes = (
    wintypes.DWORD,  # _In_ dwDesiredAccess
    wintypes.BOOL,  # _In_ bInheritHandle
    wintypes.DWORD)  # _In_ dwProcessId
class RemotePointer(ctypes._Pointer):
    """Pointer whose target lives in another process's address space.

    The stored value is only an address.  Indexing copies the target out
    of the owning process with ReadProcessMemory, so the key must carry a
    process handle: ``ptr[index, handle]`` or ``ptr[index, handle, size]``.
    """

    def __getitem__(self, key):
        """Read element ``index`` of the remote array this pointer refers to.

        Args:
            key: ``(index, handle)`` or ``(index, handle, size)``.  ``size``
                is a byte count and is only honoured for simple element
                types: the read then returns a counted string (text for
                WCHAR elements, bytes otherwise) cut at the first NUL.

        Returns:
            The ``.value`` of the buffer for simple element types,
            otherwise a local copy of the element as a ctypes object.

        Raises:
            KeyError: ``key`` is not a tuple.
            TypeError: ``index`` is a slice.
            ValueError: the pointer is NULL.
            OSError: ReadProcessMemory failed (raised by its errcheck).
        """
        # TODO: slicing
        if not isinstance(key, tuple):
            raise KeyError('must be (index, handle[, size])')
        size = None
        if len(key) > 2:
            index, handle, size = key
        else:
            index, handle = key
        if isinstance(index, slice):
            raise TypeError('slicing is not supported')
        dtype = self._type_
        simple = issubclass(dtype, ctypes._SimpleCData)
        if simple and size is not None:
            if dtype._type_ == wintypes.WCHAR._type_:
                count = size // ctypes.sizeof(wintypes.WCHAR)
                buf = (wintypes.WCHAR * count)()
            else:
                buf = (ctypes.c_char * size)()
            if ctypes.sizeof(buf) == 0:
                # Empty counted string: nothing to read, and the remote
                # buffer pointer of an empty string may legitimately be NULL.
                return buf.value
        else:
            buf = dtype()
        base = ctypes.c_void_p.from_buffer(self).value
        if base is None:
            # Same error ctypes raises when a local NULL pointer is indexed.
            raise ValueError('NULL pointer access')
        address = base + ctypes.sizeof(dtype) * index
        nread = ctypes.c_size_t()
        kernel32.ReadProcessMemory(handle,
                                   address,
                                   ctypes.byref(buf),
                                   ctypes.sizeof(buf),
                                   ctypes.byref(nread))
        if simple:
            return buf.value
        return buf

    def __setitem__(self, key, value):
        """Reject writes; remote pointers are read only."""
        # TODO: kernel32.WriteProcessMemory
        raise TypeError('remote pointers are read only')

    @property
    def contents(self):
        """Not available: dereferencing needs a process handle, use indexing."""
        # a handle is required
        raise NotImplementedError
# Maps an element type to the RemotePointer subclass created for it.
_remote_pointer_cache = {}


def RPOINTER(dtype):
    """Return the RemotePointer subclass for ``dtype``, creating it once.

    The class is named ``RP_<dtype name>`` and cached, so repeated calls
    with the same element type return the same class object.
    """
    try:
        return _remote_pointer_cache[dtype]
    except KeyError:
        pass
    ptype = type('RP_%s' % dtype.__name__,
                 (RemotePointer,),
                 {'_type_': dtype})
    _remote_pointer_cache[dtype] = ptype
    return ptype
# NTAPI Definitions
# Signed type: callers in this file treat a negative status as failure.
NTSTATUS = wintypes.LONG
PVOID = wintypes.LPVOID
# Remote pointer to UTF-16 text (the Buffer member of UNICODE_STRING).
RPWSTR = RPOINTER(wintypes.WCHAR)
PROCESSINFOCLASS = wintypes.ULONG
# PROCESSINFOCLASS values; only ProcessBasicInformation is used by the
# code shown here.
ProcessBasicInformation = 0
ProcessDebugPort = 7
ProcessWow64Information = 26
ProcessImageFileName = 27
ProcessBreakOnTermination = 29
# NTSTATUS codes as plain Python integers.  Round-tripping the literal
# through NTSTATUS(...).value yields the same signed form that a function
# with restype NTSTATUS returns.  Every constant must end in .value: a bare
# NTSTATUS(...) object cannot be used in integer arithmetic or comparisons.
STATUS_UNSUCCESSFUL = NTSTATUS(0xC0000001).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
STATUS_INVALID_HANDLE = NTSTATUS(0xC0000008).value
STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value
class UNICODE_STRING(ctypes.Structure):
    """Counted UTF-16 string descriptor.

    ``Buffer`` is a remote pointer; callers in this file read it with
    ``ustr.Buffer[0, handle, ustr.Length]``, i.e. ``Length`` is passed as
    the byte size of the text.
    """
    _fields_ = (('Length', wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer', RPWSTR))
class LIST_ENTRY(ctypes.Structure):
    """Doubly linked list node whose links are remote pointers."""
    pass
# The fields refer to the pointer type of the class itself, so they can
# only be assigned after the class object exists.
RPLIST_ENTRY = RPOINTER(LIST_ENTRY)
LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
                       ('Blink', RPLIST_ENTRY))
class LDR_DATA_TABLE_ENTRY(ctypes.Structure):
    """Loader record for one module.

    Records are chained through ``InMemoryOrderLinks``; ``FullDllName``
    describes the module path in the owning process.
    NOTE(review): layout presumably follows winternl.h - confirm.
    """
    _fields_ = (('Reserved1', PVOID * 2),
                ('InMemoryOrderLinks', LIST_ENTRY),
                ('Reserved2', PVOID * 2),
                ('DllBase', PVOID),
                ('EntryPoint', PVOID),
                ('Reserved3', PVOID),
                ('FullDllName', UNICODE_STRING),
                ('Reserved4', wintypes.BYTE * 8),
                ('Reserved5', PVOID * 3),
                ('CheckSum', PVOID),
                ('TimeDateStamp', wintypes.ULONG))
RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)
class PEB_LDR_DATA(ctypes.Structure):
    """Loader data referenced by ``PEB.Ldr``; holds the head of the module list."""
    _fields_ = (('Reserved1', wintypes.BYTE * 8),
                ('Reserved2', PVOID * 3),
                ('InMemoryOrderModuleList', LIST_ENTRY))
RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)
class RTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
    """Process parameters referenced by ``PEB.ProcessParameters``.

    Only the image path and command line descriptors are named; the
    leading members are reserved padding.
    """
    _fields_ = (('Reserved1', wintypes.BYTE * 16),
                ('Reserved2', PVOID * 10),
                ('ImagePathName', UNICODE_STRING),
                ('CommandLine', UNICODE_STRING))
RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
# Callback pointer kept as an opaque address; it is never called here.
PPS_POST_PROCESS_INIT_ROUTINE = PVOID
class PEB(ctypes.Structure):
    """Process Environment Block.

    ``Ldr`` and ``ProcessParameters`` are remote pointers that must be
    read through a process handle.
    NOTE(review): the reserved-array sizes presumably follow winternl.h
    for both 32-bit and 64-bit builds - confirm.
    """
    _fields_ = (('Reserved1', wintypes.BYTE * 2),
                ('BeingDebugged', wintypes.BYTE),
                ('Reserved2', wintypes.BYTE * 1),
                ('Reserved3', PVOID * 2),
                ('Ldr', RPPEB_LDR_DATA),
                ('ProcessParameters', RPRTL_USER_PROCESS_PARAMETERS),
                ('Reserved4', wintypes.BYTE * 104),
                ('Reserved5', PVOID * 52),
                ('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
                ('Reserved6', wintypes.BYTE * 128),
                ('Reserved7', PVOID * 1),
                ('SessionId', wintypes.ULONG))
RPPEB = RPOINTER(PEB)
class PROCESS_BASIC_INFORMATION(ctypes.Structure):
    """Buffer filled by NtQueryInformationProcess(ProcessBasicInformation).

    ``PebBaseAddress`` is a remote pointer to the PEB of the queried
    process; ``UniqueProcessId`` is read by this file as the process ID.
    """
    _fields_ = (('Reserved1', PVOID),
                ('PebBaseAddress', RPPEB),
                ('Reserved2', PVOID * 2),
                ('UniqueProcessId', ULONG_PTR),
                ('Reserved3', PVOID))
def NtError(status):
    """Build (without raising) an exception that describes an NTSTATUS code.

    Args:
        status: the status as a Python int (signed or unsigned form) or as
            a ctypes integer object such as ``NTSTATUS(0xC0000001)``.

    Returns:
        ``WindowsError(status, descr)`` before Python 3.3, otherwise
        ``OSError(None, descr, None, status)``.  ``descr`` has the form
        ``'NTSTATUS(0xc0000001) [Error]'``.
    """
    import sys
    # Unwrap ctypes integers so the arithmetic below works on a plain int.
    status = getattr(status, 'value', status)
    # Width 10 = '0x' prefix + 8 hex digits, so small codes are padded too.
    descr = 'NTSTATUS(%#010x) ' % (status % 2**32,)
    # The two most significant bits of an NTSTATUS encode its severity.
    severity = status & 0xC0000000
    if severity == 0xC0000000:
        descr += '[Error]'
    elif severity == 0x80000000:
        descr += '[Warning]'
    elif severity == 0x40000000:
        descr += '[Information]'
    else:
        descr += '[Success]'
    if sys.version_info[:2] < (3, 3):
        return WindowsError(status, descr)
    return OSError(None, descr, None, status)
# Prototype of the native call.  The result is returned as a signed
# NTSTATUS and is checked by the caller; no errcheck is installed.
NtQueryInformationProcess = ntdll.NtQueryInformationProcess
NtQueryInformationProcess.restype = NTSTATUS
NtQueryInformationProcess.argtypes = (
    wintypes.HANDLE,  # _In_ ProcessHandle
    PROCESSINFOCLASS,  # _In_ ProcessInformationClass
    PVOID,  # _Out_ ProcessInformation
    wintypes.ULONG,  # _In_ ProcessInformationLength
    PULONG)  # _Out_opt_ ReturnLength
class ProcessInformation(object):
    """Basic information about a process, read from its PEB.

    The PEB, process parameters and loader data are copied once, when the
    object is created.  String properties read the remote text each time
    they are accessed; the module list is read on first use and cached.

    A handle opened by the constructor is released by ``close()``, on exit
    from a ``with`` block, or at the latest when the object is finalized.
    A handle passed in by the caller is never closed.
    """
    _close_handle = False
    _closed = False
    _module_names = None

    def __init__(self, process_id=None, handle=None):
        """Query a process.

        Args:
            process_id: PID to open when no handle is given.  When a
                handle is given as well, the PID is only verified.
            handle: an existing process handle with query and VM-read
                access.  With neither argument the current process is used.

        Raises:
            OSError: OpenProcess or a remote read failed, the query
                returned a negative NTSTATUS, or the queried PID differs
                from ``process_id``.
        """
        if process_id is None and handle is None:
            handle = kernel32.GetCurrentProcess()
        elif handle is None:
            handle = kernel32.OpenProcess(PROCESS_VM_READ |
                                          PROCESS_QUERY_INFORMATION,
                                          False, process_id)
            self._close_handle = True
        self._handle = handle
        self._query_info()
        if process_id is not None and self._process_id != process_id:
            # NtError does integer arithmetic on its argument, so hand it a
            # plain int even if the constant is a ctypes integer object.
            status = getattr(STATUS_UNSUCCESSFUL, 'value',
                             STATUS_UNSUCCESSFUL)
            raise NtError(status)

    def close(self, CloseHandle=kernel32.CloseHandle):
        """Close the process handle if this object opened it; idempotent."""
        if not self._close_handle or self._closed:
            return
        try:
            CloseHandle(self._handle)
        except WindowsError as e:
            # A handle that is already invalid needs no further cleanup.
            if e.winerror != ERROR_INVALID_HANDLE:
                raise
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    # CloseHandle is bound as a default argument so that it is still
    # reachable if module globals are torn down at interpreter exit.
    def __del__(self, CloseHandle=kernel32.CloseHandle):
        self.close(CloseHandle)

    def _query_info(self):
        """Fetch the process ID and local copies of PEB, parameters, loader data."""
        info = PROCESS_BASIC_INFORMATION()
        handle = self._handle
        status = NtQueryInformationProcess(handle,
                                           ProcessBasicInformation,
                                           ctypes.byref(info),
                                           ctypes.sizeof(info),
                                           None)
        if status < 0:
            raise NtError(status)
        self._process_id = info.UniqueProcessId
        self._peb = peb = info.PebBaseAddress[0, handle]
        self._params = peb.ProcessParameters[0, handle]
        self._ldr = peb.Ldr[0, handle]

    def _modules_iter(self):
        """Yield a local LDR_DATA_TABLE_ENTRY copy per module, in memory order."""
        # Remote address of the list head inside PEB_LDR_DATA; a link that
        # points back at it means the circular list has been fully walked.
        headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
                    PEB_LDR_DATA.InMemoryOrderModuleList.offset)
        offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
        pentry = self._ldr.InMemoryOrderModuleList.Flink
        while pentry:
            pentry_void = PVOID.from_buffer_copy(pentry)
            if pentry_void.value == headaddr:
                break
            # Each link points at the InMemoryOrderLinks member of a
            # record, so step back to the start of the record.
            pentry_void.value -= offset
            pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
            mod = pmod[0, self._handle]
            yield mod
            pentry = LIST_ENTRY.from_buffer(mod, offset).Flink

    def update_module_names(self):
        """Re-read the full path of every loaded module into the cache."""
        handle = self._handle
        self._module_names = [
            m.FullDllName.Buffer[0, handle, m.FullDllName.Length]
            for m in self._modules_iter()]

    @property
    def module_names(self):
        """List of module paths; read on first access, then cached."""
        if self._module_names is None:
            self.update_module_names()
        return self._module_names

    @property
    def process_id(self):
        """Process ID reported by the query."""
        return self._process_id

    @property
    def session_id(self):
        """Session ID stored in the copied PEB."""
        return self._peb.SessionId

    @property
    def image_path(self):
        """Image path text, read from the process on each access."""
        ustr = self._params.ImagePathName
        return ustr.Buffer[0, self._handle, ustr.Length]

    @property
    def command_line(self):
        """Command line text, read from the process on each access."""
        ustr = self._params.CommandLine
        return ustr.Buffer[0, self._handle, ustr.Length]
Run Code Online (Sandbox Code Playgroud)