本博客由闲散白帽子胖胖鹏鹏胖胖鹏潜力所写,仅仅作为个人技术交流分享,不得用做商业用途。转载请注明出处,未经许可禁止将本博客内所有内容转载、商用。
我们在使用Angr分析代码的时候,最常用的功能应该是Simulation Manager的explore()功能了,这个函数能够很方便的让Angr搜索到我们想要的state,并且求出运行到该state所需的约束条件。那今天,我们来分析一下,Simulation Manager是如何做到的。首先来看这个explore函数。
代码位置:/angr/sim_manager.py
def explore(self, stash=None, n=None, find=None, avoid=None, find_stash=None, avoid_stash=None, cfg=None,
num_find=1, **kwargs):
"""
向前拖动stash(最多n次或者知道找到满足"num_find"个状态),寻找满足"find"条件的状态,
避免"avoid"条件,将找到的状态存储到"find_stash",将避免的状态存储到"avoid_stash"。
"find"和"avoid"状态可以是以下任意一种:
- 一个函数地址
- 一个地址set或者list
- 一个函数,这个函数将state作为输入,并且返回是否满足条件
如果将Angr生成的cfg作为参数传给"cfg"变量,并且"find"参数是set/list/单个地址的话,
那么任何需要经过失败状态之后才能到达成功状态的状态,将会被提前抛弃。(有一点绕)
(PS:这句话意思就是说,如果你在到达成功的状态的路径上面,有一个状态是失败的状态,那么这个状态就被提前抛弃)
(也就是说,如果我们生成了一个完整的CFG,那么可以简化explore时候的复杂度?有待考证)
"""
#初始化这些stash
stash = stash or 'active'
find_stash = find_stash or 'found'
avoid_stash = avoid_stash or 'avoid'
num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0
#这里是重点,调用了use_technique()
tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
try:
self.run(stash=stash, n=n, **kwargs)
finally:
self.remove_technique(tech)
return self
在代码中,我们可以看到SimulationManager(以下简称SM)先是使用use_technique函数增加了Explorer插件(这一点和我之前讲过的Angr的插件化类似),之后使用run函数执行,最后在SM中删除Explorer插件。我们逐层分析,先来看看Explorer中都定义了什么内容。
简单的来讲Explorer中做了这么几件事。1.初始化进行搜索的必要参数;2.重写filter函数,判断当前的state是否应该被过滤;3.重写complete函数,用于判断是否已经搜索到find中提供的地址。
class Explorer(ExplorationTechnique):
"""
继承自ExplorationTechnique这个父类,关于ExplorationTechnique的定义我放在后面
感兴趣的话可以读一读,因为他本身是个接口,没有什么特殊的定义
这里的函数说明写的和SimulationManager中的相同。
"""
def __init__(self, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1, avoid_priority=False):
super(Explorer, self).__init__()
#初始化find和avoid,这里使用到的函数在后面的ExplorationTechnique定义中有讲
self.find = self._condition_to_lambda(find)
self.avoid = self._condition_to_lambda(avoid)
self.find_stash = find_stash
self.avoid_stash = avoid_stash
self.cfg = cfg
self.ok_blocks = set()
self.num_find = num_find
self.avoid_priority = avoid_priority
find_addrs = getattr(self.find, "addrs", None)
avoid_addrs = getattr(self.avoid, "addrs", None)
# 只有所有的地址我们都能够通过静态获取的时候,使用unicorn才是安全的
self._warn_unicorn = (find_addrs is None) or (avoid_addrs is None)
# 即使avoid和find的地址我们不知道,我们还是应该停在我们知道的地址
self._extra_stop_points = (find_addrs or set()) | (avoid_addrs or set())
# TODO: 暂时还没没有办法完成,因为CFGFast不能够解决过程的延续性
# 也就是说,Angr并不推荐使用CFG,并且已经禁止使用CFG了
from .. import analyses
if isinstance(cfg, analyses.CFGFast):
l.error("CFGFast is currently inappropriate for use with Explorer.")
l.error("Usage of the CFG has been disabled for this explorer.")
self.cfg = None
# 哈!上面不是已经设置为None了么,这段代码看来是进不去了
if self.cfg is not None:
avoid = avoid_addrs or set()
# find中的地址必须是静态的
if not find_addrs:
l.error("You must provide at least one 'find' address as a number, set, list, or tuple if you provide a CFG.")
l.error("Usage of the CFG has been disabled for this explorer.")
self.cfg = None
return
# avoid的地址也需要在cfg中
for a in avoid:
if cfg.get_any_node(a) is None:
l.warning("'Avoid' address %#x not present in CFG...", a)
# 这不是队列而是一个栈。。。。他只是一个任务列表而已!
queue = []
for f in find_addrs:
nodes = cfg.get_all_nodes(f)
if len(nodes) == 0:
l.warning("'Find' address %#x not present in CFG...", f)
else:
queue.extend(nodes)
# 这是从find的地址出发,后向搜索block,直到找到从路径的起始点start,也就是后向搜索所有的start,使得start能够到达find的地址
seen_nodes = set()
while len(queue) > 0:
n = queue.pop()
if id(n) in seen_nodes:
continue
if n.addr in avoid:
continue
self.ok_blocks.add(n.addr)
seen_nodes.add(id(n))
queue.extend(n.predecessors)
if len(self.ok_blocks) == 0:
l.error("No addresses could be validated by the provided CFG!")
l.error("Usage of the CFG has been disabled for this explorer.")
self.cfg = None
return
# 确保传入的CFG是完整的
# 提供不完整的CFG将导致一些可行路径的丢失!
l.warning("Please be sure that the CFG you have passed in is complete.")
l.warning("Providing an incomplete CFG can cause viable paths to be discarded!")
# 初始化SM的stash
def setup(self, simgr):
if not self.find_stash in simgr.stashes: simgr.stashes[self.find_stash] = []
if not self.avoid_stash in simgr.stashes: simgr.stashes[self.avoid_stash] = []
# 在将find和avoid添加到停止运行的set中去,便于我们搜索和停止搜索某些地址
def step(self, simgr, stash=None, **kwargs):
base_extra_stop_points = set(kwargs.get("extra_stop_points") or {})
return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)
def filter(self, simgr, state, filter_func=None):
if sim_options.UNICORN in state.options and self._warn_unicorn:
self._warn_unicorn = False # 只显示一次警告
# 使用unicorn的时候,可能会步过匹配的条件而不停止
l.warning("Using unicorn with find or avoid conditions that are a lambda (not a number, set, tuple or list).")
l.warning("Unicorn may step over states that match the condition (find or avoid) without stopping.")
# 判断当前state是否是我们想要寻找的state
rFind = self.find(state)
if rFind:
# 如果这条路径是不可达的,那么返回unsat;
# 判断是否可达的方法就是生成Claripy.solver,求解其约束条件,如果约束条件可行,则可解
if not state.history.reachable:
return 'unsat'
# 判断是否是需要避免的state
rAvoid = self.avoid(state)
if rAvoid:
# 如果有冲突(即满足avoid且满足find)
if self.avoid_priority & ((type(rFind) is not set) | (type(rAvoid) is not set)):
# 如果设置了avoid优先级,并且rFind和rAvoid中任意一个不是一个set,就返回avoid_stash
return self.avoid_stash
if type(rAvoid) is not set:
# rAvoid 为False 或者 self.avoid_priority 为False
# 将rAvoid设置为{} 简化剩余的代码
rAvoid = {}
# 如果rFind是个集合的话
if type(rFind) is set:
while state.addr not in rFind: # 这里是检查find或者avoid的地址在block里面的情况
# 如果state地址不在rFind中
# 却又在avoid中,就返回avoid
if state.addr in rAvoid:
return self.avoid_stash
try:
#否则就去寻找后继点,看是不是符合这些特征
state = self.project.factory.successors(state, num_inst=1).successors[0]
except SimIRSBNoDecodeError as ex:
if state.arch.name.startswith('MIPS'):
l.warning('Due to MIPS delay slots, the find address must be executed with other instructions and therefore may not be able to be found' + \
' - Trying to find state that includes find address')
if len(rFind.intersection(set(state.block().instruction_addrs))) > 0:
#there is an address that is both in the block AND in the rFind stat
l.warning('Found state that includes find instruction, this one will be returned')
rFind = rFind.union(set(state.block().instruction_addrs))
else:
raise ex
if self.avoid_priority & (state.addr in rAvoid):
# 只有rAvoid 和 rFind 有交集的时候才会发生
# 但是为什么会有人想找到某地址同时又avoid某地址?
return self.avoid_stash
return (self.find_stash, state)
# 判断当前state是不是我们需要avoid的state,如果是的话,返回avoid_stash
if self.avoid(state): return self.avoid_stash
# 如果提供了CFG,并且当前状态还在cfg中,当前地址不在可行的block中,就返回avoid_stash
if self.cfg is not None and self.cfg.get_any_node(state.addr) is not None:
if state.addr not in self.ok_blocks: return self.avoid_stash
return None
# 判断是否已经运行到完成状态,在explorer中,就是find_stash中state的个数大于等于我们期望的stated的个数
def complete(self, simgr):
return len(simgr.stashes[self.find_stash]) >= self.num_find
从上面的代码中可以看出,其实Angr是不推荐在explore中使用cfg进行辅助分析的,因为目前CFGFast没法保证调用流图的完整性和连贯性,因此这些问题将导致CFG确实某些路径, 而explore基于cfg的分析恰巧是需要完整的cfg路径的,这将导致结果失真。所以,在传入cfg时,一定要确保我们生成的cfg是完整的。(苦笑.jpg)
而进行过滤的方法在filter中,主要的思想就是判断当前state是一个find的state还是avoid的state,如果都不是,就返回none。如果是find(将会得到一串find的地址),还需要判断是否是可达的,也就是使用solver进行求解约束条件;如果是可达的,还要看他和avoid是否有冲突,如果没有冲突还要继续查看后继点,直到后继点满足find条件。
关于ExplorationTechnique类,Angr中是这么定义的。
位置:/angr/exploration_techniques/__init__.py
class ExplorationTechnique(object):
"""
一个Technique就是一系列的hook,这些hook用来帮助SimulationManager在符号搜索中实现新的搜索技术
这些方法中任何一个都可能被子类重写。
Exploration Technique的正确食用方法是调用``sim.use_technique``,并且传入technique的*实例*
"""
# 8<----------------- Compatibility layer -----------------
__metaclass__ = ExplorationTechniqueMeta
# ------------------- Compatibility layer --------------->8
def __init__(self):
# 这条属性应该被上面提到的manager自动设置
if not hasattr(self, 'project'):
self.project = None
def setup(self, simgr):
"""
在manager中执行你可能需要的初始化操作
"""
pass
def step(self, simgr, stash=None, **kwargs): # pylint:disable=no-self-use
"""
将manager中的这个stash向前执行。你应该调用``simgr.step(stash, **kwargs)``进行真正的执行。
返回已经step的manager
"""
return simgr.step(stash=stash, **kwargs)
def filter(self, simgr, state, filter_func=None): # pylint:disable=no-self-use
"""
过滤state。
如果这个state不应该过滤,就返回None。
如果这个state应该过滤,返回要过滤的那个state的stash的名字(state和stash有点分不清)
如果你想在state被过滤前对他执行操作,将返回一个元组,元祖中有要过滤的state以及修改之后的state
"""
return simgr.filter(state, filter_func=filter_func)
def selector(self, simgr, state, selector_func=None): # pylint:disable=no-self-use
"""
如果返回True,则说明这个状态应该在step()中进行处理。
"""
return simgr.selector(state, selector_func=selector_func)
def step_state(self, simgr, state, successor_func=None, **kwargs): # pylint:disable=no-self-use
"""
向前step一个state。
如果step失败啦,返回None;
否则,返回一个stash字典dict,字典中的stash在simulation manager中应该被归并。
所有的states都应该添加到PathGroup的stash中(可是PathGroup不是已经废弃了么?有点迷茫)
"""
return simgr.step_state(state, successor_func=successor_func, **kwargs)
def successors(self, simgr, state, successor_func=None, **run_args): # pylint:disable=no-self-use
"""
返回给定state的successor。
"""
return simgr.successors(state, successor_func=successor_func, **run_args)
def complete(self, simgr): # pylint:disable=no-self-use,unused-argument
"""
返回当前的SM是否达到了complete状态,比如``SimulationManager.run()`` 挂起(halt)了
"""
return False
def _condition_to_lambda(self, condition, default=False):
"""
敲黑板!!!这个就是为什么explore中的find参数支持函数、lambda、地址和set的原因!!!!
将整数、set、list、或者lambda转换成lambda形式,使用这些lambda检查state的地址和给定的地址,state的地址从block中读取出来。
:param condition: 将要转换成lambda的整数、set、 list 或是 lambda.
:param default: 默认的lambda返回值(防止condition为空). 默认为False.
:returns: 一个lambda,将state作为输入,并且返回一个地址set,set中的地址都满足条件
这个lambda中含有一个 `.addrs` 属性,包含了完整的满足条件的地址集合,前提是地址能够被静态确定。
"""
# 如果条件为空,直接返回默认值False
if condition is None:
condition_function = lambda p: default
condition_function.addrs = set()
# 如果是int或者long,交给处理tuple的部分进行处理
elif isinstance(condition, (int, long)):
return self._condition_to_lambda((condition,))
# 如果是tuple、set、list
elif isinstance(condition, (tuple, set, list)):
# 将条件转化为set
addrs = set(condition)
def condition_function(p):
if p.addr in addrs:
# 返回 {p.addr} 而不是True也许可以解决find/avoid 的冲突
return {p.addr}
if not isinstance(self.project.engines.default_engine, engines.SimEngineVEX):
return False
try:
# 如果地址不在set中(这也许意味着他并不在block的顶部)
# 直接检查block是否在bolck里面。
# (Blocks 在每次检查的时候都会被重复创建,但是Angr中有IRSB缓存,所以没有问题)
return addrs.intersection(set(self.project.factory.block(p.addr).instruction_addrs))
except (AngrError, SimError):
return False
# 直接把地址列表放进去
condition_function.addrs = addrs
# 如果是一个函数,直接返回函数
elif hasattr(condition, '__call__'):
condition_function = condition
else:
raise AngrExplorationTechniqueError("ExplorationTechnique is unable to convert given type (%s) to a callable condition function." % condition.__class__)
return condition_function
我的理解就是,其实这只是一个接口类,里面很多方法都有待子类去重写,但是其中的condition转化lambda这个函数很重要,我们在使用explore的时候可以很方便的写find条件,而Angr能够正确识别的原因就在于此。
接下来看看是怎么添加和移除technique的。
def use_technique(self, tech):
"""
在SimulationManager中使用Exploration技术
这些技术在模块:`angr.exploration_techniques`中
:param tech: 一个ExplorationTechnique 对象,包含着修改SM行为的代码
:type tech: ExplorationTechnique
:return: 返回被添加的tech
"""
if not isinstance(tech, ExplorationTechnique):
raise SimulationManagerError
# XXX: as promised
tech.project = self._project
# 执行setup进行初始化
tech.setup(self)
def _is_overriden(name):
return getattr(tech, name).__code__ is not getattr(ExplorationTechnique, name).__code__
#判断ExplorationTechnique类中有没有重写SimulationManager的函数,如果有就进行替换
overriden = filter(_is_overriden, ('step', 'filter', 'selector', 'step_state', 'successors'))
hooks = {name: getattr(tech, name) for name in overriden}
HookSet.install_hooks(self, **hooks)
self._techniques.append(tech)
return tech
def remove_technique(self, tech):
"""
去除一个已经激活的exploration technique
:param tech: 一个ExplorationTechnique对象
:type tech: ExplorationTechnique
"""
if not isinstance(tech, ExplorationTechnique):
raise SimulationManagerError
def _is_overriden(name):
return getattr(tech, name).__code__ is not getattr(ExplorationTechnique, name).__code__
# 取消掉之前hook的函数
overriden = filter(_is_overriden, ('step', 'filter', 'selector', 'step_state', 'successors'))
hooks = {name: getattr(tech, name) for name in overriden}
HookSet.remove_hooks(self, **hooks)
self._techniques.remove(tech)
return tech
接下来我们看下run函数干了什么。
def run(self, stash=None, n=None, until=None, **kwargs):
"""
根据当前的搜索技术,运行SimulationManager直到完成状态
:param stash: 操作stash
:param n: 最多step的次数
:param until: 如果提供了这个参数,他应该是将SimulationManager作为输入参数的函数,并且返回True或者False。
当返回为True的时候停止运行
:return: 返回 SimulationManager.
:rtype: SimulationManager
"""
stash = stash or 'active'
#从0到n进行n次循环
for _ in (itertools.count() if n is None else xrange(0, n)):
#如果没有到达complete状态,并且还有stash没有处理的话
if not self.complete() and self._stashes[stash]:
#对stash进行执行
self.step(stash=stash, **kwargs)
#如果设置了until并且未达到until的条件就continue,否则break
if not (until and until(self)):
continue
break
#最终返回一个SimulationManager
return self
其实代码很简单了,就是不断地执行,知道SimulationManager达到设定好的complete状态。在代码中可以看到,它调用了step()函数,而step函数是我们已经hook过得,在其中就有调用filter对state进行分类的部分。至此,我们完成了Explore()函数的完整分析,从目前来看,我们explore就是单纯的step(),然后求解约束条件,并且对state进行分类。我们之前所说的explore卡死的情况, 应该是solver引擎造成的,毕竟全部使用符号值,solver求解会很麻烦。同时会产生很多个备选state,这样一来就导致了路径爆炸。看来我们需要找到能够约束state个数的地方。
Angr还提供了很多Technique,我这里只进行简单地介绍,后面如果需要使用到的时候,在进行详细分析。
Explorer: 在.explore()函数中实现, 提供搜索和避免执行某些地址的功能
DFS: 深度优先搜索,一次只激活一个state,将其他的state放到deferred stash中,等state结束之后再处理。
LoopLimiter: 限制循环,使用一种粗糙的循环技术,并且超过循环次数的时候就放入spinning stash 在执行完其他的state之后再继续执行?
LengthLimiter: 限制长度,限制state中的路径最大长度。
ManualMergepoint: 手动归并点。讲一个地址标记为程序归并点,在state到达这个地址的时候就会暂留,当其他是state到达是合并到一起。
Veritesting: 一个自动识别归并点的cmu论文的实现。这很有用,可以使用 veritesting=True 参数在构造的时候激活!和其他technique一起使用时不太好使,因为使用静态符号执行。
Tracer: 动态追中state。动态追踪库中有一些工具进行追踪。
Oppologist: 操作序言是一个很有趣的小工具,如果这项技术启动了,Angr会计算一个不支持的指令,比如奇怪的浮点SIMD op指令,Angr将具象化所有的输入传给这个指令,并使用unicorn引擎单指令模拟,允许执行得以继续
Threading: 在step过程中增加线程级别的并行性。但是在Python中由于全局中断锁的存在,并不能够进行多线程,但是其他native层的代码是可以的比如(z3,unicorn,libvex)
Spiller: 当active的state太多的时候,这个technique允许存储某些state到硬盘。
如果我们想要限制路径爆炸的话,veritesting也许值得尝试,但是从原理上讲,Angr难以避免遇到这种问题。