Example Addons

小牛编辑
2023-12-01

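  • events-http-specific.py: HTTP-specific events.
  • http-stream-modify.py: Modify a streamed response.
  • commands-flows.py: Handle flows as command arguments.
  • log-events.py: Post messages to mitmproxy's event log.
  • scripting-minimal-example.py
  • wsgi-flask-app.py: Host a WSGI app in mitmproxy.
  • http-modify-form.py: Modify an HTTP form submission.
  • duplicate-modify-replay.py: Take incoming HTTP requests and replay them with modified parameters.
  • http-modify-query-string.py: Modify HTTP query parameters.
  • io-write-flow-file.py: Generate a mitmproxy dump file.
  • events.py: Generic event hooks.
  • http-add-header.py: Add an HTTP header to each response.
  • tcp-simple.py: Process individual messages from a TCP connection.
  • anatomy.py: Basic skeleton of a mitmproxy addon.
  • nonblocking.py: Make event hooks non-blocking.
  • http-redirect-requests.py: Redirect HTTP requests to another server.
  • http-stream-simple.py: Select which responses should be streamed.
  • options-configure.py: React to configuration changes.
  • http-trailers.py: This script simply prints all received HTTP Trailers.
  • commands-paths.py: Handle file paths as command arguments.
  • events-websocket-specific.py: WebSocket-specific events.
  • websocket-simple.py: Process individual messages from a WebSocket connection.
  • filter-flows.py: Use mitmproxy's filter pattern in scripts.
  • options-simple.py: Add a new mitmproxy option.
  • http-reply-from-proxy.py: Send a reply from the proxy without sending any data to the remote server.
  • commands-simple.py: Add a custom command to mitmproxy's command prompt.
  • internet_in_mirror.py: Mirror all web pages.
  • contentview.py: Add a custom message body pretty-printer for use inside mitmproxy.
  • io-read-saved-flows.py: Read a mitmproxy dump file.
  • websocket-inject-message.py: Inject a WebSocket message into a running connection.
  • events-tcp-specific.py: TCP-specific events.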

Community Examples

Additional examples contributed by the mitmproxy community can be found on GitHub.

Example: events-http-specific.py

"""HTTP-specific events."""
import mitmproxy.http

class Events:
    def http_connect(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP CONNECT request was received. Setting a non 2xx response on
            the flow will return the response to the client and abort the
            connection. CONNECT requests and responses do not generate the usual
            HTTP handler events. CONNECT requests are only valid in regular and
            upstream proxy modes.
        """

    def requestheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP request headers were successfully read. At this point, the body
            is empty.
        """

    def request(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP request has been read.
        """

    def responseheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP response headers were successfully read. At this point, the body
            is empty.
        """

    def response(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP response has been read.
        """

    def error(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP error has occurred, e.g. invalid server responses, or
            interrupted connections. This is distinct from a valid server HTTP
            error response, which is simply a response with an HTTP error code.
        """

Example: http-stream-modify.py

"""
Modify a streamed response.

Generally speaking, we recommend *not* to stream messages you need to modify.
Modifying streamed responses is tricky and brittle:
    - If the transfer encoding isn't chunked, you cannot simply change the content length.
    - If you want to replace all occurrences of "foobar", make sure to catch the cases
      where one chunk ends with "[...]foo" and the next starts with "bar[...]".
"""

def modify(chunks):
    """
    chunks is a generator that can be used to iterate over all chunks.
    """
    for chunk in chunks:
        # Chunks arrive as bytes, so the pattern and replacement must be bytes too.
        yield chunk.replace(b"foo", b"bar")

def responseheaders(flow):
    flow.response.stream = modify
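
The chunk-boundary caveat in the docstring above can be handled by holding back a short tail of each chunk until the next one arrives. The following is a minimal sketch in plain Python, not part of mitmproxy: `replace_across_chunks` is a hypothetical helper that works on any iterable of bytes and assumes a non-empty pattern. It does not address the content-length caveat.

```python
from typing import Iterable, Iterator


def replace_across_chunks(
    chunks: Iterable[bytes], old: bytes, new: bytes
) -> Iterator[bytes]:
    """Replace `old` with `new`, even when a match spans two chunks.

    Hypothetical helper for illustration; assumes `old` is non-empty.
    """
    keep = len(old) - 1  # longest prefix of `old` that can end a chunk
    pending = b""        # raw bytes held back from the previous chunk
    for chunk in chunks:
        # Every part except the last one was followed by a full match.
        *done, tail = (pending + chunk).split(old)
        # The last `keep` bytes of the tail may begin a match that
        # completes in the next chunk, so they are held back unmodified.
        cut = max(len(tail) - keep, 0)
        out = b"".join(part + new for part in done) + tail[:cut]
        pending = tail[cut:]
        if out:
            yield out
    if pending:
        yield pending


# "foobar" is split across the two chunks here.
result = b"".join(replace_across_chunks([b"xxfoo", b"barxx"], b"foobar", b"baz"))
```

Holding back raw bytes rather than already-replaced output avoids replacing the same region twice when the replacement itself ends with a prefix of the pattern.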

Example: commands-flows.py

"""Handle flows as command arguments."""
import typing

from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow

class MyAddon:
    @command.command("myaddon.addheader")
    def addheader(self, flows: typing.Sequence[flow.Flow]) -> None:
        for f in flows:
            f.request.headers["myheader"] = "value"
        ctx.log.alert("done")

addons = [
    MyAddon()
]

Example: log-events.py

"""Post messages to mitmproxy's event log."""
from mitmproxy import ctx

def load(l):
    ctx.log.info("This is some informative text.")
    ctx.log.warn("This is a warning.")
    ctx.log.error("This is an error.")

Example: scripting-minimal-example.py

def request(flow):
    flow.request.headers["myheader"] = "value"

Example: wsgi-flask-app.py

"""
Host a WSGI app in mitmproxy.

This example shows how to graft a WSGI app onto mitmproxy. In this
instance, we're using the Flask framework (http://flask.pocoo.org/) to expose
a single simplest-possible page.
"""
from flask import Flask
from mitmproxy.addons import asgiapp

app = Flask("proxapp")

@app.route('/')
def hello_world() -> str:
    return 'Hello World!'

addons = [
    # Host app at the magic domain "example.com" on port 80. Requests to this
    # domain and port combination will now be routed to the WSGI app instance.
    asgiapp.WSGIApp(app, "example.com", 80)
    # SSL works too, but the magic domain needs to be resolvable from the mitmproxy machine due to mitmproxy's design.
    # mitmproxy will connect to said domain and use its certificate (unless --no-upstream-cert is set)
    # but won't send any data.
    # mitmproxy.ctx.master.apps.add(app, "example.com", 443)
]

Example: http-modify-form.py

"""Modify an HTTP form submission."""
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    if flow.request.urlencoded_form:
        # If there's already a form, one can just add items to the dict:
        flow.request.urlencoded_form["mitmproxy"] = "rocks"
    else:
        # One can also just pass new form data.
        # This sets the proper content type and overrides the body.
        flow.request.urlencoded_form = [
            ("foo", "bar")
        ]

Example: duplicate-modify-replay.py

"""Take incoming HTTP requests and replay them with modified parameters."""
from mitmproxy import ctx

def request(flow):
    # Avoid an infinite loop by not replaying already replayed requests
    if flow.is_replay == "request":
        return
    flow = flow.copy()
    # Only interactive tools have a view. If we have one, add a duplicate entry
    # for our flow.
    if "view" in ctx.master.addons:
        ctx.master.commands.call("view.flows.add", [flow])
    flow.request.path = "/changed"
    ctx.master.commands.call("replay.client", [flow])

Example: http-modify-query-string.py

"""Modify HTTP query parameters."""
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    flow.request.query["mitmproxy"] = "rocks"

Example: io-write-flow-file.py

"""
Generate a mitmproxy dump file.

This script demonstrates how to generate a mitmproxy dump file,
as it would also be generated by passing `-w` to mitmproxy.
In contrast to `-w`, this gives you full control over which
flows should be saved and also allows you to rotate files or log
to multiple files in parallel.
"""
import random
import sys
from mitmproxy import io, http
import typing  # noqa

class Writer:
    def __init__(self, path: str) -> None:
        self.f: typing.IO[bytes] = open(path, "wb")
        self.w = io.FlowWriter(self.f)

    def response(self, flow: http.HTTPFlow) -> None:
        if random.choice([True, False]):
            self.w.add(flow)

    def done(self):
        self.f.close()

addons = [Writer(sys.argv[1])]

Example: events.py

"""Generic event hooks."""
import typing

import mitmproxy.addonmanager
import mitmproxy.connections
import mitmproxy.flow  # needed for the Flow annotation on update()
import mitmproxy.log
import mitmproxy.proxy.protocol

class Events:
    # Network lifecycle
    def clientconnect(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            A client has connected to mitmproxy. Note that a connection can
            correspond to multiple HTTP requests.
        """

    def clientdisconnect(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            A client has disconnected from mitmproxy.
        """

    def serverconnect(self, conn: mitmproxy.connections.ServerConnection):
        """
            Mitmproxy has connected to a server. Note that a connection can
            correspond to multiple requests.
        """

    def serverdisconnect(self, conn: mitmproxy.connections.ServerConnection):
        """
            Mitmproxy has disconnected from a server.
        """

    def next_layer(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            Network layers are being switched. You may change which layer will
            be used by returning a new layer object from this event.
        """

    # General lifecycle
    def configure(self, updated: typing.Set[str]):
        """
            Called when configuration changes. The updated argument is a
            set-like object containing the keys of all changed options. This
            event is called during startup with all options in the updated set.
        """

    def done(self):
        """
            Called when the addon shuts down, either by being removed from
            the mitmproxy instance, or when mitmproxy itself shuts down. On
            shutdown, this event is called after the event loop is
            terminated, guaranteeing that it will be the final event an addon
            sees. Note that log handlers are shut down at this point, so
            calls to log functions will produce no output.
        """

    def load(self, entry: mitmproxy.addonmanager.Loader):
        """
            Called when an addon is first loaded. This event receives a Loader
            object, which contains methods for adding options and commands. This
            method is where the addon configures itself.
        """

    def log(self, entry: mitmproxy.log.LogEntry):
        """
            Called whenever a new log entry is created through the mitmproxy
            context. Be careful not to log from this event, which will cause an
            infinite loop!
        """

    def running(self):
        """
            Called when the proxy is completely up and running. At this point,
            you can expect the proxy to be bound to a port, and all addons to be
            loaded.
        """

    def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]):
        """
            Update is called when one or more flow objects have been modified,
            usually from a different addon.
        """

Example: http-add-header.py

"""Add an HTTP header to each response."""

class AddHeader:
    def __init__(self):
        self.num = 0

    def response(self, flow):
        self.num = self.num + 1
        flow.response.headers["count"] = str(self.num)

addons = [
    AddHeader()
]
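
To see what the `response` hook does on each call, the class can be exercised outside mitmproxy. This is a minimal sketch under stated assumptions: the class body is copied from above, and `make_stub_flow` is a hypothetical stand-in built from `types.SimpleNamespace` with a plain dict for the headers, whereas a real flow carries mitmproxy's own header object.

```python
from types import SimpleNamespace


class AddHeader:
    def __init__(self):
        self.num = 0

    def response(self, flow):
        self.num = self.num + 1
        flow.response.headers["count"] = str(self.num)


def make_stub_flow():
    # Hypothetical stand-in for a mitmproxy flow; it models only the
    # attributes that AddHeader.response touches.
    return SimpleNamespace(response=SimpleNamespace(headers={}))


addon = AddHeader()
flows = [make_stub_flow() for _ in range(3)]
for f in flows:
    addon.response(f)  # mitmproxy would invoke this once per response

counts = [f.response.headers["count"] for f in flows]
```

Because the counter lives on the addon instance, it is shared across all flows handled by that instance.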

Example: tcp-simple.py

"""
Process individual messages from a TCP connection.

This script replaces full occurrences of "foo" with "bar" and prints various details for each message.
Please note that TCP is stream-based and *not* message-based. mitmproxy splits stream contents into "messages"
as they are received by socket.recv(). This is pretty arbitrary and should not be relied on.
However, it is sometimes good enough as a quick hack.

Example Invocation:

    mitmdump --rawtcp --tcp-hosts ".*" -s examples/tcp-simple.py
"""
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp

def tcp_message(flow: tcp.TCPFlow):
    message = flow.messages[-1]
    message.content = message.content.replace(b"foo", b"bar")

    ctx.log.info(
        f"tcp_message[from_client={message.from_client}, content={strutils.bytes_to_escaped_str(message.content)}]"
    )

Example: anatomy.py

"""
Basic skeleton of a mitmproxy addon.

Run as follows: mitmproxy -s anatomy.py
"""
from mitmproxy import ctx

class Counter:
    def __init__(self):
        self.num = 0

    def request(self, flow):
        self.num = self.num + 1
        ctx.log.info("We've seen %d flows" % self.num)

addons = [
    Counter()
]

Example: nonblocking.py

"""
Make event hooks non-blocking.

When event hooks are decorated with @concurrent, they will be run in their own thread, freeing the main event loop.
Please note that this generally opens the door to race conditions and decreases performance if not required.
"""
import time

from mitmproxy.script import concurrent

@concurrent  # Remove this and see what happens
def request(flow):
    # This is ugly in mitmproxy's UI, but you don't want to use mitmproxy.ctx.log from a different thread.
    print("handle request: %s%s" % (flow.request.host, flow.request.path))
    time.sleep(5)
    print("start  request: %s%s" % (flow.request.host, flow.request.path))
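
The general idea of moving a slow hook onto its own thread can be sketched with the standard library alone. This is not mitmproxy's implementation of `@concurrent`; `in_own_thread` and `slow_hook` are hypothetical names, and a `threading.Event` stands in for `time.sleep(5)` so the example finishes immediately.

```python
import functools
import threading


def in_own_thread(fn):
    """Hypothetical decorator: run `fn` in a new thread on every call."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread  # the caller continues without waiting for `fn`
    return wrapper


release = threading.Event()
handled = []


@in_own_thread
def slow_hook(name):
    release.wait()        # stands in for a long blocking operation
    handled.append(name)


thread = slow_hook("example.org/")
returned_early = handled == []  # the call returned before the hook finished
release.set()                   # let the hook complete
thread.join()
```

The shared `handled` list also hints at the race-condition risk mentioned in the docstring: any state touched by such hooks is accessed from several threads.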

Example: http-redirect-requests.py

"""Redirect HTTP requests to another server."""
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    # pretty_host takes the "Host" header of the request into account,
    # which is useful in transparent mode where we usually only have the IP
    # otherwise.
    if flow.request.pretty_host == "example.org":
        flow.request.host = "mitmproxy.org"

Example: http-stream-simple.py

"""
Select which responses should be streamed.

Enable response streaming for all HTTP flows.
This is equivalent to passing `--set stream_large_bodies=1` to mitmproxy.
"""

def responseheaders(flow):
    """
    Enables streaming for all responses.
    This is equivalent to passing `--set stream_large_bodies=1` to mitmproxy.
    """
    flow.response.stream = True

Example: options-configure.py

"""React to configuration changes."""
import typing

from mitmproxy import ctx
from mitmproxy import exceptions

class AddHeader:
    def load(self, loader):
        loader.add_option(
            name = "addheader",
            typespec = typing.Optional[int],
            default = None,
            help = "Add a header to responses",
        )

    def configure(self, updates):
        if "addheader" in updates:
            if ctx.options.addheader is not None and ctx.options.addheader > 100:
                raise exceptions.OptionsError("addheader must be <= 100")

    def response(self, flow):
        if ctx.options.addheader is not None:
            flow.response.headers["addheader"] = str(ctx.options.addheader)

addons = [
    AddHeader()
]

Example: http-trailers.py

"""
This script simply prints all received HTTP Trailers.

HTTP requests and responses can contain trailing headers which are sent after
the body is fully transmitted. Such trailers need to be announced in the initial
headers by name, so the receiving endpoint can wait and read them after the
body.
"""

from mitmproxy import http
from mitmproxy.net.http import Headers

def request(flow: http.HTTPFlow):
    if flow.request.trailers:
        print("HTTP Trailers detected! Request contains:", flow.request.trailers)

    if flow.request.path == "/inject_trailers":
        if flow.request.is_http10:
            # HTTP/1.0 doesn't support trailers
            return
        elif flow.request.is_http11:
            if not flow.request.content:
                # Avoid sending a body on GET requests or a 0 byte chunked body with trailers.
                # Otherwise some servers return 400 Bad Request.
                return
            # HTTP 1.1 requires transfer-encoding: chunked to send trailers
            flow.request.headers["transfer-encoding"] = "chunked"
        # HTTP 2+ supports trailers on all requests/responses

        flow.request.headers["trailer"] = "x-my-injected-trailer-header"
        flow.request.trailers = Headers([
            (b"x-my-injected-trailer-header", b"foobar")
        ])
        print("Injected a new request trailer...", flow.request.headers["trailer"])

def response(flow: http.HTTPFlow):
    if flow.response.trailers:
        print("HTTP Trailers detected! Response contains:", flow.response.trailers)

    if flow.request.path == "/inject_trailers":
        if flow.request.is_http10:
            return
        elif flow.request.is_http11:
            if not flow.response.content:
                return
            flow.response.headers["transfer-encoding"] = "chunked"

        flow.response.headers["trailer"] = "x-my-injected-trailer-header"
        flow.response.trailers = Headers([
            (b"x-my-injected-trailer-header", b"foobar")
        ])
        print("Injected a new response trailer...", flow.response.headers["trailer"])
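
On the wire, HTTP/1.1 trailers follow the terminating zero-length chunk, which is why the script switches the message to chunked transfer encoding. The following is a minimal sketch of that framing in plain Python; `encode_chunked` is a hypothetical helper written for illustration and is not a mitmproxy API, since mitmproxy serializes messages itself.

```python
from typing import Iterable, List, Tuple


def encode_chunked(
    chunks: Iterable[bytes], trailers: List[Tuple[bytes, bytes]]
) -> bytes:
    """Frame a body with HTTP/1.1 chunked encoding plus trailer fields."""
    out = bytearray()
    for chunk in chunks:
        if not chunk:
            continue  # a zero-length chunk would terminate the body early
        out += b"%x\r\n" % len(chunk)  # chunk size in hexadecimal
        out += chunk + b"\r\n"
    out += b"0\r\n"  # last chunk marks the end of the body
    for name, value in trailers:
        out += name + b": " + value + b"\r\n"
    out += b"\r\n"  # blank line ends the trailer section
    return bytes(out)


wire = encode_chunked(
    [b"hello ", b"world"],
    [(b"x-my-injected-trailer-header", b"foobar")],
)
```

The trailer name used here matches the one announced in the `trailer` header by the script above.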

Example: commands-paths.py

"""Handle file paths as command arguments."""
import typing

from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import types

class MyAddon:
    @command.command("myaddon.histogram")
    def histogram(
        self,
        flows: typing.Sequence[flow.Flow],
        path: types.Path,
    ) -> None:
        totals = {}
        for f in flows:
            totals[f.request.host] = totals.setdefault(f.request.host, 0) + 1

        with open(path, "w+") as fp:
            for cnt, dom in sorted([(v, k) for (k, v) in totals.items()]):
                fp.write("%s: %s\n" % (cnt, dom))

        ctx.log.alert("done")

addons = [
    MyAddon()
]
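
The tallying and sorting steps of `histogram` can be tried without mitmproxy. This is a minimal sketch, assuming a hypothetical list of host names in place of `f.request.host` and an in-memory `io.StringIO` in place of the file path; `collections.Counter` is swapped in for the `setdefault` bookkeeping.

```python
import io
from collections import Counter

# Hypothetical sample data standing in for f.request.host of each flow.
hosts = [
    "example.com",
    "mitmproxy.org",
    "example.com",
    "example.org",
    "example.com",
]

totals = Counter(hosts)  # host -> number of flows

fp = io.StringIO()
# Sorting (count, host) tuples orders by count first, then by host name.
for cnt, dom in sorted((v, k) for k, v in totals.items()):
    fp.write("%s: %s\n" % (cnt, dom))

report = fp.getvalue()
```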

Example: events-websocket-specific.py

"""WebSocket-specific events."""
import mitmproxy.http
import mitmproxy.websocket

class Events:
    # Websocket lifecycle
    def websocket_handshake(self, flow: mitmproxy.http.HTTPFlow):
        """
            Called when a client wants to establish a WebSocket connection. The
            WebSocket-specific headers can be manipulated to alter the
            handshake. The flow object is guaranteed to have a non-None request
            attribute.
        """

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has commenced.
        """

    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            Called when a WebSocket message is received from the client or
            server. The most recent message will be flow.messages[-1]. The
            message is user-modifiable. Currently there are two types of
            messages, corresponding to the BINARY and TEXT frame types.
        """

    def websocket_error(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has had an error.
        """

    def websocket_end(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has ended.
        """

Example: websocket-simple.py

"""Process individual messages from a WebSocket connection."""
import re
from mitmproxy import ctx

def websocket_message(flow):
    # get the latest message
    message = flow.messages[-1]

    # was the message sent from the client or server?
    if message.from_client:
        ctx.log.info("Client sent a message: {}".format(message.content))
    else:
        ctx.log.info("Server sent a message: {}".format(message.content))

    # manipulate the message content
    message.content = re.sub(r'^Hello', 'HAPPY', message.content)

    if 'FOOBAR' in message.content:
        # kill the message and not send it to the other endpoint
        message.kill()

Example: filter-flows.py

"""
Use mitmproxy's filter pattern in scripts.
"""
from mitmproxy import flowfilter
from mitmproxy import ctx, http

class Filter:
    def __init__(self):
        self.filter: flowfilter.TFilter = None

    def configure(self, updated):
        self.filter = flowfilter.parse(ctx.options.flowfilter)

    def load(self, l):
        l.add_option(
            "flowfilter", str, "", "Check that flow matches filter."
        )

    def response(self, flow: http.HTTPFlow) -> None:
        if flowfilter.match(self.filter, flow):
            ctx.log.info("Flow matches filter:")
            ctx.log.info(flow)

addons = [Filter()]

Example: options-simple.py

"""
Add a new mitmproxy option.

Usage:

    mitmproxy -s options-simple.py --set addheader=true
"""
from mitmproxy import ctx

class AddHeader:
    def __init__(self):
        self.num = 0

    def load(self, loader):
        loader.add_option(
            name = "addheader",
            typespec = bool,
            default = False,
            help = "Add a count header to responses",
        )

    def response(self, flow):
        if ctx.options.addheader:
            self.num = self.num + 1
            flow.response.headers["count"] = str(self.num)

addons = [
    AddHeader()
]

Example: http-reply-from-proxy.py

"""Send a reply from the proxy without sending any data to the remote server."""
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    if flow.request.pretty_url == "http://example.com/path":
        flow.response = http.HTTPResponse.make(
            200,  # (optional) status code
            b"Hello World",  # (optional) content
            {"Content-Type": "text/html"}  # (optional) headers
        )

Example: commands-simple.py

"""Add a custom command to mitmproxy's command prompt."""
from mitmproxy import command
from mitmproxy import ctx

class MyAddon:
    def __init__(self):
        self.num = 0

    @command.command("myaddon.inc")
    def inc(self) -> None:
        self.num += 1
        ctx.log.info(f"num = {self.num}")

addons = [
    MyAddon()
]

Example: internet_in_mirror.py

"""
Mirror all web pages.

Useful if you are living down under.
"""
from mitmproxy import http

def response(flow: http.HTTPFlow) -> None:
    reflector = b"<style>body {transform: scaleX(-1);}</style></head>"
    flow.response.content = flow.response.content.replace(b"</head>", reflector)

Example: contentview.py

"""
Add a custom message body pretty-printer for use inside mitmproxy.

This example shows how one can add a custom contentview to mitmproxy,
which is used to pretty-print HTTP bodies for example.
The content view API is explained in the mitmproxy.contentviews module.
"""
from mitmproxy import contentviews

class ViewSwapCase(contentviews.View):
    name = "swapcase"
    content_types = ["text/plain"]

    def __call__(self, data, **metadata) -> contentviews.TViewResult:
        return "case-swapped text", contentviews.format_text(data.swapcase())

view = ViewSwapCase()

def load(l):
    contentviews.add(view)

def done():
    contentviews.remove(view)

Example: io-read-saved-flows.py

#!/usr/bin/env python
"""
Read a mitmproxy dump file.
"""
from mitmproxy import io
from mitmproxy.exceptions import FlowReadException
import pprint
import sys

with open(sys.argv[1], "rb") as logfile:
    freader = io.FlowReader(logfile)
    pp = pprint.PrettyPrinter(indent=4)
    try:
        for f in freader.stream():
            print(f)
            print(f.request.host)
            pp.pprint(f.get_state())
            print("")
    except FlowReadException as e:
        print("Flow file corrupted: {}".format(e))

Example: websocket-inject-message.py

"""
Inject a WebSocket message into a running connection.

This example shows how to inject a WebSocket message to the client.
Every new WebSocket connection will trigger a new asyncio task that
periodically injects a new message to the client.
"""
import asyncio
import mitmproxy.websocket

class InjectWebSocketMessage:
    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        i = 0
        while not flow.ended and not flow.error:
            await asyncio.sleep(5)
            flow.inject_message(flow.client_conn, f'This is the #{i} injected message!')
            i += 1

    def websocket_start(self, flow):
        asyncio.get_event_loop().create_task(self.inject(flow))

addons = [InjectWebSocketMessage()]

Example: events-tcp-specific.py

"""TCP-specific events."""
import mitmproxy.tcp

class Events:
    def tcp_start(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has started.
        """

    def tcp_message(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has received a message. The most recent message
            will be flow.messages[-1]. The message is user-modifiable.
        """

    def tcp_error(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP error has occurred.
        """

    def tcp_end(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has ended.
        """