本博客地址:https://security.blog.csdn.net/article/details/125284336
TCP代理可以用来分析未知的协议,篡改应用的网络流量,或者为fuzz测试创建测试用例。
本工具主要有4个主要功能:
1、把本地设备和远程设备之间的通信过程显示在屏幕上;
2、从本地设备或远程设备的入口socket接收数据;
3、控制远程设备和本地设备之间的流量方向;
4、创建一个监听socket,并把它传给我们的proxy_handler。
源代码如下(具体逻辑见代码注释):
#!/usr/bin/python
#-*- coding:utf8 -*-
import sys
import socket
import threading
# 所有可打印字符的位置上,保持原有的字符不变
# 所有不可打印字符的位置上,放一个句号“.”
HEX_FILTER = "".join(
[(len(repr(chr(i))) == 3) and chr(i) or "." for i in range(256)]
)
# 接收bytes或string类型的输入,并将其转换为16进制格式输出到屏幕上
def hexdump(src, length=16, show=True):
# 只处理string类型的数据,如果是byte类型,则进行类型转换
if isinstance(src, bytes):
src = src.decode()
results = list()
for i in range(0, len(src), length):
# 切一小段数据,放到word里面
word = str(src[i:i+length])
# 通过trasnslate函数将整段数据转换成可打印字符的形式
printable = word.translate(HEX_FILTER)
# 同样的,把这段数据转换成16进制格式
hexa = " ".join([f"{ord(c):02X}" for c in word])
hexwidth = length*3
# 将word变量起始点的偏移、16进制、可打印字符打包成一行字符串
results.append(f"{i:04x} {hexa:<{hexwidth}} {printable}")
if show:
for line in results:
print (line)
else:
return results
# 实时观察代理内数据流通的方法。从代理两端接收数据
def receive_from(connection):
# 空的byte变量,用来存储socket对象返回的数据
buffer = b""
#设置5秒的超时
connection.settimeout(5)
try:
# 把返回的数据写进buffer,直到没有数据或者连接超时
while True:
data = connection.recv(4096)
if not data:
break
buffer += data
except Exception as e:
pass
return buffer
# 代理转发数据包之前,修改请求数据时
def request_handler(buffer):
return buffer
# 代理转发数据包之前,修改返回数据时
def response_handler(buffer):
return buffer
def proxy_handler(client_socket, remote_host, remote_port, receive_first):
# 连接远程主机
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect((remote_host, remote_port))
# 确认是否需要从远程主机接收数据,如需要则执行
if receive_first:
# 对通信两端分别调用receive_from函数,从已连接的socket对象中收取数据
remote_buffer = receive_from(remote_socket)
hexdump(remote_buffer)
# 发送给我们的响应处理
remote_buffer = response_handler(remote_buffer)
# 如果我们有数据传递给本地客户端
if len(remote_buffer):
print ("[<==] Sending %d bytes to localhost." % len(remote_buffer))
client_socket.send(remote_buffer)
# 从本地循环读取数据,处理数据,转发给远程服务器
# 从远程服务器读取数据,处理数据,转发给本地客户端
while True:
# 从本地读取数据
local_buffer = receive_from(client_socket)
if len(local_buffer):
line = "[==>] Received %d bytes from localhost" % len(local_buffer)
print (line)
hexdump(local_buffer)
# 发送给我们的本地请求
local_buffer = request_handler(local_buffer)
# 向远程主机发送数据
remote_socket.send(local_buffer)
print ("[==>] Sent to remote.")
# 接受响应的数据
remote_buffer = receive_from(remote_socket)
if len(remote_buffer):
print ("[<==] Received %d bytes from remote." % len(remote_buffer))
hexdump(remote_buffer)
#发送到响应处理函数
remote_buffer = response_handler(remote_buffer)
#将响应发送给本地socket
client_socket.send(remote_buffer)
print ("[<==] Sent to localhost.")
#如果两边都没有数据时,关闭连接
if not len(local_buffer) or not len(remote_buffer):
client_socket.close()
remote_socket.close()
print ("[*] No more data. Closing connections.")
break
def server_loop(local_host, local_port, remote_host, remote_port, receive_first):
# 创建一个socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定到本地主机并开始监听
try:
server.bind((local_host, local_port))
except Exception as e:
print ("problem on build: %r" % e)
print ("[!!] Failed to listen on %s:%d" % (local_host, local_port))
print ("[!!] Check for other listening sockets or correct permissions.")
sys.exit(0)
print ("[*] Listening on %s:%d" % (local_host, local_port))
server.listen(5)
# 每出现一个新连接,就开一个新线程,将新连接交给proxy_handler
while True:
client_socket, addr = server.accept()
# 打印出本地连接信息
line = "> Received incoming connectiong from %s:%d" % (addr[0], addr[1])
print (line)
# proxy_handler给数据流的两端收发数据
proxy_thread = threading.Thread(target=proxy_handler, args=(client_socket, remote_host, remote_port, receive_first))
proxy_thread.start()
def main():
# 没有华丽的命令行解析
if len(sys.argv[1:]) != 5:
print ("Usage: ./proxy.py [localhost] [localport]", end = "")
print ("[remotehost] [remoteport] [receive_first]")
print ("Example: ./proxy.py 127.0.0.1 9000 10.12.132.1 9000 True")
sys.exit(0)
# 设置本地监听参数
local_host = sys.argv[1]
local_port = int(sys.argv[2])
#设置远程目标
remote_host = sys.argv[3]
remote_port = int(sys.argv[4])
#告诉代理在发送给远程主机之前连接和接受数据
receive_first = sys.argv[5]
if "True" in receive_first:
receive_first = True
else:
receive_first = False
#现在设置好我们的监听socket
server_loop(local_host, local_port, remote_host, remote_port, receive_first)
if __name__ == "__main__":
main()
请先仔细阅读以上代码及代码中的注释,理解清楚代码的逻辑。
启动代理:python 111.py 192.168.153.141 21 192.168.153.141 21 True
该代理能够指向一台能够实际响应请求的FTP服务器。
另一个终端新建FTP会话:ftp 192.168.153.141
就能收到FTP服务器的信息了