mitmproxy录制流量并重放,官方提供了简短的例子
https://docs.mitmproxy.org/stable/tute-clientreplay/
mitmdump -w wireless-login
mitmdump -C wireless-login
这样只是死板的录制重放,我们并不能控制中间流程。为了控制中间流程,我看了源码并提取出一个模版:
mitmproxy版本:8.1.1
from typing import BinaryIO
from mitmproxy import io
from mitmproxy.http import HTTPFlow
from mitmproxy.exceptions import FlowReadException
from urllib.parse import parse_qs
import time
import asyncio
from mitmproxy import options
from mitmproxy.tools.dump import DumpMaster
from mitmproxy.addons.clientplayback import ClientPlayback
from mitmproxy import ctx
from mitmproxy import exceptions
class MyClientPlayback(ClientPlayback):
"""方便对flows进行管理, 以后flow太多可以转为数据库存取"""
def get_flows(self) -> list:
flows = io.read_flows_from_paths(ctx.options.client_replay) # 读取记录的flow
flows.reverse()
return flows
def configure(self, updated):
if "client_replay" in updated and ctx.options.client_replay:
try:
flows = self.get_flows()
except exceptions.FlowReadException as e:
raise exceptions.OptionsError(str(e))
self.start_replay(flows)
if "client_replay_concurrency" in updated:
if ctx.options.client_replay_concurrency not in [-1, 1]:
raise exceptions.OptionsError(
"Currently the only valid client_replay_concurrency values are -1 and 1."
)
class Writer:
def __init__(self, path: str) -> None:
self.f: BinaryIO = open(path, "ab")
self.w = io.FlowWriter(self.f)
def response(self, flow: HTTPFlow) -> None:
print(f"记录flow")
self.w.add(flow)
def done(self):
print("writer done")
self.f.close()
class InterceptAD:
def response(self, flow):
print(flow.request.url)
time.sleep(15)
class Record:
async def write(self):
opts = options.Options(listen_host='0.0.0.0', listen_port=8877, confdir="~/.mitmproxy_custom")
master = DumpMaster(
opts,
with_termlog=False,
with_dumper=False
)
master.addons.add(Writer(file))
await master.run()
return master
def record(self):
asyncio.run(self.write())
async def read(self):
opts = options.Options()
master = DumpMaster(
opts,
with_termlog=False,
with_dumper=False
)
master.addons.add(InterceptAD())
master.addons.remove(*[i for i in master.addons.chain if i.__class__.__name__ == 'ClientPlayback'])
master.addons.add(MyClientPlayback())
opts.merge({
"listen_host": '0.0.0.0',
"listen_port": 8877,
"confdir": "~/.mitmproxy_custom",
"client_replay": [file]
})
await master.run()
return master
def replay(self):
asyncio.run(self.read())
if __name__ == '__main__':
file = "traffic"
Record().record()
# Record().replay()
Record().record()记录流量,Record().replay()重放流量,如此便可以对flow进行精细化控制了。