当前位置: 首页 > 工具软件 > Mitmproxy > 使用案例 >

mitmproxy流量重放

李星波
2023-12-01

mitmproxy录制流量并重放,官方提供了简短的例子
https://docs.mitmproxy.org/stable/tute-clientreplay/

mitmdump -w wireless-login
mitmdump -C wireless-login

这样只是死板的录制重放,我们并不能控制中间流程。为了控制中间流程,我看了源码并提取出一个模版:

mitmproxy版本:8.1.1

from typing import BinaryIO
from mitmproxy import io
from mitmproxy.http import HTTPFlow
from mitmproxy.exceptions import FlowReadException
from urllib.parse import parse_qs
import time
import asyncio
from mitmproxy import options
from mitmproxy.tools.dump import DumpMaster
from mitmproxy.addons.clientplayback import ClientPlayback
from mitmproxy import ctx
from mitmproxy import exceptions


class MyClientPlayback(ClientPlayback):
    """方便对flows进行管理, 以后flow太多可以转为数据库存取"""
    def get_flows(self) -> list:
        flows = io.read_flows_from_paths(ctx.options.client_replay)  # 读取记录的flow
        flows.reverse()
        return flows

    def configure(self, updated):
        if "client_replay" in updated and ctx.options.client_replay:
            try:
                flows = self.get_flows()
            except exceptions.FlowReadException as e:
                raise exceptions.OptionsError(str(e))
            self.start_replay(flows)

        if "client_replay_concurrency" in updated:
            if ctx.options.client_replay_concurrency not in [-1, 1]:
                raise exceptions.OptionsError(
                    "Currently the only valid client_replay_concurrency values are -1 and 1."
                )


class Writer:
    def __init__(self, path: str) -> None:
        self.f: BinaryIO = open(path, "ab")
        self.w = io.FlowWriter(self.f)

    def response(self, flow: HTTPFlow) -> None:
        print(f"记录flow")
        self.w.add(flow)

    def done(self):
        print("writer done")
        self.f.close()


class InterceptAD:
    def response(self, flow):
        print(flow.request.url)
        time.sleep(15)


class Record:

    async def write(self):
        opts = options.Options(listen_host='0.0.0.0', listen_port=8877, confdir="~/.mitmproxy_custom")
        master = DumpMaster(
            opts,
            with_termlog=False,
            with_dumper=False
        )
        master.addons.add(Writer(file))
        await master.run()
        return master

    def record(self):
        asyncio.run(self.write())

    async def read(self):
        opts = options.Options()
        master = DumpMaster(
            opts,
            with_termlog=False,
            with_dumper=False
        )
        master.addons.add(InterceptAD())
        master.addons.remove(*[i for i in master.addons.chain if i.__class__.__name__ == 'ClientPlayback'])
        master.addons.add(MyClientPlayback())
        opts.merge({
            "listen_host": '0.0.0.0',
            "listen_port": 8877,
            "confdir": "~/.mitmproxy_custom",
            "client_replay": [file]
        })
        await master.run()
        return master

    def replay(self):
        asyncio.run(self.read())


if __name__ == '__main__':
    file = "traffic"

    Record().record()
    # Record().replay()

Record().record()记录流量,Record().replay()重放流量,如此便可以对flow进行精细化控制了。

 类似资料: