安全开发--6--一个简单的TCP代理工具开发

吴胜
2023-12-01

本博客地址:https://security.blog.csdn.net/article/details/125284336

一、使用Python3开发TCP代理

TCP代理可以用来分析未知的协议,篡改应用的网络流量,或者为fuzz测试创建测试用例。

本工具主要有4个主要功能:
1、把本地设备和远程设备之间的通信过程显示在屏幕上;
2、从本地设备或远程设备的入口socket接收数据;
3、控制远程设备和本地设备之间的流量方向;
4、创建一个监听socket,并把它传给我们的proxy_handler。

源代码如下(具体逻辑见代码注释):

#!/usr/bin/python
#-*- coding:utf8 -*-

import sys
import socket
import threading

# 所有可打印字符的位置上,保持原有的字符不变
# 所有不可打印字符的位置上,放一个句号“.”
HEX_FILTER = "".join(
    [(len(repr(chr(i))) == 3) and chr(i) or "." for i in range(256)]
)

# 接收bytes或string类型的输入,并将其转换为16进制格式输出到屏幕上
def hexdump(src, length=16, show=True):
    # 只处理string类型的数据,如果是byte类型,则进行类型转换
    if isinstance(src, bytes):
        src = src.decode()
    results = list()

    for i in range(0, len(src), length):
        # 切一小段数据,放到word里面
        word = str(src[i:i+length])
        # 通过trasnslate函数将整段数据转换成可打印字符的形式
        printable = word.translate(HEX_FILTER)
        # 同样的,把这段数据转换成16进制格式
        hexa = " ".join([f"{ord(c):02X}" for c in word])
        hexwidth = length*3
        # 将word变量起始点的偏移、16进制、可打印字符打包成一行字符串
        results.append(f"{i:04x} {hexa:<{hexwidth}} {printable}")

    if show:
        for line in results:
            print (line)
    else:
        return results

# 实时观察代理内数据流通的方法。从代理两端接收数据
def receive_from(connection):
    # 空的byte变量,用来存储socket对象返回的数据
    buffer = b""
    #设置5秒的超时
    connection.settimeout(5)

    try:
        # 把返回的数据写进buffer,直到没有数据或者连接超时
        while True:
            data = connection.recv(4096)
            if not data:
                break
            buffer += data
    except Exception as e:
        pass
    return  buffer

# 代理转发数据包之前,修改请求数据时
def request_handler(buffer):
    return buffer

# 代理转发数据包之前,修改返回数据时
def response_handler(buffer):
    return buffer

def proxy_handler(client_socket, remote_host, remote_port, receive_first):
    # 连接远程主机
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    remote_socket.connect((remote_host, remote_port))

    # 确认是否需要从远程主机接收数据,如需要则执行
    if receive_first:
        # 对通信两端分别调用receive_from函数,从已连接的socket对象中收取数据
        remote_buffer = receive_from(remote_socket)
        hexdump(remote_buffer)

    # 发送给我们的响应处理
    remote_buffer = response_handler(remote_buffer)

    # 如果我们有数据传递给本地客户端
    if len(remote_buffer):
        print ("[<==] Sending %d bytes to localhost." % len(remote_buffer))
        client_socket.send(remote_buffer)

    # 从本地循环读取数据,处理数据,转发给远程服务器
    # 从远程服务器读取数据,处理数据,转发给本地客户端
    while True:
        # 从本地读取数据
        local_buffer = receive_from(client_socket)
        if len(local_buffer):
            line = "[==>] Received %d bytes from localhost" % len(local_buffer)
            print (line)
            hexdump(local_buffer)

            # 发送给我们的本地请求
            local_buffer = request_handler(local_buffer)

            # 向远程主机发送数据
            remote_socket.send(local_buffer)
            print ("[==>] Sent to remote.")

        # 接受响应的数据
        remote_buffer = receive_from(remote_socket)

        if len(remote_buffer):
            print ("[<==] Received %d bytes from remote." % len(remote_buffer))
            hexdump(remote_buffer)

            #发送到响应处理函数
            remote_buffer = response_handler(remote_buffer)

            #将响应发送给本地socket
            client_socket.send(remote_buffer)

            print ("[<==] Sent to localhost.")

        #如果两边都没有数据时,关闭连接
        if not len(local_buffer) or not len(remote_buffer):
            client_socket.close()
            remote_socket.close()
            print ("[*] No more data. Closing connections.")

            break

def server_loop(local_host, local_port, remote_host, remote_port, receive_first):
    # 创建一个socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 绑定到本地主机并开始监听
    try:
        server.bind((local_host, local_port))
    except Exception as e:
        print ("problem on build: %r" % e)
        print ("[!!] Failed to listen on %s:%d" % (local_host, local_port))
        print ("[!!] Check for other listening sockets or correct permissions.")
        sys.exit(0)

    print ("[*] Listening on %s:%d" % (local_host, local_port))

    server.listen(5)
    # 每出现一个新连接,就开一个新线程,将新连接交给proxy_handler
    while True:
        client_socket, addr = server.accept()

        # 打印出本地连接信息
        line = "> Received incoming connectiong from %s:%d" %  (addr[0], addr[1])
        print (line)
        # proxy_handler给数据流的两端收发数据
        proxy_thread = threading.Thread(target=proxy_handler, args=(client_socket, remote_host, remote_port, receive_first))
        proxy_thread.start()


def main():
    # 没有华丽的命令行解析
    if len(sys.argv[1:]) != 5:
        print ("Usage: ./proxy.py [localhost] [localport]", end = "")
        print ("[remotehost] [remoteport] [receive_first]")
        print ("Example: ./proxy.py 127.0.0.1 9000 10.12.132.1 9000 True")
        sys.exit(0)

    # 设置本地监听参数
    local_host = sys.argv[1]
    local_port = int(sys.argv[2])

    #设置远程目标
    remote_host = sys.argv[3]
    remote_port = int(sys.argv[4])

    #告诉代理在发送给远程主机之前连接和接受数据
    receive_first = sys.argv[5]

    if "True" in receive_first:
        receive_first = True
    else:
        receive_first = False

    #现在设置好我们的监听socket
    server_loop(local_host, local_port, remote_host, remote_port, receive_first)

if __name__ == "__main__":
    main()

二、工具使用

请先仔细阅读以上代码及代码中的注释,理解清楚代码的逻辑。

启动代理:python 111.py 192.168.153.141 21 192.168.153.141 21 True
该代理能够指向一台能够实际响应请求的FTP服务器。

另一个终端新建FTP会话:ftp 192.168.153.141

就能收到FTP服务器的信息了

 类似资料: