This is an Immunity template plugin for function hooking while reverse engineering.
4f540c9274c23c9ad1693cb060f324330b4f61ac079c288de7734e407dfd7a77
################################################################################
# ____ _ __ #
# ___ __ __/ / /__ ___ ______ ______(_) /___ __ #
# / _ \/ // / / (_-</ -_) __/ // / __/ / __/ // / #
# /_//_/\_,_/_/_/___/\__/\__/\_,_/_/ /_/\__/\_, / #
# /___/ team #
# #
# immhooktmpl - Immunity template plugin for function hooking #
# #
# FILE #
# immhooktmpl.py #
# #
# DATE #
# 30-03-2014 #
# #
# DESCRIPTION #
# Due to the lack of documentation for the Immunity API this is an easy #
# template for function hooking while RE. #
# Howto: #
# 1. set API names or addresses to hook #
# 2. define routine to run when hook reached - Hook class #
# #
# AUTHOR #
# nrz@nullsecurity.net #
# #
################################################################################
"""
Example:
!immhooktmpl -N
0BADF00D --==[ immhooktmpl by nrz@nullsecurity.net ]==--
0BADF00D [+] hook WS2_32.recvfrom 0x71A32D0F
0BADF00D -> Hook.add WS2_32.recvfrom returned 26465696
0BADF00D -> Hook added
0BADF00D [+] hook WS2_32.sendto 0x71A32C69
0BADF00D -> Hook.add WS2_32.sendto returned 26465992
0BADF00D -> Hook added
0BADF00D [+] hook WS2_32.socket 0x71A33B91
0BADF00D -> Hook.add WS2_32.socket returned 26466320
0BADF00D -> Hook added
[+] Done.
0BADF00D [hook][default] WS2_32.socket
0BADF00D ESP: 0x017ef77c RETURN TO 0x01f35e0e
0BADF00D ESP + 4: 0x017ef780 -> 0x00000017
0BADF00D ESP + 8: 0x017ef784 -> 0x00000002
0BADF00D ESP + 12: 0x017ef788 -> 0x00000000
0BADF00D [hook] WS2_32.sendto
0BADF00D EIP on Stack: 0x0172f434 RETURN TO 0x015fbf65
0BADF00D 1st arg - (SOCKET) socket: 0x0172f438 -> 0x00000718
0BADF00D 2nd arg - (char *) buff: 0x0172f43c -> 0x024e0318
0BADF00D -> ascii buffer: [........]
0BADF00D -> hex buffer:
0BADF00D -> Byte 0 : 0xa5
0BADF00D -> Byte 1 : 0xb3
0BADF00D -> Byte 2 : 0xcc
0BADF00D -> Byte 3 : 0x34
0BADF00D -> Byte 4 : 0x01
0BADF00D -> Byte 5 : 0xbe
0BADF00D -> Byte 6 : 0x4b
0BADF00D -> Byte 7 : 0x25
0BADF00D -> Byte 8 : 0x25
0BADF00D -> Byte 9 : 0x1f
0BADF00D 3rd arg - (int) len: 0x0172f440 -> 0x00000082
0BADF00D 4th arg - (int) flags: 0x0172f444 -> 0x00000000
0BADF00D 5th arg - (struct sockaddr *) to: 0x0172f448 -> 0x024e008c
0BADF00D 6th arg - (int) tolen: 0x0172f44c -> 0x00000015
!immhooktmpl -D
0BADF00D --==[ immhooktmpl by nrz@nullsecurity.net ]==--
0BADF00D [!] WARNING: Removed hook WS2_32.recvfrom
0BADF00D [!] WARNING: Removed hook WS2_32.sendto
0BADF00D [!] WARNING: Removed hook WS2_32.socket
[+] Functions in code cleaned up
v0.2
!immhooktmpl -a _crypt+0x01001646
0BADF00D [+] hook _crypt 0x01001646
0BADF00D -> Hook.add _crypt returned 26466320
0BADF00D -> Hook added
"""
# imports
import sys
import getopt
import immutils
from immlib import *
# !! FUNCTIONS TO HOOK - !immhooktmpl -N
# The three names below are also the keys Hook.run() compares against to
# pick the routine that handles a reached hook.
RECVFROM = "WS2_32.recvfrom"
SENDTO = "WS2_32.sendto"
SOCKET = "WS2_32.socket"

FUNCTIONS = [
    RECVFROM,  # API fnc
    SENDTO,    # API fnc
    SOCKET,    # API fnc
]

# !! ADDRESSES TO HOOK - !immhooktmpl -A
# Each entry is a [name, hex address string] pair; the address string is
# converted with int(addr, 16) when the hook is installed.
ADDRESSES = [
    ["fnc1", "0x0BAD1337"],  # non API routine
    ["fnc2", "0x0BAD1337"],  # address hook
]

# debugger handle shared by the module-level code and the helper classes
imm = Debugger()

# version string printed by "-V"
VERSION = "immhooktmpl v0.2-dev"

# short plugin description for Immunity
DESC = "Immunity template plugin for hooking"

# status strings returned to the caller of main()
SUCCESS = "[+] Done."
FAILURE = "[-] Failed. !immhooktmpl -H"
class Wrapper:
    """Convenience helpers that depend on the parsed plugin options."""

    def __init__(self, options):
        """Store a reference to the option dictionary (not a copy)."""
        self.options = options

    def verbose(self, msg):
        """Write *msg* to the debugger log, but only in verbose mode.

        Verbose mode counts as enabled as soon as the "verbose" key is
        present in the options, regardless of its value.  Always returns
        None.
        """
        if "verbose" not in self.options:
            return
        imm.log(msg)
class Error:
    """Static helpers that report warnings and errors in the debugger log."""

    def __init__(self):
        """Nothing to set up; every helper is a static method."""
        pass

    @staticmethod
    def warn(msg):
        """Log *msg* as a highlighted warning line and return SUCCESS."""
        line = "[!] WARNING: " + msg
        imm.log(line, focus=1, highlight=1)
        return SUCCESS

    @staticmethod
    def error(msg):
        """Log *msg* as a highlighted error line and return FAILURE.

        The method only logs and returns; it does not terminate anything,
        so callers decide what to do with the FAILURE status.
        """
        line = "[-] ERROR: " + msg
        imm.log(line, focus=1, highlight=1)
        return FAILURE
class Help:
    """Static helpers that print the plugin banner and the usage text."""

    # Usage text, one debugger log line per entry.
    USAGE_LINES = (
        "",
        # fixed: the command is "!immhooktmpl" (was "!immimmhooktmpl")
        "Usage: !immhooktmpl [options] | <misc>",
        "",
        "Dynamic Options:",
        " -n <name_function>: add API name function to hook [default callback]",
        " -a <name+0xADDR>: add address to hook [i.e. _crypt+0x0031337][default callback]",
        " -r <name_function>: remove default hook",
        "Static Options [edit source code]:",
        " -N: run all name hooks",
        " -A: run all address hooks",
        " -D: delete all hooks",
        "",
        "Misc:",
        " -v: verbose mode (default: quiet)",
        " -V: print version of immhooktmpl and exit",
        " -H: print this help and exit",
    )

    def __init__(self):
        """Nothing to set up; every helper is a static method."""
        pass

    @staticmethod
    def banner():
        """Log the highlighted plugin banner and return SUCCESS."""
        imm.log("--==[ immhooktmpl by nrz@nullsecurity.net ]==--",
                focus=1, highlight=1)
        return SUCCESS

    @staticmethod
    def usage():
        """Log every line of the usage text and return SUCCESS."""
        for line in Help.USAGE_LINES:
            imm.log(line)
        return SUCCESS
class OptsParser:
    """Translate the plugin's command line into entries of an option dict."""

    # getopt short-option spec; a trailing ":" marks an option with a value
    SHORT_OPTS = "n:a:r:NADvVH"

    # options whose argument is stored under the given key
    VALUE_OPTS = {
        "-n": "add_name",
        "-a": "add_address",
        "-r": "remove",
    }

    # options that simply set the given key to True
    FLAG_OPTS = {
        "-N": "run_name_hook",
        "-A": "run_addr_hook",
        "-D": "cleanup",
        "-v": "verbose",
    }

    def __init__(self, options):
        """Store a reference to the dict that parse_opts() fills in."""
        self.options = options

    def parse_opts(self, args):
        """Parse *args* and record the recognised options.

        Returns one of:
          * the getopt error text (a str) when *args* cannot be parsed,
          * SUCCESS right after handling "-V" (version) or "-H" (help);
            options that follow on the command line are not processed,
            options seen before stay recorded,
          * the (non-empty) option dict itself,
          * FAILURE when no option ended up in the dict.
        """
        try:
            self.opts, self.args = getopt.getopt(args, self.SHORT_OPTS)
        except getopt.GetoptError as err:
            return str(err)

        # Loop state lives in locals; it used to leak onto the instance
        # as self.o / self.a.
        for opt, value in self.opts:
            if opt in self.VALUE_OPTS:
                self.options[self.VALUE_OPTS[opt]] = value
            elif opt in self.FLAG_OPTS:
                self.options[self.FLAG_OPTS[opt]] = True
            elif opt == "-V":
                imm.log(VERSION)
                return SUCCESS
            elif opt == "-H":
                Help.usage()
                return SUCCESS

        if self.options:
            return self.options
        return FAILURE
class Hook(LogBpHook):
    """Logging breakpoint hook that dumps the stack of hooked functions.

    Hooks are installed with set_hook_by_name() / set_hook_by_address().
    Each installed address is remembered in the debugger knowledge base
    under the key "%08x" % address, with the hook name as value; run()
    uses that entry to find out which hook was reached.
    """

    # number of bytes shown by the ascii / hex buffer dumps
    BUFFER_PREVIEW = 10

    # Labels for the stack slots ESP+4, ESP+8, ... of each hooked function.
    RECVFROM_ARGS = (
        "1st arg - (SOCKET) socket:",
        "2nd arg - (char *) buff:",
        "3rd arg - (int) len:",
        "4th arg - (int) flags:",
        "5th arg - (struct sockaddr *)from:",
        "6th arg - (int *) fromlen:",
    )
    SENDTO_ARGS = (
        "1st arg - (SOCKET) socket:",
        "2nd arg - (char *) buff:",
        "3rd arg - (int) len:",
        "4th arg - (int) flags:",
        "5th arg - (struct sockaddr *) to:",
        "6th arg - (int) tolen:",
    )
    SOCKET_ARGS = (
        "1st arg - (int) addressfamily:",
        "2nd arg - (int) type:",
        "3rd arg - (int) protocol:",  # fixed label, was "3nd arg"
    )
    DEFAULT_SLOTS = (
        "ESP + 4:",
        "ESP + 8:",
        "ESP + 12:",
    )

    def __init__(self):
        """Initialise the LogBpHook base class."""
        LogBpHook.__init__(self)

    def _install(self, name, addr):
        """Set a breakpoint at *addr* and register this hook as *name*.

        Returns None on success, or the FAILURE status from Error.error()
        when self.add() reports -1.
        """
        imm.log("[+] hook %s 0x%X" % (name, addr))
        imm.setBreakpoint(addr)
        ret = self.add(name, addr)
        imm.log(" -> Hook.add %s returned %s" % (name, ret))
        if ret == -1:
            return Error.error("Failed to add hook %s" % name)
        imm.log(" -> Hook added")
        # remembered so that run() can map EIP back to the hook name
        imm.addKnowledge("%08x" % addr, name)

    def set_hook_by_name(self, name):
        """Hook a function given by its API name, e.g. "WS2_32.socket".

        Returns None on success or the FAILURE status on error.
        """
        addr = imm.getAddress(name)
        # -1 is treated as "could not be resolved".  The check now comes
        # first, so nothing is logged or breakpointed for a bogus address.
        if addr == -1:
            return Error.error("Could not locate function %s" % name)
        return self._install(name, addr)

    def set_hook_by_address(self, name, addr):
        """Hook the code at *addr* and register it under *name*.

        *addr* is a hexadecimal string, with or without a "0x" prefix.
        Returns None on success or the FAILURE status on error.
        """
        try:
            addr = int(addr, 16)
        except (TypeError, ValueError):
            return Error.error("Invalid address %r for hook %s" % (addr, name))
        if addr < 0:
            return Error.error("Could not locate function %s" % name)
        return self._install(name, addr)

    def _print_ascii_buffer(self, imm, buff, length):
        """Log *length* bytes read from address *buff* as a string."""
        imm.log(" -> ascii buffer: %s" % str(imm.readMemory(buff, length)))

    def _print_hex_buffer(self, imm, buff, length):
        """Log *length* bytes read from address *buff*, one hex value per line."""
        imm.log(" -> hex buffer: ")
        # One read for the whole range instead of one debugger call per
        # byte.  NOTE(review): assumes readMemory returns a byte string.
        data = imm.readMemory(buff, length) or b""
        for index, value in enumerate(bytearray(data)):
            imm.log(" -> Byte %d : %s" % (index, hex(value)))

    def _dump_frame(self, imm, regs, return_label, slot_labels, buffer_arg=None):
        """Log the return address and the 4-byte stack slots above ESP.

        return_label -- text in front of the "RETURN TO" line
        slot_labels  -- one label per slot, starting at ESP+4
        buffer_arg   -- 1-based slot whose value is a pointer to dump as
                        ascii and hex (None: no buffer dump)
        """
        esp = regs['ESP']
        imm.log("%s 0x%08x RETURN TO 0x%08x" %
                (return_label, esp, imm.readLong(esp)))
        for position, label in enumerate(slot_labels, 1):
            slot = esp + 4 * position
            value = imm.readLong(slot)
            imm.log("%s 0x%08x -> 0x%08x" % (label, slot, value))
            if position == buffer_arg:
                self._print_ascii_buffer(imm, value, self.BUFFER_PREVIEW)
                self._print_hex_buffer(imm, value, self.BUFFER_PREVIEW)

    # !! DEFINE ROUTINE FOR EACH HOOK HERE

    def recvfrom_hook(self, imm, function_name, regs):
        """
        Example recvfrom routine
        int recvfrom(_In_ SOCKET s,_Out_ char *buf,_In_ int len,
                     _In_ int flags,_Out_ struct sockaddr *from,
                     _Inout_opt_ int *fromlen);
        """
        # NOTE(review): buf is an _Out_ parameter; if the hook fires at
        # function entry the dumped bytes are presumably not the received
        # data yet - confirm.
        imm.log("[hook] %s" % function_name)
        self._dump_frame(imm, regs, "EIP on Stack:", self.RECVFROM_ARGS,
                         buffer_arg=2)

    def socket_hook(self, imm, function_name, regs):
        """
        Example socket routine
        SOCKET WSAAPI socket(_In_ int af, _In_ int type,
                             _In_ int protocol);
        """
        imm.log("[hook] %s" % function_name)
        self._dump_frame(imm, regs, "EIP on Stack:", self.SOCKET_ARGS)

    def sendto_hook(self, imm, function_name, regs):
        """
        Example sendto routine
        int sendto(_In_ SOCKET s, _In_ const char *buf,
                   _In_ int len, _In_ int flags,
                   _In_ const struct sockaddr *to, _In_ int tolen);
        """
        imm.log("[hook] %s" % function_name)
        self._dump_frame(imm, regs, "EIP on Stack:", self.SENDTO_ARGS,
                         buffer_arg=2)

    def default_hook(self, imm, function_name, regs):
        """
        Catch all calls from hooks without a dedicated routine
        """
        imm.log("[hook][default] %s" % function_name)
        self._dump_frame(imm, regs, "ESP:", self.DEFAULT_SLOTS)

    def run(self, regs):
        """
        Hook controller
        Called when a hooked breakpoint is reached; looks up the hook name
        stored for the current EIP and runs the matching routine.
        """
        # a fresh Debugger handle is used here instead of the module-level one
        imm = Debugger()
        function_name = imm.getKnowledge("%08x" % regs['EIP'])
        routines = {
            RECVFROM: self.recvfrom_hook,
            SENDTO: self.sendto_hook,
            SOCKET: self.socket_hook,
        }
        # anything without a dedicated routine goes to the default dump
        routine = routines.get(function_name, self.default_hook)
        routine(imm, function_name, regs)
class Controller:
    """Control the program flow of one plugin invocation."""

    def __init__(self, args):
        """Create the option dict and the helpers that share it."""
        self.ret = SUCCESS
        self.args = args
        self.options = {}  # options from command line, config, etc.
        self.wrapper = Wrapper(self.options)
        self.hook = ''
        self.parser = OptsParser(self.options)

    def _add_address_hook(self, spec):
        """Install one hook from a "name+0xADDR" command line value.

        Returns None on success or the FAILURE status on error.
        """
        parts = spec.split("+")
        # a value without "+" used to raise IndexError
        if len(parts) < 2:
            return Error.error("Expected <name+0xADDR>, got %s" % spec)
        name = parts[0]
        addr = parts[1].replace('0x', '')
        # reject non-hex text here instead of letting int() raise later
        try:
            int(addr, 16)
        except ValueError:
            return Error.error("Invalid hex address in %s" % spec)
        self.hook = Hook()
        return self.hook.set_hook_by_address(name, addr)

    def _cleanup(self):
        """Remove every hook listed in FUNCTIONS and ADDRESSES."""
        self.hook = imm.getKnowledge("immhooktmpl")
        for name in FUNCTIONS:
            imm.removeHook(name)
            Error.warn("Removed hook %s " % name)
        for entry in ADDRESSES:
            imm.removeHook(entry[0])
            Error.warn("Removed Address hook %s" % entry[0])
        imm.forgetKnowledge("immhooktmpl")
        return "[+] Functions in code cleaned up"

    def start(self):
        """Parse the arguments and run the first requested action.

        Only one action runs per invocation, checked in this order:
        -n, -a, -N, -A, -r, -D.  Returns a status string: the parser's
        result when no option was recorded, FAILURE when installing a hook
        failed, the cleanup message for -D, SUCCESS otherwise.
        """
        Help.banner()
        self.ret = self.parser.parse_opts(self.args)

        # nothing recorded: parse error, -V/-H, or an empty command line
        if not self.options:
            return self.ret

        self.wrapper.verbose("verbose")
        opts = self.options

        # The set_hook_* methods return None on success and the FAILURE
        # status on error; that status is now passed on instead of being
        # dropped in favour of SUCCESS.
        if opts.get("add_name"):
            self.hook = Hook()
            return self.hook.set_hook_by_name(opts["add_name"]) or SUCCESS

        if opts.get("add_address"):
            return self._add_address_hook(opts["add_address"]) or SUCCESS

        if opts.get("run_name_hook"):
            self.hook = Hook()
            imm.addKnowledge("immhooktmpl", self.hook)
            status = SUCCESS
            for name in FUNCTIONS:
                # keep going after a failure so the remaining hooks are set
                if self.hook.set_hook_by_name(name) is not None:
                    status = FAILURE
            return status

        if opts.get("run_addr_hook"):
            self.hook = Hook()
            imm.addKnowledge("immhooktmpl", self.hook)
            status = SUCCESS
            for entry in ADDRESSES:
                if self.hook.set_hook_by_address(entry[0], entry[1]) is not None:
                    status = FAILURE
            return status

        if opts.get("remove"):
            imm.removeHook(opts["remove"])
            Error.warn("Removed hook %s " % opts["remove"])
            return SUCCESS

        if opts.get("cleanup"):
            return self._cleanup()

        # e.g. only "-v" was given: nothing to do
        return SUCCESS

    def end(self):
        """Do last needed things (currently nothing)."""
        pass
def main(args):
    """Plugin entry point: run a Controller on *args* and return its status."""
    controller = Controller(args)
    status = controller.start()
    controller.end()
    return status
# EOF