当前位置: 首页 > 工具软件 > Explore++ > 使用案例 >

Angr源码分析——Explorer 技术及ExplorationTechnique机制简单解析

秦昂然
2023-12-01

本博客由闲散白帽子胖胖鹏鹏胖胖鹏潜力所写,仅仅作为个人技术交流分享,不得用做商业用途。转载请注明出处,未经许可禁止将本博客内所有内容转载、商用。    

      我们在使用Angr分析代码的时候,最常用的功能应该是Simulation Manager的explore()功能了,这个函数能够很方便的让Angr搜索到我们想要的state,并且求出运行到该state所需的约束条件。那今天,我们来分析一下,Simulation Manager是如何做到的。首先来看这个explore函数。

0x01 Simulation Manager中的explore技术

代码位置:/angr/sim_manager.py

def explore(self, stash=None, n=None, find=None, avoid=None, find_stash=None, avoid_stash=None, cfg=None,
			num_find=1, **kwargs):
	"""
	向前拖动stash(最多n次或者知道找到满足"num_find"个状态),寻找满足"find"条件的状态,
	避免"avoid"条件,将找到的状态存储到"find_stash",将避免的状态存储到"avoid_stash"。
	"find"和"avoid"状态可以是以下任意一种:
	- 一个函数地址
	- 一个地址set或者list
	- 一个函数,这个函数将state作为输入,并且返回是否满足条件
	如果将Angr生成的cfg作为参数传给"cfg"变量,并且"find"参数是set/list/单个地址的话,
	那么任何需要经过失败状态之后才能到达成功状态的状态,将会被提前抛弃。(有一点绕)
	(PS:这句话意思就是说,如果你在到达成功的状态的路径上面,有一个状态是失败的状态,那么这个状态就被提前抛弃)
	(也就是说,如果我们生成了一个完整的CFG,那么可以简化explore时候的复杂度?有待考证)
	"""
	#初始化这些stash
	stash = stash or 'active'
	find_stash = find_stash or 'found'
	avoid_stash = avoid_stash or 'avoid'

	num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0
	#这里是重点,调用了use_technique()
	tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

	try:
		self.run(stash=stash, n=n, **kwargs)
	finally:
		self.remove_technique(tech)

	return self

      在代码中,我们可以看到SimulationManager(以下简称SM)先是使用use_technique函数增加了Explorer插件(这一点和我之前讲过的Angr的插件化类似),之后使用run函数执行,最后在SM中删除Explorer插件。我们逐层分析,先来看看Explorer中都定义了什么内容。

0x02 Exploration_Technique中的Explorer

      简单的来讲Explorer中做了这么几件事。1.初始化进行搜索的必要参数;2.重写filter函数,判断当前的state是否应该被过滤;3.重写complete函数,用于判断是否已经搜索到find中提供的地址。

class Explorer(ExplorationTechnique):
    """
    继承自ExplorationTechnique这个父类,关于ExplorationTechnique的定义我放在后面
    感兴趣的话可以读一读,因为他本身是个接口,没有什么特殊的定义
    这里的函数说明写的和SimulationManager中的相同。
    """
    def __init__(self, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1, avoid_priority=False):
        super(Explorer, self).__init__()
	#初始化find和avoid,这里使用到的函数在后面的ExplorationTechnique定义中有讲
        self.find = self._condition_to_lambda(find)
        self.avoid = self._condition_to_lambda(avoid)
        self.find_stash = find_stash
        self.avoid_stash = avoid_stash
        self.cfg = cfg
        self.ok_blocks = set()
        self.num_find = num_find
        self.avoid_priority = avoid_priority

        find_addrs = getattr(self.find, "addrs", None)
        avoid_addrs = getattr(self.avoid, "addrs", None)

        # 只有所有的地址我们都能够通过静态获取的时候,使用unicorn才是安全的
        self._warn_unicorn = (find_addrs is None) or (avoid_addrs is None)

        # 即使avoid和find的地址我们不知道,我们还是应该停在我们知道的地址
        self._extra_stop_points = (find_addrs or set()) | (avoid_addrs or set())


        # TODO: 暂时还没没有办法完成,因为CFGFast不能够解决过程的延续性
	# 也就是说,Angr并不推荐使用CFG,并且已经禁止使用CFG了
        from .. import analyses
        if isinstance(cfg, analyses.CFGFast):
            l.error("CFGFast is currently inappropriate for use with Explorer.")
            l.error("Usage of the CFG has been disabled for this explorer.")
            self.cfg = None

	# 哈!上面不是已经设置为None了么,这段代码看来是进不去了
        if self.cfg is not None:
            avoid = avoid_addrs or set()

            # find中的地址必须是静态的
            if not find_addrs:
                l.error("You must provide at least one 'find' address as a number, set, list, or tuple if you provide a CFG.")
                l.error("Usage of the CFG has been disabled for this explorer.")
                self.cfg = None
                return
	    # avoid的地址也需要在cfg中
            for a in avoid:
                if cfg.get_any_node(a) is None:
                    l.warning("'Avoid' address %#x not present in CFG...", a)

            # 这不是队列而是一个栈。。。。他只是一个任务列表而已!
            queue = []
            for f in find_addrs:
                nodes = cfg.get_all_nodes(f)
                if len(nodes) == 0:
                    l.warning("'Find' address %#x not present in CFG...", f)
                else:
                    queue.extend(nodes)

	    # 这是从find的地址出发,后向搜索block,直到找到从路径的起始点start,也就是后向搜索所有的start,使得start能够到达find的地址
            seen_nodes = set()
            while len(queue) > 0:
                n = queue.pop()
                if id(n) in seen_nodes:
                    continue
                if n.addr in avoid:
                    continue
                self.ok_blocks.add(n.addr)
                seen_nodes.add(id(n))
                queue.extend(n.predecessors)

            if len(self.ok_blocks) == 0:
                l.error("No addresses could be validated by the provided CFG!")
                l.error("Usage of the CFG has been disabled for this explorer.")
                self.cfg = None
                return

	    # 确保传入的CFG是完整的
	    # 提供不完整的CFG将导致一些可行路径的丢失!
            l.warning("Please be sure that the CFG you have passed in is complete.")
            l.warning("Providing an incomplete CFG can cause viable paths to be discarded!")

    # 初始化SM的stash
    def setup(self, simgr):
        if not self.find_stash in simgr.stashes: simgr.stashes[self.find_stash] = []
        if not self.avoid_stash in simgr.stashes: simgr.stashes[self.avoid_stash] = []
    # 在将find和avoid添加到停止运行的set中去,便于我们搜索和停止搜索某些地址
    def step(self, simgr, stash=None, **kwargs):
        base_extra_stop_points = set(kwargs.get("extra_stop_points") or {})
        return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)

    def filter(self, simgr, state, filter_func=None):
        if sim_options.UNICORN in state.options and self._warn_unicorn:
            self._warn_unicorn = False # 只显示一次警告
	    # 使用unicorn的时候,可能会步过匹配的条件而不停止
            l.warning("Using unicorn with find or avoid conditions that are a lambda (not a number, set, tuple or list).")
            l.warning("Unicorn may step over states that match the condition (find or avoid) without stopping.")
	# 判断当前state是否是我们想要寻找的state
        rFind = self.find(state)
        if rFind:
        # 如果这条路径是不可达的,那么返回unsat;
	# 判断是否可达的方法就是生成Claripy.solver,求解其约束条件,如果约束条件可行,则可解
            if not state.history.reachable:
                return 'unsat'
	    # 判断是否是需要避免的state
            rAvoid = self.avoid(state)
            if rAvoid:
                # 如果有冲突(即满足avoid且满足find)
                if self.avoid_priority & ((type(rFind) is not set) | (type(rAvoid) is not set)):
                # 如果设置了avoid优先级,并且rFind和rAvoid中任意一个不是一个set,就返回avoid_stash
                    return self.avoid_stash
            if type(rAvoid) is not set:
                # rAvoid 为False 或者 self.avoid_priority 为False
                # 将rAvoid设置为{} 简化剩余的代码
                rAvoid = {}
	    # 如果rFind是个集合的话
            if type(rFind) is set:
                while state.addr not in rFind: # 这里是检查find或者avoid的地址在block里面的情况
		# 如果state地址不在rFind中
		    # 却又在avoid中,就返回avoid
                    if state.addr in rAvoid:
                        return self.avoid_stash
                    try:
			#否则就去寻找后继点,看是不是符合这些特征
                        state = self.project.factory.successors(state, num_inst=1).successors[0]
                    except SimIRSBNoDecodeError as ex:
                        if state.arch.name.startswith('MIPS'):
                            l.warning('Due to MIPS delay slots, the find address must be executed with other instructions and therefore may not be able to be found' + \
                                ' - Trying to find state that includes find address')
                            if len(rFind.intersection(set(state.block().instruction_addrs))) > 0:
                                #there is an address that is both in the block AND in the rFind stat
                                l.warning('Found state that includes find instruction, this one will be returned')
                                rFind = rFind.union(set(state.block().instruction_addrs))
                        else:
                                raise ex
                if self.avoid_priority & (state.addr in rAvoid):
                # 只有rAvoid 和 rFind 有交集的时候才会发生
                # 但是为什么会有人想找到某地址同时又avoid某地址?
                    return self.avoid_stash
            return (self.find_stash, state)
	# 判断当前state是不是我们需要avoid的state,如果是的话,返回avoid_stash
        if self.avoid(state): return self.avoid_stash
	# 如果提供了CFG,并且当前状态还在cfg中,当前地址不在可行的block中,就返回avoid_stash
        if self.cfg is not None and self.cfg.get_any_node(state.addr) is not None:
            if state.addr not in self.ok_blocks: return self.avoid_stash
        return None

    # 判断是否已经运行到完成状态,在explorer中,就是find_stash中state的个数大于等于我们期望的stated的个数
    def complete(self, simgr):
        return len(simgr.stashes[self.find_stash]) >= self.num_find

      从上面的代码中可以看出,其实Angr是不推荐在explore中使用cfg进行辅助分析的,因为目前CFGFast没法保证调用流图的完整性和连贯性,因此这些问题将导致CFG确实某些路径, 而explore基于cfg的分析恰巧是需要完整的cfg路径的,这将导致结果失真。所以,在传入cfg时,一定要确保我们生成的cfg是完整的。(苦笑.jpg)

      而进行过滤的方法在filter中,主要的思想就是判断当前state是一个find的state还是avoid的state,如果都不是,就返回none。如果是find(将会得到一串find的地址),还需要判断是否是可达的,也就是使用solver进行求解约束条件;如果是可达的,还要看他和avoid是否有冲突,如果没有冲突还要继续查看后继点,直到后继点满足find条件。

关于ExplorationTechnique类,Angr中是这么定义的。

位置:/angr/exploration_techniques/__init__.py

class ExplorationTechnique(object):
    """
	一个Technique就是一系列的hook,这些hook用来帮助SimulationManager在符号搜索中实现新的搜索技术
	这些方法中任何一个都可能被子类重写。
	Exploration Technique的正确食用方法是调用``sim.use_technique``,并且传入technique的*实例*
    """
    # 8<----------------- Compatibility layer -----------------
    __metaclass__ = ExplorationTechniqueMeta
    # ------------------- Compatibility layer --------------->8

    def __init__(self):
        # 这条属性应该被上面提到的manager自动设置
        if not hasattr(self, 'project'):
            self.project = None

    def setup(self, simgr):
        """
        在manager中执行你可能需要的初始化操作
        """
        pass

    def step(self, simgr, stash=None, **kwargs):  # pylint:disable=no-self-use
        """
	将manager中的这个stash向前执行。你应该调用``simgr.step(stash, **kwargs)``进行真正的执行。
	返回已经step的manager
        """
        return simgr.step(stash=stash, **kwargs)

    def filter(self, simgr, state, filter_func=None):  # pylint:disable=no-self-use
        """
	过滤state。
	如果这个state不应该过滤,就返回None。
	如果这个state应该过滤,返回要过滤的那个state的stash的名字(state和stash有点分不清)
	如果你想在state被过滤前对他执行操作,将返回一个元组,元祖中有要过滤的state以及修改之后的state
        """
        return simgr.filter(state, filter_func=filter_func)

    def selector(self, simgr, state, selector_func=None):  # pylint:disable=no-self-use
        """
	如果返回True,则说明这个状态应该在step()中进行处理。
        """
        return simgr.selector(state, selector_func=selector_func)

    def step_state(self, simgr, state, successor_func=None, **kwargs):  # pylint:disable=no-self-use
        """
	向前step一个state。
	如果step失败啦,返回None;
	否则,返回一个stash字典dict,字典中的stash在simulation manager中应该被归并。
	所有的states都应该添加到PathGroup的stash中(可是PathGroup不是已经废弃了么?有点迷茫)
        """
        return simgr.step_state(state, successor_func=successor_func, **kwargs)

    def successors(self, simgr, state, successor_func=None, **run_args):  # pylint:disable=no-self-use
        """
	返回给定state的successor。
        """
        return simgr.successors(state, successor_func=successor_func, **run_args)

    def complete(self, simgr):  # pylint:disable=no-self-use,unused-argument
        """
	返回当前的SM是否达到了complete状态,比如``SimulationManager.run()`` 挂起(halt)了
        """
        return False

    def _condition_to_lambda(self, condition, default=False):
        """
	敲黑板!!!这个就是为什么explore中的find参数支持函数、lambda、地址和set的原因!!!!
	将整数、set、list、或者lambda转换成lambda形式,使用这些lambda检查state的地址和给定的地址,state的地址从block中读取出来。
        :param condition:   将要转换成lambda的整数、set、 list 或是 lambda.
        :param default:     默认的lambda返回值(防止condition为空). 默认为False.
        :returns:           一个lambda,将state作为输入,并且返回一个地址set,set中的地址都满足条件
		                    这个lambda中含有一个 `.addrs` 属性,包含了完整的满足条件的地址集合,前提是地址能够被静态确定。
        """
	# 如果条件为空,直接返回默认值False
        if condition is None:
            condition_function = lambda p: default
            condition_function.addrs = set()
        # 如果是int或者long,交给处理tuple的部分进行处理
        elif isinstance(condition, (int, long)):
            return self._condition_to_lambda((condition,))
        # 如果是tuple、set、list
        elif isinstance(condition, (tuple, set, list)):
             # 将条件转化为set
            addrs = set(condition)
            def condition_function(p):
                if p.addr in addrs:
                    # 返回 {p.addr} 而不是True也许可以解决find/avoid 的冲突
                    return {p.addr}

                if not isinstance(self.project.engines.default_engine, engines.SimEngineVEX):
                    return False

                try:
                    # 如果地址不在set中(这也许意味着他并不在block的顶部)
                    # 直接检查block是否在bolck里面。
                    # (Blocks 在每次检查的时候都会被重复创建,但是Angr中有IRSB缓存,所以没有问题)
                    return addrs.intersection(set(self.project.factory.block(p.addr).instruction_addrs))
                except (AngrError, SimError):
                    return False
            # 直接把地址列表放进去
            condition_function.addrs = addrs
        # 如果是一个函数,直接返回函数
		elif hasattr(condition, '__call__'):
            condition_function = condition
        else:
            raise AngrExplorationTechniqueError("ExplorationTechnique is unable to convert given type (%s) to a callable condition function." % condition.__class__)

        return condition_function		
      我的理解就是,其实这只是一个接口类,里面很多方法都有待子类去重写,但是其中的condition转化lambda这个函数很重要,我们在使用explore的时候可以很方便的写find条件,而Angr能够正确识别的原因就在于此。
0x03 Exploration_Technique插件与SimulationManager的联系

接下来看看是怎么添加和移除technique的。

def use_technique(self, tech):
	"""
	在SimulationManager中使用Exploration技术
	这些技术在模块:`angr.exploration_techniques`中
	:param tech:    一个ExplorationTechnique 对象,包含着修改SM行为的代码
	:type tech:     ExplorationTechnique
	:return:        返回被添加的tech
	"""
	if not isinstance(tech, ExplorationTechnique):
		raise SimulationManagerError

	# XXX: as promised
	tech.project = self._project
	# 执行setup进行初始化
	tech.setup(self)

	def _is_overriden(name):
		return getattr(tech, name).__code__ is not getattr(ExplorationTechnique, name).__code__
    #判断ExplorationTechnique类中有没有重写SimulationManager的函数,如果有就进行替换
	overriden = filter(_is_overriden, ('step', 'filter', 'selector', 'step_state', 'successors'))
	hooks = {name: getattr(tech, name) for name in overriden}
	HookSet.install_hooks(self, **hooks)

	self._techniques.append(tech)
	return tech

    def remove_technique(self, tech):
        """
        去除一个已经激活的exploration technique 
        :param tech:    一个ExplorationTechnique对象
        :type tech:     ExplorationTechnique
        """
        if not isinstance(tech, ExplorationTechnique):
            raise SimulationManagerError

        def _is_overriden(name):
            return getattr(tech, name).__code__ is not getattr(ExplorationTechnique, name).__code__
		# 取消掉之前hook的函数
        overriden = filter(_is_overriden, ('step', 'filter', 'selector', 'step_state', 'successors'))
        hooks = {name: getattr(tech, name) for name in overriden}
        HookSet.remove_hooks(self, **hooks)

        self._techniques.remove(tech)
        return tech

接下来我们看下run函数干了什么。

def run(self, stash=None, n=None, until=None, **kwargs):
	"""
	根据当前的搜索技术,运行SimulationManager直到完成状态
	:param stash:       操作stash
	:param n:           最多step的次数
	:param until:       如果提供了这个参数,他应该是将SimulationManager作为输入参数的函数,并且返回True或者False。
						当返回为True的时候停止运行
	:return:            返回 SimulationManager.
	:rtype:             SimulationManager
	"""
	stash = stash or 'active'
	#从0到n进行n次循环
	for _ in (itertools.count() if n is None else xrange(0, n)):
		#如果没有到达complete状态,并且还有stash没有处理的话
		if not self.complete() and self._stashes[stash]:
			#对stash进行执行
			self.step(stash=stash, **kwargs)
			#如果设置了until并且未达到until的条件就continue,否则break
			if not (until and until(self)):
				continue
		break
	#最终返回一个SimulationManager
	return self

       其实代码很简单了,就是不断地执行,知道SimulationManager达到设定好的complete状态。在代码中可以看到,它调用了step()函数,而step函数是我们已经hook过得,在其中就有调用filter对state进行分类的部分。至此,我们完成了Explore()函数的完整分析,从目前来看,我们explore就是单纯的step(),然后求解约束条件,并且对state进行分类。我们之前所说的explore卡死的情况, 应该是solver引擎造成的,毕竟全部使用符号值,solver求解会很麻烦。同时会产生很多个备选state,这样一来就导致了路径爆炸。看来我们需要找到能够约束state个数的地方。

0x04 其余的Exploration Technique

      Angr还提供了很多Technique,我这里只进行简单地介绍,后面如果需要使用到的时候,在进行详细分析。

Explorer: 在.explore()函数中实现, 提供搜索和避免执行某些地址的功能
DFS: 深度优先搜索,一次只激活一个state,将其他的state放到deferred stash中,等state结束之后再处理。
LoopLimiter: 限制循环,使用一种粗糙的循环技术,并且超过循环次数的时候就放入spinning stash 在执行完其他的state之后再继续执行?
LengthLimiter: 限制长度,限制state中的路径最大长度。
ManualMergepoint: 手动归并点。讲一个地址标记为程序归并点,在state到达这个地址的时候就会暂留,当其他是state到达是合并到一起。
Veritesting: 一个自动识别归并点的cmu论文的实现。这很有用,可以使用 veritesting=True 参数在构造的时候激活!和其他technique一起使用时不太好使,因为使用静态符号执行。
Tracer: 动态追中state。动态追踪库中有一些工具进行追踪。
Oppologist: 操作序言是一个很有趣的小工具,如果这项技术启动了,Angr会计算一个不支持的指令,比如奇怪的浮点SIMD op指令,Angr将具象化所有的输入传给这个指令,并使用unicorn引擎单指令模拟,允许执行得以继续
Threading: 在step过程中增加线程级别的并行性。但是在Python中由于全局中断锁的存在,并不能够进行多线程,但是其他native层的代码是可以的比如(z3,unicorn,libvex)
Spiller: 当active的state太多的时候,这个technique允许存储某些state到硬盘。
如果我们想要限制路径爆炸的话,veritesting也许值得尝试,但是从原理上讲,Angr难以避免遇到这种问题。
 类似资料: