Patch Linux processes on the fly with GDB

    Function hooking techniques on Linux are well known and described on the Internet. The simplest method is to write a dynamic library with “clone functions” and use the LD_PRELOAD mechanism to override the import table at the process loading stage.

    The disadvantage of LD_PRELOAD is that you must control the start of the process. To intercept functions in an already running process, or functions that are not in the import table, you can use “splicing”: writing a jump to the interceptor over the beginning of the intercepted function.

    Python, in turn, has the ctypes module, which lets you interact with C data and functions (that is, with the large number of dynamic libraries that expose a C interface). Thus, nothing prevents you from intercepting a function of a process and redirecting it to a Python function wrapped in a ctypes C callback.
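As a minimal sketch of the ctypes mechanism described above: a Python function is wrapped in a C-callable callback, its raw machine address is taken, and that address is then called as if it were an ordinary C function pointer. The names py_add and ADD_T are illustrative and do not come from the article.

```python
from ctypes import CFUNCTYPE, c_int, c_void_p, cast

# Prototype: int (*)(int, int). A hypothetical example signature.
ADD_T = CFUNCTYPE(c_int, c_int, c_int)


def py_add(a, b):
    return a + b


# Wrap the Python function in a C-callable thunk. A reference to c_add
# must be kept for as long as C code may call it.
c_add = ADD_T(py_add)

# The raw address that native code could jump to.
thunk_addr = cast(c_add, c_void_p).value

# Calling through the raw address lands back in py_add.
call_via_pointer = ADD_T(thunk_addr)
result = call_via_pointer(2, 3)
```

This raw address is the kind of value that a jump written into the target function eventually has to reach.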

    To take control of the target process and load code into it, it is convenient to use the GDB debugger, which supports extension modules written in Python ( https://sourceware.org/gdb/current/onlinedocs/gdb/Python-API.html ).
    Nuances
    The code for the example is given completely at the end of the article and consists of two files:

    • pyinject.py - GDB extension
    • hook.py - module with hook functions

    On the GDB side, the code is conveniently packaged as a user command. You create a new command by inheriting from the class gdb.Command. When the command is used in GDB, its method invoke(argument, from_tty) is called.

    You can also create custom parameters by inheriting from gdb.Parameter. In the article's example, one is used to specify the name of the file with the interception functions.

    It is convenient to attach to a running process with the given PID and load the module right away when starting GDB:
    gdb -ex 'attach PID' -ex 'source pyinject.py' -ex 'set hookfile hook.py'
    After this, the debugged process is stopped and the interactive GDB command line starts, in which the new pyinject command is available.

    Interception can be divided into three stages:
    1. Injecting a Python interpreter into the address space of the target process
    2. Gathering information about the function to intercept
    3. The interception itself
    Steps 1 and 2 are easier to do on the debugger side; step 3 happens inside the target process.

    Python interpreter injection


    Most of the GDB Python interface is designed to extend debugging capabilities. For everything else, there is gdb.execute(command, from_tty, to_string), which executes an arbitrary GDB command and can return its output as a string.
    For instance:
    out = gdb.execute("info registers", False, True)
    
    Also useful is gdb.parse_and_eval(expression), which evaluates an expression and returns the result as a gdb.Value.

    The first step is to load the Python library into the address space of the target process. To do this, call dlopen in the context of the target process.
    You can use either the call command through gdb.execute, or gdb.parse_and_eval:
    # pyinject.py
    gdb.execute('call dlopen("libpython2.7.so", %d)' % RTLD_LAZY)
    assert long(gdb.history(0))
    handle = gdb.parse_and_eval('dlopen("libpython2.7.so", %d)' % RTLD_LAZY)
    assert long(handle)
    

    After that, you can initialize the interpreter
    # pyinject.py
    gdb.execute('call PyEval_InitThreads()')
    gdb.execute('call Py_Initialize()')
    
    The first call creates a GIL (global interpreter lock), the second prepares the Python C-API for use.

    And load the module with interception functions
    # pyinject.py
    fp = gdb.parse_and_eval('fopen("hook.py", "r")')
    assert long(fp) != 0
    pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "hook.py", 1)' % fp)
    
    PyRun_AnyFileEx executes code from a file in the context of the module __main__.
    Nuances
    The above only works if the target process does not itself use Python (as its main or scripting language). If it does, everything becomes considerably more complicated. The main problem is that in a process stopped by the debugger at an arbitrary point, you cannot safely use any Python C-API functions (except maybe Py_AddPendingCall).

    Hook.py module


    The hook.py module contains the hook functions and a class Hook that performs the interception itself.
    Interceptor functions are marked with a decorator. For example, for the standard library function open, we print its arguments and return the result of calling the original function, which is stored in the attribute orig:
    # hook.py
    @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int))
    def python_open(fname, oflag):
        print "open: ", fname, oflag
        return python_open.orig(fname, oflag)
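The ctype argument is an ordinary ctypes function prototype. As a small sketch of what it describes, the same prototype can be bound to the real open in the current process and called directly. This assumes a Linux system where CDLL(None) exposes the libc symbols; the name OPEN_T is illustrative.

```python
import os
from ctypes import CDLL, CFUNCTYPE, c_char_p, c_int

# Same prototype as in the hook above: int open(const char *, int).
OPEN_T = CFUNCTYPE(c_int, c_char_p, c_int)

# CDLL(None) gives access to symbols already loaded into this process.
libc = CDLL(None)
c_open = OPEN_T(("open", libc))

# In Python 3, c_char_p arguments are passed as bytes.
fd = c_open(b"/dev/null", os.O_RDONLY)
opened = fd >= 0
if opened:
    os.close(fd)
```

The interceptor receives its arguments already converted according to this prototype, which is why python_open can print fname and oflag directly.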
    

    The decorator @hook takes two parameters:
    • symbol - the name of the symbol to be intercepted (it is assumed that the symbol is available in GDB from import tables or debugging information, but nothing prevents intercepting functions by address instead of by symbol)
    • ctype - a ctypes class specifying the type of the function
    The decorator registers the function in the Hook class and returns it unchanged.
    # hook.py
    def hook(symbol, ctype):
        def deco(func):
            Hook.register(symbol, ctype, func)
            return func
        return deco
    

    The method register creates an instance of the class and stores it in the dictionary all_hooks. Thus, after the file is executed, Hook.all_hooks holds, thanks to the decorators, all the information about the available interceptor functions.
    # hook.py
    class Hook(object):
        all_hooks = {}
        @staticmethod
        def register(symbol, *args):
            Hook.all_hooks[symbol] = Hook(symbol, *args)
    

    In order to perform the interception from GDB with a single function call, it is convenient to define a static method in the class Hook that is responsible for interception:
    # hook.py
    class Hook(object):
        @staticmethod
        def hook(symbol, *args):
            h = Hook.all_hooks[symbol]
            if h.active:
                return
            h.install(*args)
    
    Additional information about the intercepted function is passed in *args. What exactly is passed depends on the method of interception.

    Splicing Interception Methods


    Splicing broadly falls into two variants, depending on how the original function is called.

    In a simple hook, calling the original function consists of several steps:
    1. the beginning of the original function is restored from the saved copy
    2. the call is made
    3. the beginning is overwritten again with the jump instruction to the interceptor
    Nuances
    The disadvantage is obvious: in a multi-threaded program, there is no guarantee that another thread will not call the function while its beginning is being overwritten. This can be partially mitigated by stopping the other threads while the original function is called. But firstly, there is no standard way to do this, and secondly, you can get a deadlock if a thread is stopped at an unlucky moment, for example inside a function like malloc.

    In a trampoline hook, the beginning of the original function is copied to a new location, followed by a jump to the rest of the original function. With this variant, the original function is always available at the new address.

    A trampoline hook works in multi-threaded programs, but it is much more difficult to install. A whole number of instructions must be relocated, which usually requires a disassembler. The x86_64 architecture added even more problems because of the ubiquity of memory addressing relative to the register %rip (the instruction pointer, which holds the address of the next instruction).
    Nuances
    Let's look at the beginning of the function open in GDB:
    0x7f6cc8aa83e0 :          83 3d ed 33 2d 00 00  cmpl    $0x0,0x2d33ed(%rip)
    0x7f6cc8aa83e7 :          75 10                 jne     0x7f6cc8aa83f9 
    0x7f6cc8aa83e9 <__open_nocancel+0>: b8 02 00 00 00        mov     $0x2,%eax
    0x7f6cc8aa83ee <__open_nocancel+5>: 0f 05                 syscall
    

    If we move the first instruction, "cmpl $0x0,0x2d33ed(%rip)", to a different address, then the relative operand 0x2d33ed(%rip), which now refers to 0x7f6cc8d7b7d4, will refer to a different location (hello, SIGSEGV).
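The arithmetic behind this can be checked with a few lines of Python. This is only a worked calculation using the addresses from the listing; the trampoline address 0x7f6cfffff000 is chosen for illustration.

```python
def rip_relative_target(insn_addr, insn_len, disp32):
    """Address referenced by a %rip-relative operand.

    While an instruction executes, %rip holds the address of the *next*
    instruction, hence insn_addr + insn_len.
    """
    return insn_addr + insn_len + disp32


# cmpl $0x0,0x2d33ed(%rip) at the start of open is 7 bytes long.
original = rip_relative_target(0x7f6cc8aa83e0, 7, 0x2d33ed)

# The same bytes copied verbatim to a hypothetical trampoline address.
trampoline = 0x7f6cfffff000
moved = rip_relative_target(trampoline, 7, 0x2d33ed)

# Displacement that keeps the copy pointing at the original data.
fixed_disp = original - (trampoline + 7)
```

Here original is 0x7f6cc8d7b7d4, while the unpatched copy would reference an unrelated address. The corrected displacement only helps if it still fits in a signed 32-bit field.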

    To make a trampoline hook for this function, you need to:
    1. determine the size of the instructions at the beginning of the function
    2. allocate memory no further than 2 GB from the address referenced by the cmpl instruction (0x2d33ed(%rip) is a 32-bit signed offset)
    3. copy the beginning to the new location and patch the %rip-relative memory access in cmpl
    To top it off, the jump instruction must fit in the first 9 bytes, because this is a function with two entry points: __open_nocancel already begins at the address 0x7f6cc8aa83e9. This means that our trampoline must also be no further than 2 GB from the start of open, so that a 32-bit jump is possible (all 64-bit jumps are longer than 9 bytes).
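The "no further than 2 GB" condition can be written as a small check. This is a sketch with an illustrative helper name, fits_rel32; the displacement is measured from the end of the instruction and must fit in a signed 32-bit integer.

```python
def fits_rel32(src, dst, insn_len):
    """True if a 32-bit relative operand of an instruction at src reaches dst."""
    disp = dst - (src + insn_len)
    return -2**31 <= disp <= 2**31 - 1


open_addr = 0x7f6cc8aa83e0

# E9 rel32 is 5 bytes long; the target is about 0.9 GB away.
near = fits_rel32(open_addr, 0x7f6cfffff000, 5)

# A target 4 GB away cannot be reached with a 32-bit displacement.
far = fits_rel32(open_addr, open_addr + 2**32, 5)
```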

    In principle, with all the power of GDB at hand (gdb.execute()), nothing prevents you from implementing a trampoline hook correctly, but to keep the example simple, this article uses a simple hook.

    In a simple hook, the only limitation is the length of the jump instruction.
    There are two main options:
    • The E9 opcode (5 bytes) is a relative 32-bit jump to additionally allocated memory (as in a trampoline hook), and from there a full 64-bit jump to the interceptor.
      0x7f6cc8aa83e0 :          e9 1b 6c 55 37        jmp     0x7f6cfffff000
      
      The jump goes to 0x7f6cc8aa83e0 + 0x37556c1b + 5 = 0x7f6cfffff000
    • The FF 25 opcode (6 bytes) is an absolute 64-bit jump through an address stored in memory, which is located relative to %rip. For that address, you still need to allocate additional memory no further than 2 GB from the start of the function.
      0x00007f6cc8aa83e0 :      ff 25 1a 6c 55 37     jmpq    *0x37556c1a(%rip)
      
      Here, the target address of the absolute jump is stored at 0x7f6cc8aa83e0 + 0x37556c1a + 6 = 0x7f6cfffff000.
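Both encodings can be reproduced with struct. The sketch below uses illustrative helper names and Python 3 bytes, and yields the same byte sequences as the two disassembly lines above.

```python
import struct


def encode_jmp_rel32(src, dst):
    """E9 rel32: 5-byte relative jump from src to dst."""
    return struct.pack('<Bi', 0xE9, dst - (src + 5))


def encode_jmp_indirect(src, slot):
    """FF 25 disp32: 6-byte jump through the 8-byte address stored at slot."""
    return struct.pack('<BBi', 0xFF, 0x25, slot - (src + 6))


src = 0x7f6cc8aa83e0
dst = 0x7f6cfffff000

rel = encode_jmp_rel32(src, dst)      # e9 1b 6c 55 37
ind = encode_jmp_indirect(src, dst)   # ff 25 1a 6c 55 37
```

If the distance does not fit in 32 bits, struct.pack raises struct.error, which works as a cheap range check.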

    The second method is used in the article.
    # hook.py
    class Hook(object):
        @staticmethod
        def get_indlongjmp(srcaddr, proxyaddr):
            s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6)
            return map(ord, s)
    
    get_indlongjmp returns the code for a jump from the address srcaddr to the address stored in the QWORD at the address proxyaddr.

    Now you can finally write the missing methods of the class Hook. The method install receives the address of the original function, address, and the address of the auxiliary memory, proxyaddr. It then overwrites the beginning of the function (after saving it in self.code) with a jump to the interceptor.
    # hook.py
        def install(self, address, proxyaddr):
            self.address = address
            self.proxyaddr = proxyaddr
            proxymemory = (c_void_p * 1).from_address(self.proxyaddr)
            proxymemory[0] = Hook.cast_to_void_p(self.cfunc)
            self.jmp = self.get_indlongjmp(self.address, self.proxyaddr)
            self.memory = (c_ubyte * len(self.jmp)).from_address(self.address)
            self.code = list(self.memory)
            self.patchmem(self.jmp)
            self.pyfunc.orig = self.origfunc()
            self.active = True
    

    patchmem overwrites the beginning of the original function with data from src
    # hook.py
        def patchmem(self, src):
            for i in range(len(src)):
                self.memory[i] = src[i]
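The from_address technique used by install and patchmem can be tried safely on an ordinary data buffer instead of live code. In this sketch, buf is a stand-in for the first bytes of the target function; nothing here touches executable memory.

```python
from ctypes import addressof, c_ubyte

# A data buffer plays the role of the function prologue.
original = bytes([0x83, 0x3D, 0xED, 0x33, 0x2D, 0x00])
buf = (c_ubyte * len(original))(*original)

# View the raw address as an array of bytes, as install() does.
memory = (c_ubyte * len(original)).from_address(addressof(buf))

saved = list(memory)                          # copy of the original bytes
patch = [0xFF, 0x25, 0x1A, 0x6C, 0x55, 0x37]  # jmpq *0x37556c1a(%rip)

for i in range(len(patch)):                   # same loop as patchmem
    memory[i] = patch[i]
patched = bytes(buf)

for i in range(len(saved)):                   # restore the saved copy
    memory[i] = saved[i]
restored = bytes(buf)
```

Writes through memory are immediately visible in buf, because both objects refer to the same address.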
    

    origfunc wraps the call to the original function in code that removes the jump to the interceptor before the call and re-installs it afterwards.
    # hook.py
        def origfunc(self):
            ofunc = self.ctype(self.address)
            def wrap(*args):
                self.patchmem(self.code)
                val = ofunc(*args)
                self.patchmem(self.jmp)
                return val
            return wrap
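One possible hardening, which is not part of the article's code: if the call to the original raises (for example, when ctypes cannot convert an argument), the second patchmem is skipped and the hook silently stays removed. A try/finally keeps it installed. The class below is a hypothetical stand-in that uses a plain list instead of process memory.

```python
class FakePatchedFunction(object):
    """Stand-in for Hook: 'memory' is a list, not real process memory."""

    def __init__(self, original_bytes, jump_bytes, target):
        self.code = list(original_bytes)   # saved original prologue
        self.jmp = list(jump_bytes)        # jump to the interceptor
        self.memory = list(jump_bytes)     # hook is currently installed
        self.target = target               # callable playing the original

    def patchmem(self, src):
        for i in range(len(src)):
            self.memory[i] = src[i]

    def origfunc(self):
        def wrap(*args):
            self.patchmem(self.code)       # remove the hook
            try:
                return self.target(*args)
            finally:
                self.patchmem(self.jmp)    # re-install, even on error
        return wrap


def failing_original(*args):
    raise ValueError("simulated failure inside the original call")


fake = FakePatchedFunction([0x83, 0x3D], [0xFF, 0x25], failing_original)
try:
    fake.origfunc()()
except ValueError:
    pass
hook_still_installed = fake.memory == fake.jmp
```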
    

    Finishing touches


    Python is loaded into the address space, and the hook.py file is loaded into Python. It remains to call Hook.hook(symbol, address, proxyaddr) from the GDB Python module.

    Find the address of the function " open"
    line = gdb.execute('info address %s' % "open", False, True)
    m = re.match(r'.*?(0x[0-9a-f]+)', line)
    addr = int(m.group(1), 16)
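The parsing step can be isolated into a small function that also handles a reply without an address. The sample reply text below is illustrative; the exact wording depends on the GDB version and on whether debugging information is available.

```python
import re


def parse_info_address(line):
    """Return the first hexadecimal address in the reply, or None."""
    m = re.search(r'0x[0-9a-fA-F]+', line)
    return int(m.group(0), 16) if m else None


# Illustrative reply, not captured from a real session.
sample = ('Symbol "open" is at 0x7f6cc8aa83e0 '
          'in a file compiled without debugging.\n')
addr = parse_info_address(sample)

# A reply that contains no address at all.
missing = parse_info_address('no address in this reply\n')
```

Returning None instead of failing on m.group(1) lets the caller skip the symbol cleanly.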
    
    Nuances
    In general, before rewriting the code of a stopped process, you need to make sure that no thread is stopped in the middle of this code (or is about to return into it). The easiest way to do this is to parse the output of gdb.execute("thread apply all backtrace").
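A rough sketch of such a check, working on the backtrace text only. The sample output and the function name are illustrative. Frames that GDB prints without an address are not examined here, so this is a heuristic rather than a guarantee.

```python
import re


def frames_inside_patch(backtrace_text, start, length):
    """Return frame addresses strictly inside (start, start + length)."""
    hits = []
    for line in backtrace_text.splitlines():
        # Frame lines look like "#1  0x00007f... in func () ...".
        m = re.match(r'\s*#\d+\s+(0x[0-9a-fA-F]+)\s', line)
        if m:
            pc = int(m.group(1), 16)
            if start < pc < start + length:
                hits.append(pc)
    return hits


# Illustrative output, not captured from a real session.
sample = """
Thread 2 (Thread 0x7f6cc7a00700 (LWP 4243)):
#0  0x0000000000401003 in target_func ()
#1  0x0000000000400b12 in worker ()

Thread 1 (Thread 0x7f6cc9200740 (LWP 4242)):
#0  0x00007f6cc8ab1d20 in poll () from /lib/libc.so.6
#1  0x0000000000400a31 in main ()
"""

unsafe = frames_inside_patch(sample, 0x401000, 6)
safe = frames_inside_patch(sample, 0x7f6cc8aa83e0, 6)
```

A thread stopped exactly at the first byte is not reported, since it will simply execute the new jump.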

    Allocate memory near addr
    prot = PROT_READ | PROT_WRITE | PROT_EXEC
    flags = MAP_PRIVATE | MAP_ANONYMOUS
    maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n'
                               % (addr | 0x7FFFFFFF, 4096, prot, flags))
    maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)
    
    Nuances
    The last line is a workaround for a bug in GDB that eats the high bits of the result. The argument (addr | 0x7FFFFFFF) relies on an undocumented property of mmap: it returns memory at an address lower than the requested one.
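The bit manipulation can be followed with the article's own numbers. The truncated value 0xfffff000 is an assumption made for illustration; the repair is only valid if the mapping really lies in the same 4 GB window as addr.

```python
addr = 0x7f6cc8aa83e0          # address of open from the listing above

# Hint for mmap: the last byte of the 2 GB-aligned window containing addr.
hint = addr | 0x7FFFFFFF

# Suppose only the low 32 bits of the mapping address come back.
truncated = 0xfffff000

# The article's repair: take the high 32 bits from addr.
maddr = (truncated & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)

# The same mask also handles a sign-extended (negative) result.
maddr_from_negative = (-0x1000 & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)

# Sanity check worth keeping: the result must be reachable from addr.
reachable = abs(maddr - addr) < 2**31
```

With these inputs maddr is 0x7f6cfffff000, the proxy address that appears in the jump encodings earlier in the article.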

    Doing it properly, without tricks, takes a little longer: you need to parse the output of gdb.execute('info proc mappings', False, True), find the hole in the address space closest to addr, and call mmap with MAP_FIXED. And, of course, it is not necessary to allocate a whole page of memory for each intercepted function.
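A sketch of the "proper" approach, limited to the text-processing part. The sample mapping table is invented for illustration and the column layout varies between GDB versions, so only the first two hexadecimal columns are used. Function names are hypothetical.

```python
import re

PAGE = 0x1000


def parse_mappings(text):
    """Extract sorted (start, end) pairs from 'info proc mappings' output."""
    regions = []
    for line in text.splitlines():
        m = re.match(r'\s*(0x[0-9a-fA-F]+)\s+(0x[0-9a-fA-F]+)\s', line)
        if m:
            regions.append((int(m.group(1), 16), int(m.group(2), 16)))
    return sorted(regions)


def nearest_free_page(regions, addr, reach=2**31 - PAGE):
    """Pick the unmapped page closest to addr, or None if none is in reach."""
    best = None
    # Gaps lie between the end of one region and the start of the next.
    for (_, gap_start), (gap_end, _) in zip(regions, regions[1:]):
        if gap_end - gap_start < PAGE:
            continue
        # Clamp the page of addr into [gap_start, gap_end - PAGE].
        page = min(max(addr & ~(PAGE - 1), gap_start), gap_end - PAGE)
        dist = abs(page - addr)
        if dist <= reach and (best is None or dist < abs(best - addr)):
            best = page
    return best


# Invented sample, not captured from a real process.
sample = """
      Start Addr           End Addr       Size     Offset objfile
        0x400000           0x401000     0x1000        0x0 /usr/bin/target
  0x7f6cc8a00000     0x7f6cc8bb0000   0x1b0000        0x0 /lib/libc.so.6
  0x7f6cc8bb0000     0x7f6cc8db0000   0x200000   0x1b0000 /lib/libc.so.6
  0x7f6cc8dc0000     0x7f6cc8de0000    0x20000        0x0 /lib/ld.so
"""

regions = parse_mappings(sample)
page = nearest_free_page(regions, 0x7f6cc8aa83e0)
```

Since MAP_FIXED replaces whatever is already mapped at the given address, the chosen page has to be genuinely free at the moment mmap is called.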

    Allow writing to the original function (otherwise SIGSEGV)
    gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot))
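The expression addr & -0x1000 rounds the address down to a page boundary, and two pages are requested because the patched bytes may straddle a page boundary. A small calculation, assuming 4 KB pages as the article does:

```python
PAGE = 0x1000


def pages_touched(addr, length):
    """Number of pages covered by length bytes starting at addr."""
    first = addr // PAGE
    last = (addr + length - 1) // PAGE
    return last - first + 1


addr = 0x7f6cc8aa83e0              # start of open from the listing above
page_start = addr & -PAGE          # same as addr & ~(PAGE - 1)

# The 6-byte FF 25 jump at this address stays within one page.
here = pages_touched(addr, 6)

# The same patch 3 bytes before a page boundary spans two pages.
straddling = pages_touched(0x7f6cc8aa8ffd, 6)
```

Requesting two pages unconditionally, as the article does, covers both cases without this calculation.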
    

    Call Hook.hook through PyRun_SimpleString
    pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"open\\", 0x%x, 0x%x)")'
                                % (addr, maddr))
    

    Done! Now a call to "open" in the target process will be intercepted and routed to python_open from hook.py.

    Sample Files


    Full example files (with a few more checks, but without taking many of the nuances into account)
    pyinject.py
    # pyinject.py
    import re
    import os
    RTLD_LAZY = 1
    PROT_READ = 0x1
    PROT_WRITE = 0x2
    PROT_EXEC = 0x4
    MAP_PRIVATE = 0x2
    MAP_FIXED = 0x10
    MAP_ANONYMOUS = 0x20
    LIBPYTHON = 'libpython2.7.so'
    class ParamHookfile(gdb.Parameter):
        instance = None
        def __init__(self, default=''):
            super(ParamHookfile, self).__init__("hookfile",
                                                gdb.COMMAND_NONE, gdb.PARAM_FILENAME)
            self.value = default
            ParamHookfile.instance = self
        def get_set_string(self):
            return self.value
        def get_show_string(self, svalue):
            return svalue
    class CmdHook(gdb.Command):
        instance = None
        def __init__(self):
            super(CmdHook, self).__init__("pyinject", gdb.COMMAND_NONE)
            self.initialized = False
            CmdHook.instance = self
        def complete(self, text, word):
            matching = [s[4:] for s in dir(self)
                         if s.startswith('cmd_')
                         and s[4:].startswith(text)]
            return matching
        def invoke(self, subcmd, from_tty):
            self.dont_repeat()
            if subcmd.startswith("hook"):
                self.cmd_hook(*gdb.string_to_argv(subcmd))
            elif subcmd.startswith("unhook"):
                self.cmd_unhook(*gdb.string_to_argv(subcmd))
            else:
                gdb.write('unknown sub-command "%s"' % subcmd)
        def cmd_hook(self, *args):
            self.initialize()
            if not self.initialized:
                return
            pyret = gdb.parse_and_eval('PyRun_SimpleString("print Hook")')
            if long(pyret) != 0:
                hookfile = ParamHookfile.instance.value
                if not os.path.exists(hookfile):
                    gdb.write('Use "set hookfile "\n')
                    return
                fp = gdb.parse_and_eval('fopen("%s", "r")' % hookfile)
                assert long(fp) != 0
                pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "%s", 1)' % (fp, hookfile))
                if long(pyret) != 0:
                    gdb.write('Error loading "%s"\n' % hookfile)
                    return
            for symbol in args:
                try:
                    line = gdb.execute('info address %s' % symbol, False, True)
                    m = re.match(r'.*?(0x[0-9a-f]+)', line)
                    if not m:
                        # No address in the reply: skip this symbol
                        continue
                    addr = int(m.group(1), 16)
                except gdb.error:
                    continue
                prot = PROT_READ | PROT_WRITE | PROT_EXEC
                flags = MAP_PRIVATE | MAP_ANONYMOUS # | MAP_FIXED
                maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n'
                                           % (addr | 0x7FFFFFFF , 4096, prot, flags))
                maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)
                gdb.write("mmap = 0x%x\n" % maddr)
                if maddr == 0:
                    continue
                gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot))
                pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"%s\\", 0x%x, 0x%x)")'
                                           % (symbol, addr, maddr))
                if long(pyret) == 0:
                    gdb.write('hook "%s" OK\n' % symbol)
        def cmd_unhook(self, *args):
            for symbol in args:
                pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.unhook(\\"%s\\")")'
                                           % (symbol))
                if long(pyret) == 0:
                    gdb.write('unhook "%s" OK\n' % symbol)
        def initialize(self):
            if self.initialized:
                return
            handle = gdb.parse_and_eval('dlopen("%s", %d)' % (LIBPYTHON, RTLD_LAZY))
            if not long(handle):
                gdb.write('Cannot load library %s\n' % LIBPYTHON)
                return
            if not long(gdb.parse_and_eval('Py_IsInitialized()')):
                gdb.execute('call PyEval_InitThreads()')
                gdb.execute('call Py_Initialize()')
            self.initialized = True
    if __name__ == '__main__':
        ParamHookfile()
        CmdHook()
    

    hook.py
    # hook.py
    import struct
    from ctypes import (CFUNCTYPE, POINTER, c_ubyte, c_int, c_char_p, c_void_p)
    class Hook(object):
        all_hooks = {}
        @staticmethod
        def cast_to_void_p(pointer):
            return CFUNCTYPE(c_void_p, c_void_p)(lambda x: x)(pointer)
        @staticmethod
        def register(symbol, *args):
            Hook.all_hooks[symbol] = Hook(symbol, *args)
        def __init__(self, symbol, ctype, pyfunc):
            self.symbol = symbol
            self.ctype = ctype
            self.pyfunc = pyfunc
            self.cfunc = self.ctype(self.pyfunc)
            self.address = 0
            self.proxyaddr = 0
            self.jmp = None
            self.memory = None
            self.code = None
            self.active = False
        def install(self, address, proxyaddr):
            print "install:", hex(address)
            self.address = address
            self.proxyaddr = proxyaddr
            proxymemory = (c_void_p * 1).from_address(self.proxyaddr)
            proxymemory[0] = Hook.cast_to_void_p(self.cfunc)
            self.jmp = self.get_indlongjmp(self.address, self.proxyaddr)
            self.memory = (c_ubyte * len(self.jmp)).from_address(self.address)
            self.code = list(self.memory)
            self.patchmem(self.jmp)
            self.pyfunc.orig = self.origfunc()
            self.active = True
        def uninstall(self):
            self.patchmem(self.code)
            self.active = False
        def origfunc(self):
            ofunc = self.ctype(self.address)
            def wrap(*args):
                self.patchmem(self.code)
                val = ofunc(*args)
                self.patchmem(self.jmp)
                return val
            return wrap
        def patchmem(self, src):
            for i in range(len(src)):
                self.memory[i] = src[i]
        @staticmethod
        def get_indlongjmp(srcaddr, proxyaddr):
            # 64-bit indirect absolute jump (6 + 8 bytes)
            # ff 25 off32     jmpq  *off32(%rip)
            try:
                s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6)
                return map(ord, s)
            except:
                print hex(proxyaddr), hex(srcaddr), hex(proxyaddr - srcaddr - 6)
                raise
        @staticmethod
        def hook(symbol, address, proxyaddr):
            h = Hook.all_hooks[symbol]
            if h.active:
                return
            h.install(address, proxyaddr)
        @staticmethod
        def unhook(symbol):
            h = Hook.all_hooks[symbol]
            if not h.active:
                return
            h.uninstall()
    def hook(symbol, ctype):
        def deco(func):
            Hook.register(symbol, ctype, func)
            return func
        return deco
    #int open (const char *__file, int __oflag, ...)
    @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int))
    def python_open(fname, oflag):
        print "open: ", fname, oflag
        return python_open.orig(fname, oflag)
    

    Running an example (better with absolute paths)
    gdb -ex 'attach PID' -ex 'source /path/pyinject.py' -ex 'set hookfile /path/hook.py'
    (gdb) pyinject hook open
    (gdb) continue
