如何使用python ctypes访问另一个进程的PEB

Jak*_*ams 5 winapi ctypes python-2.7

到现在为止,我有这段代码(我知道它很丑,但这不是现在的重点)

我无法弄清楚如何发出以下系统调用并构建正确的结构来访问另一个进程的 PEB。

我想做以下事情:

  1. HANDLE pHandle = OpenProcess
  2. NTSTATUS status = NtQueryInformationProcess(pHandle, 0, peb, peb_len, 0)
  3. 遍历 PEB_LDR_DATA

代码:

from ctypes import *
from ctypes.wintypes import *
from _multiprocessing import win32
import argparse


class UNICODE_STRING(Structure):
    """Counted UTF-16 string descriptor as copied out of another process.

    ``Length`` and ``MaximumLength`` are byte counts.  ``Buffer`` is declared
    as a plain address (``LPVOID``) rather than ``c_wchar_p``: the value is
    only meaningful inside the *target* process, and ``c_wchar_p`` would make
    ctypes dereference it in the local process whenever the field is read.
    The characters must be fetched with ``ReadProcessMemory``.
    """
    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", LPVOID)
    ]

class RTL_USER_PROCESS_PARAMETERS(Structure):
    """Documented subset of the process-parameters block referenced by the PEB.

    ``Reserved2`` is an array of ten *pointers* in the published layout, not
    ten bytes; declaring it as bytes puts ``ImagePathName`` and
    ``CommandLine`` at the wrong offsets.
    """
    _fields_ = [
        ("Reserved1", BYTE*16),
        ("Reserved2", LPVOID*10),
        ("ImagePathName", UNICODE_STRING),
        ("CommandLine", UNICODE_STRING)
    ]

class PEB(Structure):
    """Documented subset of the Process Environment Block.

    ``Reserved3`` is two pointers wide in the published layout; with only one
    pointer there, ``Ldr`` and ``ProcessParameters`` are read from the wrong
    offsets.  The pointer fields hold addresses in the target process and
    must only be passed to ``ReadProcessMemory``, never dereferenced locally.
    """
    _fields_ = [
        ("Reserved1", BYTE*2),
        ("BeingDebugged", BYTE),
        ("Reserved2", BYTE),
        ("Reserved3", LPVOID*2),
        ("Ldr", LPVOID),
        ("ProcessParameters", POINTER(RTL_USER_PROCESS_PARAMETERS)),
        ("Reserved4", BYTE*104),
        ("Reserved5", LPVOID*52),
        ("PostProcessInitRoutine", LPVOID),
        ("Reserved6", BYTE*128),
        ("Reserved7", LPVOID),
        ("SessionId", ULONG)
    ]

class PROCESS_BASIC_INFORMATION(Structure):
    """Result buffer for NtQueryInformationProcess(ProcessBasicInformation).

    ``PebBaseAddress`` is an address in the target process.
    ``UniqueProcessId`` is a pointer-sized *integer* (ULONG_PTR), so it is
    declared as ``c_size_t`` instead of a pointer type that could be
    dereferenced by mistake.
    """
    _fields_ = [
        ("Reserved1", LPVOID),
        ("PebBaseAddress", POINTER(PEB)),
        ("Reserved2", LPVOID*2),
        ("UniqueProcessId", c_size_t),
        ("Reserved3", LPVOID)
    ]


def _read_remote_unicode_string(kernel32, process_handle, ustr):
    """Return the text of a UNICODE_STRING whose buffer is in another process.

    :param kernel32: WinDLL instance with a prototyped ReadProcessMemory.
    :param process_handle: handle opened with PROCESS_VM_READ access.
    :param ustr: UNICODE_STRING instance copied from the target process.
    :returns: the string contents (empty string for a NULL/empty buffer).
    :raises OSError: if ReadProcessMemory fails.
    """
    # Take the raw pointer bits straight out of the structure so ctypes never
    # tries to dereference the remote address in this process, whatever the
    # declared type of the Buffer field is.
    address = c_void_p.from_buffer(ustr, type(ustr).Buffer.offset).value
    char_count = ustr.Length // sizeof(c_wchar)  # Length is a byte count
    if not address or not char_count:
        return u''
    text = create_unicode_buffer(char_count)
    bytes_read = c_size_t()
    if not kernel32.ReadProcessMemory(process_handle, address, text,
                                      char_count * sizeof(c_wchar),
                                      byref(bytes_read)):
        raise WinError(get_last_error())
    # Slice instead of .value: the string is counted, not NUL-terminated.
    return text[:char_count]


def main():
    """Print the image path and command line of the process given by PID.

    Opens the process, queries its PEB address with
    NtQueryInformationProcess, then copies the PEB, the process parameters
    and the two strings out of the target with ReadProcessMemory.

    :raises OSError: if opening, querying or reading the process fails.
    """
    # Command Line Arguments Parsing
    parser = argparse.ArgumentParser()
    parser.add_argument('pid', metavar='<process id>', type=int, help='shows basic info about the process')
    parser.add_argument('-dS', metavar='dump strings', help='dump all used strings to txt file')
    parser.add_argument('-dD', metavar='dump dll', help='dump all loaded dlls to txt file')
    args = parser.parse_args()

    # Private DLL instances: prototypes set here do not leak into the shared
    # windll objects, and use_last_error makes get_last_error() reliable.
    kernel32 = WinDLL('kernel32', use_last_error=True)
    ntdll = WinDLL('ntdll')

    # Explicit prototypes keep handles and addresses from being truncated to
    # a C int in a 64-bit interpreter.
    kernel32.OpenProcess.restype = HANDLE
    kernel32.OpenProcess.argtypes = (DWORD, BOOL, DWORD)
    kernel32.ReadProcessMemory.restype = BOOL
    kernel32.ReadProcessMemory.argtypes = (HANDLE, LPCVOID, LPVOID,
                                           c_size_t, POINTER(c_size_t))
    kernel32.CloseHandle.restype = BOOL
    kernel32.CloseHandle.argtypes = (HANDLE,)
    ntdll.NtQueryInformationProcess.restype = LONG  # NTSTATUS
    ntdll.NtQueryInformationProcess.argtypes = (HANDLE, ULONG, LPVOID,
                                                ULONG, POINTER(ULONG))

    # Only the rights actually needed; PROCESS_ALL_ACCESS is refused far more
    # often than this pair.
    PROCESS_VM_READ = 0x0010
    PROCESS_QUERY_INFORMATION = 0x0400

    handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ,
                                  False, args.pid)
    if not handle:
        raise WinError(get_last_error())
    try:
        # ProcessBasicInformation (class 0) yields the remote PEB address.
        pbi = PROCESS_BASIC_INFORMATION()
        status = ntdll.NtQueryInformationProcess(handle, 0, byref(pbi),
                                                 sizeof(pbi), None)
        if status < 0:  # negative NTSTATUS means warning or error
            raise OSError('NtQueryInformationProcess failed: NTSTATUS 0x%08X'
                          % (status & 0xFFFFFFFF))

        bytes_read = c_size_t()
        peb = PEB()
        if not kernel32.ReadProcessMemory(handle, pbi.PebBaseAddress,
                                          byref(peb), sizeof(peb),
                                          byref(bytes_read)):
            raise WinError(get_last_error())

        params = RTL_USER_PROCESS_PARAMETERS()
        if not kernel32.ReadProcessMemory(handle, peb.ProcessParameters,
                                          byref(params), sizeof(params),
                                          byref(bytes_read)):
            raise WinError(get_last_error())

        # The UNICODE_STRING buffers are remote too, so each needs its own
        # ReadProcessMemory call.
        print(_read_remote_unicode_string(kernel32, handle,
                                          params.ImagePathName))
        print(_read_remote_unicode_string(kernel32, handle,
                                          params.CommandLine))
    finally:
        kernel32.CloseHandle(handle)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

**编辑:** 我设法获得了 PEB 并遍历了其中的结构,但我无法从它们的缓冲区中读取 UNICODE 字符串。例如,我想读取命令行参数。

Ery*_*Sun 4

值得重复的是,这NtQueryInformationProcess是一个本机系统调用,在 Windows 编程中不鼓励这样做。Microsoft 没有提供 ntdll.dll 的导入库,因此动态调用其导出函数的唯一方法是通过GetProcAddress. 这当然就是 ctypes 的工作原理,因此从 Python 调用本机 NTAPI 函数并不困难。问题在于缺乏官方支持和文档,并且 NT 数据结构、API 和可用信息类都可能发生变化。

另请注意,ProcessBasicInformation当从 64 位进程调用时,查询将检索 64 位 PEB 的地址。因此,当从 64 位进程查询 WOW64 32 位进程时,您只会看到本机 64 位模块 ntdll.dll、wow64.dll、wow64win.dll 和 wow64cpu.dll。这里的答案提供了一种技术,通过使用从 64 位 TEB 到 32 位 TEB 的魔术偏移量来查找 32 位 PEB 的地址,32 位 TEB 有一个指向 32 位 PEB 的指针。但当然,这个实现细节可以随时更改,从而破坏依赖于它的代码。

以下示例包含对与当前进程体系结构相同(即同为本机 64 位或同为 WOW64 32 位)的给定进程查询并使用 ProcessBasicInformation 所需的 ctypes 定义。它还包含一个演示用法的类,该类提供进程 ID、会话 ID、映像路径、命令行以及已加载模块路径等属性。

该示例使用 ctypes._Pointer 的子类 RemotePointer 以及 RPOINTER 工厂函数。该类重写了 __getitem__,以便解引用位于另一个进程地址空间中的指针值。索引键是形如 (index, handle[, size]) 的元组。可选的 size 参数(以字节为单位)对带长度计数的字符串很有用,例如 NTAPI 的 UNICODE_STRING:ustr.Buffer[0, hProcess, ustr.Length]。不支持以空字符结尾的字符串,因为 ReadProcessMemory 需要一个大小确定的缓冲区。

遍历加载器数据的逻辑位于私有方法 _modules_iter 中,该方法沿内存顺序(in-memory order)链表遍历已加载的模块。请注意,InMemoryOrderModuleList 链接到的是 LDR_DATA_TABLE_ENTRY 结构的 InMemoryOrderLinks 字段,链表中的每个后续链接也是如此。因此模块迭代器必须按该字段的偏移量调整,才能得到每个条目的基地址。在 C API 中,这一步使用 CONTAINING_RECORD 宏完成。

如果没有提供进程 ID 或句柄,ProcessInformation 构造函数默认查询当前进程。如果调用返回的状态是错误或警告(即 NTSTATUS 为负值),则调用 NtError 以获取 OSError 实例(Python 3.3 之前为 WindowsError)。

我有一个更复杂的 NtError 版本(此处未包含),它以 ntdll.dll 作为源模块调用 FormatMessage 来获取格式化的错误消息。我可以根据要求更新答案以包含此版本。

该示例在 Windows 7 和 10 中使用 32 位和 64 位版本的 Python 2.7 和 3.5 进行了测试。对于远程进程测试,子进程模块用于启动第二个 Python 实例。事件句柄被传递给子进程以进行同步。如果父进程没有等待子进程完成加载并设置事件,那么子进程的加载器数据在读取时可能没有完全初始化。

import ctypes
from ctypes import wintypes

# Load the two system DLLs used below. kernel32 is opened with
# use_last_error=True so ctypes.get_last_error() can report why a call failed.
ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

# WINAPI Definitions

# Process access rights passed to OpenProcess.
PROCESS_VM_READ           = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400

# Win32 system error codes.
ERROR_INVALID_HANDLE = 0x0006
ERROR_PARTIAL_COPY   = 0x012B

PULONG = ctypes.POINTER(wintypes.ULONG)
# NOTE(review): ULONG_PTR is aliased to a void-pointer type, so ctypes hands
# back a zero value as None instead of 0 - confirm callers are fine with that.
ULONG_PTR = wintypes.LPVOID
SIZE_T = ctypes.c_size_t

def _check_bool(result, func, args):
    if not result:
        raise ctypes.WinError(ctypes.get_last_error())
    return args

# Function prototypes. Every function given _check_bool as errcheck raises
# instead of returning a falsy result.

# ReadProcessMemory copies nSize bytes from lpBaseAddress in the process
# identified by hProcess into the local buffer lpBuffer.
kernel32.ReadProcessMemory.errcheck = _check_bool
kernel32.ReadProcessMemory.argtypes = (
    wintypes.HANDLE,  # _In_  hProcess
    wintypes.LPCVOID, # _In_  lpBaseAddress
    wintypes.LPVOID,  # _Out_ lpBuffer
    SIZE_T,           # _In_  nSize
    ctypes.POINTER(SIZE_T))  # _Out_ lpNumberOfBytesRead

kernel32.CloseHandle.errcheck = _check_bool
kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)

# restype is set to HANDLE so the returned value is not converted as a C int.
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetCurrentProcess.argtypes = ()

kernel32.OpenProcess.errcheck = _check_bool
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.OpenProcess.argtypes = (
   wintypes.DWORD, # _In_ dwDesiredAccess
   wintypes.BOOL,  # _In_ bInheritHandle
   wintypes.DWORD) # _In_ dwProcessId

class RemotePointer(ctypes._Pointer):
    """Pointer whose target lives in another process's address space.

    Items are fetched with ReadProcessMemory, so indexing needs a process
    handle: ``ptr[index, handle]`` or ``ptr[index, handle, size]``.  ``size``
    (in bytes) only matters for simple character types, where it selects an
    array read instead of a single element.  The pointer is read only.
    """

    def __getitem__(self, key):
        # TODO: slicing
        if not isinstance(key, tuple):
            raise KeyError('must be (index, handle[, size])')
        nbytes = None
        if len(key) > 2:
            index, handle, nbytes = key
        else:
            index, handle = key
        if isinstance(index, slice):
            raise TypeError('slicing is not supported')

        target_type = self._type_
        byte_offset = ctypes.sizeof(target_type) * index
        # Reinterpret this pointer's own storage as an integer address.
        remote_address = PVOID.from_buffer(self).value + byte_offset

        is_simple = issubclass(target_type, ctypes._SimpleCData)
        if not (is_simple and nbytes is not None):
            # Single element of the pointed-to type.
            local = target_type()
        elif target_type._type_ == wintypes.WCHAR._type_:
            # Sized wide-character read: nbytes holds two bytes per WCHAR.
            local = (wintypes.WCHAR * (nbytes // 2))()
        else:
            local = (ctypes.c_char * nbytes)()

        bytes_read = SIZE_T()
        kernel32.ReadProcessMemory(handle,
                                   remote_address,
                                   ctypes.byref(local),
                                   ctypes.sizeof(local),
                                   ctypes.byref(bytes_read))
        # Simple types are unwrapped to a Python value; anything else is
        # returned as the local ctypes copy.
        return local.value if is_simple else local

    def __setitem__(self, key, value):
        # TODO: kernel32.WriteProcessMemory
        raise TypeError('remote pointers are read only')

    @property
    def contents(self):
        # Dereferencing needs a process handle, which a property cannot take.
        raise NotImplementedError

_remote_pointer_cache = {}
def RPOINTER(dtype):
    """Return the RemotePointer subclass for ``dtype``, creating it once.

    Types are memoised in ``_remote_pointer_cache`` so repeated calls with
    the same ``dtype`` give back the identical class, named ``RP_<dtype>``.
    """
    try:
        return _remote_pointer_cache[dtype]
    except KeyError:
        pass
    ptype = type('RP_%s' % dtype.__name__,
                 (RemotePointer,),
                 {'_type_': dtype})
    _remote_pointer_cache[dtype] = ptype
    return ptype

# NTAPI Definitions

# Basic NTAPI type aliases. NTSTATUS is a signed 32-bit value, so warning and
# error codes (high bit set) read back as negative integers.
NTSTATUS = wintypes.LONG
PVOID = wintypes.LPVOID
RPWSTR = RPOINTER(wintypes.WCHAR)
PROCESSINFOCLASS = wintypes.ULONG

# PROCESSINFOCLASS values accepted by NtQueryInformationProcess.
ProcessBasicInformation   = 0
ProcessDebugPort          = 7
ProcessWow64Information   = 26
ProcessImageFileName      = 27
ProcessBreakOnTermination = 29

# Status codes as plain signed ints. Going through NTSTATUS(...).value folds
# the unsigned literal into the signed range. STATUS_UNSUCCESSFUL previously
# lacked .value and stayed a ctypes instance, which made
# NtError(STATUS_UNSUCCESSFUL) fail with TypeError on ``status % 2**32``.
STATUS_UNSUCCESSFUL         = NTSTATUS(0xC0000001).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
STATUS_INVALID_HANDLE       = NTSTATUS(0xC0000008).value
STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value

class UNICODE_STRING(ctypes.Structure):
    """Counted wide-character string; Buffer is a remote pointer (RPWSTR)."""
    # Callers in this file pass Length as the byte size of the remote read.
    _fields_ = (('Length',        wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer',        RPWSTR))

# Doubly linked list node. The class is declared empty first because its
# fields are remote pointers to the class itself; _fields_ is attached once
# the pointer type exists.
class LIST_ENTRY(ctypes.Structure):
    pass

RPLIST_ENTRY = RPOINTER(LIST_ENTRY)

LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
                       ('Blink', RPLIST_ENTRY))

class LDR_DATA_TABLE_ENTRY(ctypes.Structure):
    """Loader record for one module; Reserved* fields only hold the layout."""
    # InMemoryOrderLinks is the node chained into the module list; its offset
    # inside this structure is what the module iterator subtracts from a link
    # address to reach the start of the entry.
    _fields_ = (('Reserved1',          PVOID * 2),
                ('InMemoryOrderLinks', LIST_ENTRY),
                ('Reserved2',          PVOID * 2),
                ('DllBase',            PVOID),
                ('EntryPoint',         PVOID),
                ('Reserved3',          PVOID),
                ('FullDllName',        UNICODE_STRING),
                ('Reserved4',          wintypes.BYTE * 8),
                ('Reserved5',          PVOID * 3),
                # NOTE(review): declared pointer-sized here - presumably to
                # cover a union in the native definition; confirm.
                ('CheckSum',           PVOID),
                ('TimeDateStamp',      wintypes.ULONG))

RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)

class PEB_LDR_DATA(ctypes.Structure):
    """Loader data referenced by PEB.Ldr; holds the head of the module list."""
    _fields_ = (('Reserved1',               wintypes.BYTE * 8),
                ('Reserved2',               PVOID * 3),
                # List head: its Flink is the first module's InMemoryOrderLinks.
                ('InMemoryOrderModuleList', LIST_ENTRY))

RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)

class RTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
    """Process parameters block; exposes the image path and command line."""
    _fields_ = (('Reserved1',     wintypes.BYTE * 16),
                ('Reserved2',     PVOID * 10),
                ('ImagePathName', UNICODE_STRING),
                ('CommandLine',   UNICODE_STRING))

RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
# Treated as an opaque address; the routine is never called from here.
PPS_POST_PROCESS_INIT_ROUTINE = PVOID

class PEB(ctypes.Structure):
    """Process Environment Block; Reserved* fields only hold the layout."""
    # Ldr and ProcessParameters are remote pointers: index them with a
    # process handle, e.g. peb.Ldr[0, handle].
    _fields_ = (('Reserved1',              wintypes.BYTE * 2),
                ('BeingDebugged',          wintypes.BYTE),
                ('Reserved2',              wintypes.BYTE * 1),
                ('Reserved3',              PVOID * 2),
                ('Ldr',                    RPPEB_LDR_DATA),
                ('ProcessParameters',      RPRTL_USER_PROCESS_PARAMETERS),
                ('Reserved4',              wintypes.BYTE * 104),
                ('Reserved5',              PVOID * 52),
                ('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
                ('Reserved6',              wintypes.BYTE * 128),
                ('Reserved7',              PVOID * 1),
                ('SessionId',              wintypes.ULONG))

RPPEB = RPOINTER(PEB)

class PROCESS_BASIC_INFORMATION(ctypes.Structure):
    """Output buffer for the ProcessBasicInformation query."""
    _fields_ = (('Reserved1',       PVOID),
                # Remote pointer to the target's PEB.
                ('PebBaseAddress',  RPPEB),
                ('Reserved2',       PVOID * 2),
                # Pointer-sized field, read back as a Python int (or None).
                ('UniqueProcessId', ULONG_PTR),
                ('Reserved3',       PVOID))

def NtError(status):
    """Build (not raise) an exception describing an NTSTATUS code.

    The message shows the status as an unsigned hex number followed by the
    severity taken from its two highest bits.  Before Python 3.3 a
    WindowsError is returned; otherwise an OSError carrying ``status`` as its
    fourth argument.
    """
    import sys
    # Bits 31..30 encode the severity: 0, 1, 2, 3 in this order.
    severity_tags = ('[Success]', '[Information]', '[Warning]', '[Error]')
    descr = 'NTSTATUS(%#08x) ' % (status % 2**32,)
    descr += severity_tags[(status >> 30) & 0x3]
    if sys.version_info[:2] >= (3, 3):
        return OSError(None, descr, None, status)
    return WindowsError(status, descr)

# Prototype for the native query call. restype is NTSTATUS (signed), so the
# caller in this file treats a negative return value as failure.
NtQueryInformationProcess = ntdll.NtQueryInformationProcess
NtQueryInformationProcess.restype = NTSTATUS
NtQueryInformationProcess.argtypes = (
    wintypes.HANDLE,  # _In_      ProcessHandle
    PROCESSINFOCLASS, # _In_      ProcessInformationClass
    PVOID,            # _Out_     ProcessInformation
    wintypes.ULONG,   # _In_      ProcessInformationLength
    PULONG)           # _Out_opt_ ReturnLength

class ProcessInformation(object):
    """Read basic information about a process through its PEB.

    The constructor queries ProcessBasicInformation and copies the PEB, the
    process parameters and the loader data out of the target process.
    Strings and the module list are read from the target on demand.
    """
    # Class-level defaults so __del__ is safe even if __init__ fails early.
    _close_handle = False
    _closed = False
    _module_names = None

    def __init__(self, process_id=None, handle=None):
        """Attach to a process.

        With neither argument the current process is used. With only
        ``process_id`` a handle is opened here and closed again in
        ``__del__``. A ``handle`` passed by the caller is used as is and is
        not closed by this object.

        Raises the exception built by NtError if the query fails or if the
        process ID reported by the query differs from ``process_id``.
        """
        if process_id is None and handle is None:
            handle = kernel32.GetCurrentProcess()
        elif handle is None:
            handle = kernel32.OpenProcess(PROCESS_VM_READ |
                                          PROCESS_QUERY_INFORMATION,
                                          False, process_id)
            self._close_handle = True
        self._handle = handle
        self._query_info()
        if process_id is not None and self._process_id != process_id:
            raise NtError(STATUS_UNSUCCESSFUL)

    def __del__(self, CloseHandle=kernel32.CloseHandle):
        """Close the handle if this object opened it."""
        # CloseHandle is bound as a default argument so the function stays
        # reachable from this method without a module-global lookup.
        if self._close_handle and not self._closed:
            try:
                CloseHandle(self._handle)
            except WindowsError as e:
                # An already-invalid handle is tolerated; anything else
                # propagates.
                if e.winerror != ERROR_INVALID_HANDLE:
                    raise
            self._closed = True

    def _query_info(self):
        """Query the process and cache its ID, PEB, parameters and loader data."""
        info = PROCESS_BASIC_INFORMATION()
        handle = self._handle
        status = NtQueryInformationProcess(handle,
                                           ProcessBasicInformation,
                                           ctypes.byref(info),
                                           ctypes.sizeof(info),
                                           None)
        # NTSTATUS is signed: negative values are warnings or errors.
        if status < 0:
            raise NtError(status)
        self._process_id = info.UniqueProcessId
        # Each [0, handle] index is one ReadProcessMemory copy of the
        # structure the remote pointer refers to.
        self._peb = peb = info.PebBaseAddress[0, handle]
        self._params = peb.ProcessParameters[0, handle]
        self._ldr = peb.Ldr[0, handle]

    def _modules_iter(self):
        """Yield a local LDR_DATA_TABLE_ENTRY copy for each loaded module.

        Follows the Flink pointers of the in-memory-order list until the
        link points back at the list head (or is NULL).
        """
        # Remote address of the list head inside the target's PEB_LDR_DATA;
        # reaching it again means the circular list has been fully walked.
        headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
                    PEB_LDR_DATA.InMemoryOrderModuleList.offset)
        offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
        pentry = self._ldr.InMemoryOrderModuleList.Flink
        while pentry:
            # Work on a copy of the pointer value so the structure that owns
            # pentry is not modified by the adjustment below.
            pentry_void = PVOID.from_buffer_copy(pentry)
            if pentry_void.value == headaddr:
                break
            # Links point at the InMemoryOrderLinks member, not at the start
            # of the entry, so step back by that member's offset.
            pentry_void.value -= offset
            pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
            mod = pmod[0, self._handle]
            yield mod
            pentry = LIST_ENTRY.from_buffer(mod, offset).Flink

    def update_module_names(self):
        """Re-read the full path of every loaded module from the target."""
        names = []
        for m in self._modules_iter():
            ustr = m.FullDllName
            name = ustr.Buffer[0, self._handle, ustr.Length]
            names.append(name)
        self._module_names = names

    @property
    def module_names(self):
        """List of module paths; read on first access, then cached."""
        if self._module_names is None:
            self.update_module_names()
        return self._module_names

    @property
    def process_id(self):
        """Process ID reported by the basic-information query."""
        return self._process_id

    @property
    def session_id(self):
        """SessionId field of the PEB copy taken at construction."""
        return self._peb.SessionId

    @property
    def image_path(self):
        """Image path string, read from the target on each access."""
        ustr = self._params.ImagePathName
        return ustr.Buffer[0, self._handle, ustr.Length]

    @property
    def command_line(self):
        """Command line string, read from the target on each access."""
        ustr = self._params.CommandLine
        buf = ustr.Buffer[0, self._handle, ustr.Length]
        return buf
Run Code Online (Sandbox Code Playgroud)

例子:

# Self-test: checks ProcessInformation against the current process and
# against a child Python process (attached once by handle, once by PID).
if __name__ == '__main__':
    import os
    import sys
    import subprocess
    import textwrap

    class SECURITY_ATTRIBUTES(ctypes.Structure):
        """Security attributes structure whose nLength is filled in automatically."""
        _fields_ = (('nLength',              wintypes.DWORD),
                    ('lpSecurityDescriptor', wintypes.LPVOID),
                    ('bInheritHandle',       wintypes.BOOL))
        def __init__(self, *args, **kwds):
            super(SECURITY_ATTRIBUTES, self).__init__(*args, **kwds)
            self.nLength = ctypes.sizeof(self)

    def test_remote(use_pid=True, show_modules=False):
        """Start a child interpreter and verify what is read back from it.

        use_pid selects attaching by process ID instead of by the Popen
        handle; show_modules prints the child's module paths.
        """
        # The event is created inheritable so the child can signal it.
        sa = SECURITY_ATTRIBUTES(bInheritHandle=True)
        hEvent = kernel32.CreateEventW(ctypes.byref(sa), 0, 0, None)
        try:
            # Child script: set the event, then block on stdin until it is
            # terminated by the parent.
            script = textwrap.dedent(r"""
            import sys
            import ctypes
            kernel32 = ctypes.WinDLL('kernel32')
            kernel32.SetEvent(%d)
            sys.stdin.read()""").strip() % hEvent
            cmd = '"%s" -c "%s"' % (sys.executable, script)
            # close_fds=False so the event handle is inherited by the child.
            proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                    close_fds=False)
            try:
                # Wait (up to 5 s) for the child to signal the event before
                # reading its memory.
                kernel32.WaitForSingleObject(hEvent, 5000)
                if use_pid:
                    pi = ProcessInformation(proc.pid)
                else:
                    pi = ProcessInformation(handle=int(proc._handle))
                assert pi.process_id == proc.pid
                assert pi.image_path == sys.executable
                assert pi.command_line == cmd
                assert pi.module_names[0] == sys.executable
                if show_modules:
                    print('\n'.join(pi.module_names))
            finally:
                proc.terminate()
        finally:
            kernel32.CloseHandle(hEvent)

    print('Test 1: current process')
    pi = ProcessInformation()
    assert os.getpid() == pi.process_id
    assert pi.image_path == pi.module_names[0]
    print('Test 2: remote process (Handle)')
    test_remote(use_pid=False)
    print('Test 3: remote process (PID)')
    test_remote(show_modules=True)
Run Code Online (Sandbox Code Playgroud)

Windows 10 中的输出,使用 64 位 Python 3.5:

import ctypes
from ctypes import wintypes

# Load the two system DLLs used below. kernel32 is opened with
# use_last_error=True so ctypes.get_last_error() can report why a call failed.
ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

# WINAPI Definitions

# Process access rights passed to OpenProcess.
PROCESS_VM_READ           = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400

# Win32 system error codes.
ERROR_INVALID_HANDLE = 0x0006
ERROR_PARTIAL_COPY   = 0x012B

PULONG = ctypes.POINTER(wintypes.ULONG)
# NOTE(review): ULONG_PTR is aliased to a void-pointer type, so ctypes hands
# back a zero value as None instead of 0 - confirm callers are fine with that.
ULONG_PTR = wintypes.LPVOID
SIZE_T = ctypes.c_size_t

def _check_bool(result, func, args):
    if not result:
        raise ctypes.WinError(ctypes.get_last_error())
    return args

# Function prototypes. Every function given _check_bool as errcheck raises
# instead of returning a falsy result.

# ReadProcessMemory copies nSize bytes from lpBaseAddress in the process
# identified by hProcess into the local buffer lpBuffer.
kernel32.ReadProcessMemory.errcheck = _check_bool
kernel32.ReadProcessMemory.argtypes = (
    wintypes.HANDLE,  # _In_  hProcess
    wintypes.LPCVOID, # _In_  lpBaseAddress
    wintypes.LPVOID,  # _Out_ lpBuffer
    SIZE_T,           # _In_  nSize
    ctypes.POINTER(SIZE_T))  # _Out_ lpNumberOfBytesRead

kernel32.CloseHandle.errcheck = _check_bool
kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)

# restype is set to HANDLE so the returned value is not converted as a C int.
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetCurrentProcess.argtypes = ()

kernel32.OpenProcess.errcheck = _check_bool
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.OpenProcess.argtypes = (
   wintypes.DWORD, # _In_ dwDesiredAccess
   wintypes.BOOL,  # _In_ bInheritHandle
   wintypes.DWORD) # _In_ dwProcessId

class RemotePointer(ctypes._Pointer):
    """Pointer whose target lives in another process's address space.

    Items are fetched with ReadProcessMemory, so indexing needs a process
    handle: ``ptr[index, handle]`` or ``ptr[index, handle, size]``.  ``size``
    (in bytes) only matters for simple character types, where it selects an
    array read instead of a single element.  The pointer is read only.
    """

    def __getitem__(self, key):
        # TODO: slicing
        if not isinstance(key, tuple):
            raise KeyError('must be (index, handle[, size])')
        nbytes = None
        if len(key) > 2:
            index, handle, nbytes = key
        else:
            index, handle = key
        if isinstance(index, slice):
            raise TypeError('slicing is not supported')

        target_type = self._type_
        byte_offset = ctypes.sizeof(target_type) * index
        # Reinterpret this pointer's own storage as an integer address.
        remote_address = PVOID.from_buffer(self).value + byte_offset

        is_simple = issubclass(target_type, ctypes._SimpleCData)
        if not (is_simple and nbytes is not None):
            # Single element of the pointed-to type.
            local = target_type()
        elif target_type._type_ == wintypes.WCHAR._type_:
            # Sized wide-character read: nbytes holds two bytes per WCHAR.
            local = (wintypes.WCHAR * (nbytes // 2))()
        else:
            local = (ctypes.c_char * nbytes)()

        bytes_read = SIZE_T()
        kernel32.ReadProcessMemory(handle,
                                   remote_address,
                                   ctypes.byref(local),
                                   ctypes.sizeof(local),
                                   ctypes.byref(bytes_read))
        # Simple types are unwrapped to a Python value; anything else is
        # returned as the local ctypes copy.
        return local.value if is_simple else local

    def __setitem__(self, key, value):
        # TODO: kernel32.WriteProcessMemory
        raise TypeError('remote pointers are read only')

    @property
    def contents(self):
        # Dereferencing needs a process handle, which a property cannot take.
        raise NotImplementedError

_remote_pointer_cache = {}
def RPOINTER(dtype):
    """Return the RemotePointer subclass for ``dtype``, creating it once.

    Types are memoised in ``_remote_pointer_cache`` so repeated calls with
    the same ``dtype`` give back the identical class, named ``RP_<dtype>``.
    """
    try:
        return _remote_pointer_cache[dtype]
    except KeyError:
        pass
    ptype = type('RP_%s' % dtype.__name__,
                 (RemotePointer,),
                 {'_type_': dtype})
    _remote_pointer_cache[dtype] = ptype
    return ptype

# NTAPI Definitions

# Basic NTAPI type aliases. NTSTATUS is a signed 32-bit value, so warning and
# error codes (high bit set) read back as negative integers.
NTSTATUS = wintypes.LONG
PVOID = wintypes.LPVOID
RPWSTR = RPOINTER(wintypes.WCHAR)
PROCESSINFOCLASS = wintypes.ULONG

# PROCESSINFOCLASS values accepted by NtQueryInformationProcess.
ProcessBasicInformation   = 0
ProcessDebugPort          = 7
ProcessWow64Information   = 26
ProcessImageFileName      = 27
ProcessBreakOnTermination = 29

# Status codes as plain signed ints. Going through NTSTATUS(...).value folds
# the unsigned literal into the signed range. STATUS_UNSUCCESSFUL previously
# lacked .value and stayed a ctypes instance, which made
# NtError(STATUS_UNSUCCESSFUL) fail with TypeError on ``status % 2**32``.
STATUS_UNSUCCESSFUL         = NTSTATUS(0xC0000001).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
STATUS_INVALID_HANDLE       = NTSTATUS(0xC0000008).value
STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value

class UNICODE_STRING(ctypes.Structure):
    """Counted wide-character string; Buffer is a remote pointer (RPWSTR)."""
    # Callers in this file pass Length as the byte size of the remote read.
    _fields_ = (('Length',        wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer',        RPWSTR))

# Doubly linked list node. The class is declared empty first because its
# fields are remote pointers to the class itself; _fields_ is attached once
# the pointer type exists.
class LIST_ENTRY(ctypes.Structure):
    pass

RPLIST_ENTRY = RPOINTER(LIST_ENTRY)

LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
                       ('Blink', RPLIST_ENTRY))

class LDR_DATA_TABLE_ENTRY(ctypes.Structure):
    """Loader record for one module; Reserved* fields only hold the layout."""
    # InMemoryOrderLinks is the node chained into the module list; its offset
    # inside this structure is what the module iterator subtracts from a link
    # address to reach the start of the entry.
    _fields_ = (('Reserved1',          PVOID * 2),
                ('InMemoryOrderLinks', LIST_ENTRY),
                ('Reserved2',          PVOID * 2),
                ('DllBase',            PVOID),
                ('EntryPoint',         PVOID),
                ('Reserved3',          PVOID),
                ('FullDllName',        UNICODE_STRING),
                ('Reserved4',          wintypes.BYTE * 8),
                ('Reserved5',          PVOID * 3),
                # NOTE(review): declared pointer-sized here - presumably to
                # cover a union in the native definition; confirm.
                ('CheckSum',           PVOID),
                ('TimeDateStamp',      wintypes.ULONG))

RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)

class PEB_LDR_DATA(ctypes.Structure):
    """Loader data referenced by PEB.Ldr; holds the head of the module list."""
    _fields_ = (('Reserved1',               wintypes.BYTE * 8),
                ('Reserved2',               PVOID * 3),
                # List head: its Flink is the first module's InMemoryOrderLinks.
                ('InMemoryOrderModuleList', LIST_ENTRY))

RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)

class RTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
    """Process parameters block; exposes the image path and command line."""
    _fields_ = (('Reserved1',     wintypes.BYTE * 16),
                ('Reserved2',     PVOID * 10),
                ('ImagePathName', UNICODE_STRING),
                ('CommandLine',   UNICODE_STRING))

RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
# Treated as an opaque address; the routine is never called from here.
PPS_POST_PROCESS_INIT_ROUTINE = PVOID

class PEB(ctypes.Structure):
    """Process Environment Block; Reserved* fields only hold the layout."""
    # Ldr and ProcessParameters are remote pointers: index them with a
    # process handle, e.g. peb.Ldr[0, handle].
    _fields_ = (('Reserved1',              wintypes.BYTE * 2),
                ('BeingDebugged',          wintypes.BYTE),
                ('Reserved2',              wintypes.BYTE * 1),
                ('Reserved3',              PVOID * 2),
                ('Ldr',                    RPPEB_LDR_DATA),
                ('ProcessParameters',      RPRTL_USER_PROCESS_PARAMETERS),
                ('Reserved4',              wintypes.BYTE * 104),
                ('Reserved5',              PVOID * 52),
                ('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
                ('Reserved6',              wintypes.BYTE * 128),
                ('Reserved7',              PVOID * 1),
                ('SessionId',              wintypes.ULONG))

RPPEB = RPOINTER(PEB)

class PROCESS_BASIC_INFORMATION(ctypes.Structure):
    """Output buffer for the ProcessBasicInformation query."""
    _fields_ = (('Reserved1',       PVOID),
                # Remote pointer to the target's PEB.
                ('PebBaseAddress',  RPPEB),
                ('Reserved2',       PVOID * 2),
                # Pointer-sized field, read back as a Python int (or None).
                ('UniqueProcessId', ULONG_PTR),
                ('Reserved3',       PVOID))

def NtError(status):
    """Build (not raise) an exception describing an NTSTATUS code.

    The message shows the status as an unsigned hex number followed by the
    severity taken from its two highest bits.  Before Python 3.3 a
    WindowsError is returned; otherwise an OSError carrying ``status`` as its
    fourth argument.
    """
    import sys
    # Bits 31..30 encode the severity: 0, 1, 2, 3 in this order.
    severity_tags = ('[Success]', '[Information]', '[Warning]', '[Error]')
    descr = 'NTSTATUS(%#08x) ' % (status % 2**32,)
    descr += severity_tags[(status >> 30) & 0x3]
    if sys.version_info[:2] >= (3, 3):
        return OSError(None, descr, None, status)
    return WindowsError(status, descr)

# Prototype for the native query call. restype is NTSTATUS (signed), so the
# caller in this file treats a negative return value as failure.
NtQueryInformationProcess = ntdll.NtQueryInformationProcess
NtQueryInformationProcess.restype = NTSTATUS
NtQueryInformationProcess.argtypes = (
    wintypes.HANDLE,  # _In_      ProcessHandle
    PROCESSINFOCLASS, # _In_      ProcessInformationClass
    PVOID,            # _Out_     ProcessInformation
    wintypes.ULONG,   # _In_      ProcessInformationLength
    PULONG)           # _Out_opt_ ReturnLength

class ProcessInformation(object):
    """Read basic information about a process through its PEB.

    The constructor queries ProcessBasicInformation and copies the PEB, the
    process parameters and the loader data out of the target process.
    Strings and the module list are read from the target on demand.
    """
    # Class-level defaults so __del__ is safe even if __init__ fails early.
    _close_handle = False
    _closed = False
    _module_names = None

    def __init__(self, process_id=None, handle=None):
        """Attach to a process.

        With neither argument the current process is used. With only
        ``process_id`` a handle is opened here and closed again in
        ``__del__``. A ``handle`` passed by the caller is used as is and is
        not closed by this object.

        Raises the exception built by NtError if the query fails or if the
        process ID reported by the query differs from ``process_id``.
        """
        if process_id is None and handle is None:
            handle = kernel32.GetCurrentProcess()
        elif handle is None:
            handle = kernel32.OpenProcess(PROCESS_VM_READ |
                                          PROCESS_QUERY_INFORMATION,
                                          False, process_id)
            self._close_handle = True
        self._handle = handle
        self._query_info()
        if process_id is not None and self._process_id != process_id:
            raise NtError(STATUS_UNSUCCESSFUL)

    def __del__(self, CloseHandle=kernel32.CloseHandle):
        """Close the handle if this object opened it."""
        # CloseHandle is bound as a default argument so the function stays
        # reachable from this method without a module-global lookup.
        if self._close_handle and not self._closed:
            try:
                CloseHandle(self._handle)
            except WindowsError as e:
                # An already-invalid handle is tolerated; anything else
                # propagates.
                if e.winerror != ERROR_INVALID_HANDLE:
                    raise
            self._closed = True

    def _query_info(self):
        """Query the process and cache its ID, PEB, parameters and loader data."""
        info = PROCESS_BASIC_INFORMATION()
        handle = self._handle
        status = NtQueryInformationProcess(handle,
                                           ProcessBasicInformation,
                                           ctypes.byref(info),
                                           ctypes.sizeof(info),
                                           None)
        # NTSTATUS is signed: negative values are warnings or errors.
        if status < 0:
            raise NtError(status)
        self._process_id = info.UniqueProcessId
        # Each [0, handle] index is one ReadProcessMemory copy of the
        # structure the remote pointer refers to.
        self._peb = peb = info.PebBaseAddress[0, handle]
        self._params = peb.ProcessParameters[0, handle]
        self._ldr = peb.Ldr[0, handle]

    def _modules_iter(self):
        """Yield a local LDR_DATA_TABLE_ENTRY copy for each loaded module.

        Follows the Flink pointers of the in-memory-order list until the
        link points back at the list head (or is NULL).
        """
        # Remote address of the list head inside the target's PEB_LDR_DATA;
        # reaching it again means the circular list has been fully walked.
        headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
                    PEB_LDR_DATA.InMemoryOrderModuleList.offset)
        offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
        pentry = self._ldr.InMemoryOrderModuleList.Flink
        while pentry:
            # Work on a copy of the pointer value so the structure that owns
            # pentry is not modified by the adjustment below.
            pentry_void = PVOID.from_buffer_copy(pentry)
            if pentry_void.value == headaddr:
                break
            # Links point at the InMemoryOrderLinks member, not at the start
            # of the entry, so step back by that member's offset.
            pentry_void.value -= offset
            pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
            mod = pmod[0, self._handle]
            yield mod
            pentry = LIST_ENTRY.from_buffer(mod, offset).Flink

    def update_module_names(self):
        """Re-read the full path of every loaded module from the target."""
        names = []
        for m in self._modules_iter():
            ustr = m.FullDllName
            name = ustr.Buffer[0, self._handle, ustr.Length]
            names.append(name)
        self._module_names = names

    @property
    def module_names(self):
        """List of module paths; read on first access, then cached."""
        if self._module_names is None:
            self.update_module_names()
        return self._module_names

    @property
    def process_id(self):
        """Process ID reported by the basic-information query."""
        return self._process_id

    @property
    def session_id(self):
        """SessionId field of the PEB copy taken at construction."""
        return self._peb.SessionId

    @property
    def image_path(self):
        """Image path string, read from the target on each access."""
        ustr = self._params.ImagePathName
        return ustr.Buffer[0, self._handle, ustr.Length]

    @property
    def command_line(self):
        """Command line string, read from the target on each access."""
        ustr = self._params.CommandLine
        buf = ustr.Buffer[0, self._handle, ustr.Length]
        return buf
Run Code Online (Sandbox Code Playgroud)