当前位置: 首页 > 工具软件 > Scapy > 使用案例 >

Python scapy抓包程序

卫寒
2023-12-01

尝试使用Python scapy 包中 sniff 函数写个简单的抓包程序,sniff 抓取数据包并写入本地文件

1. 安装scapy,windows7 系统需要先安装 npcap,pip 之后 ,简单尝试scapy命令,发现提示 warning: No route found ,网上都是ipv6的问题,自己尝试解决,运行scapy的conf.route命令,结果只有一个虚拟网卡,其他网卡信息没有读取,估计是npcap的问题, 在安装npcap时提示win7需要kb4474419补丁,下载安装后问题解决。

2. 使用sniff函数,可以在filter 中定义抓取的条件,在抓取过程中使用回调函数完成写入文件操作。

3. sniff不会识别http层,需要安装scapy_http 包,pip安装后发现没有用,查看源码

bind_layers(TCP, HTTP, dport=80)
bind_layers(TCP, HTTP, sport=80)

原来是绑定了端口,看看源码大概是通过raw层头部信息来判断,如 有get post字符的是http request。放弃http包,自己尝试写在回调函数中

4. tcp 传输过程中会有1500字节最大传输单元限制,包括tcp头部信息等内容,单次传输大概1480字节,分片传输后在重新组合。在抓包的数据中分辨不出来是http request还是http response,使用wireshark软件查看,这种分片情况会标记 tcp segment of a reassembled PDU,可以通过ip 端口等信息判断,对于同一源ip端口和目的源ip端口,相同的接收长度ack,发送长度seq不同的数据包为tcp分片传输。写入文件时标记[tcp segment],上一数据包seq ack信息字典形式保存到内存中

源码如下

from scapy.all import *
from io import StringIO
import json
def fun_scapy():
    print('start sniff packet')
    if os.path.isfile('D:/packet.txt'):
        os.remove('D:/packet.txt')
    sniff(filter='host 192.168.1.100',prn=callback)


def callback(packet):
    global buff

    with open('D:/packet.txt','a') as f:
        s = packet.summary()
        if packet.haslayer('TCP'):
            tcp_seq = packet['TCP'].seq
            tcp_ack = packet['TCP'].ack
            last_seq_ack_key = str(packet['IP'].src) + str(packet['IP'].sport) +          str(packet['IP'].dst) + str(packet['IP'].dport)
            if 'buff' in globals().keys():
                last_seq_ack = buff.getvalue()
                last_seq_ack = json.loads(last_seq_ack)
            else:
                last_seq_ack={last_seq_ack_key:('','')}
            if packet.haslayer('Raw'):
                raw_data = packet['Raw'].load
                index = raw_data.find('\r\n\r\n'.encode())
                if index != -1:
                    r= raw_data[:index+4].decode()
                    pattern_request = re.compile(r"^(?:OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) "r"(?:.+?) "r"(HTTP/\d\.\d)")
                    pattern_reponse = re.compile(r"^HTTP/\d\.\d \d\d\d .*")
                    if pattern_request.match(r):
                        f.write(s + ' seq:' + str(tcp_seq) + ' ack:' + str(tcp_ack) + ' len:' + str(len(raw_data)) +' HTTP Request'+'\n')
                    elif pattern_reponse.match(r):
                        f.write(s + ' seq:' + str(tcp_seq) + ' ack:' + str(tcp_ack) + ' len:' + str(len(raw_data)) + ' HTTP Reponse' + '\n')
                    else:
                        f.write(s + ' seq:' + str(tcp_seq) + ' ack:' + str(tcp_ack) + ' len:' + str(len(raw_data)) +'\n')
                    try:
                        r = r + raw_data[index+4:].decode()
                    except Exception as e:
                        r = r + str(raw_data[index+4:])
                    f.write(r + '\n')
                else:
                    last_seq = last_seq_ack[last_seq_ack_key][0]
                    last_ack = last_seq_ack[last_seq_ack_key][1]
                    if tcp_seq != last_seq and tcp_ack == last_ack:
                        append_info = ' [TCP segment]'
                    else:
                        append_info = ''
                    try:
                        r = raw_data.decode()
                    except Exception as e:
                        r = str(raw_data)
                    f.write(s + ' seq:' + str(tcp_seq) + ' ack:' + str(tcp_ack) + ' len:' + str(len(raw_data)) +append_info+ '\n')
                    f.write(r +'\n')
            else:
                f.write(s+' seq:'+str(tcp_seq)+' ack:'+str(tcp_ack)+' len:0'+'\n')


            last_seq_ack[last_seq_ack_key] = (tcp_seq,tcp_ack)
            last_seq_ack = json.dumps(last_seq_ack)
            buff = StringIO(last_seq_ack)
        else:
            f.write(s+'\n')
        f.write('\n')
    print(packet.summary())


if __name__ == '__main__':
     fun_scapy()
 类似资料: