当前位置: 首页 > 工具软件 > QQWry > 使用案例 >

qqwry 解析(python3) 并且dump 到 mysql

丰超
2023-12-01

准备工作

  1. python 3.7 + IDE
  2. internet

致谢

本项目借鉴于qqwry-py3,相关文件已附上地址:
https://pypi.org/project/qqwry-py3/

吐槽

搞了一下午,总算出来了,qqwry-py3中解析那部分写的是非常棒,但是在更新文件这里,UA头可能是以前的,现在不能用了,替换一下就欧克了。
数据库这部分通过索引做的单文件,来回来去绕的我头发懵,dump都毫无思路,经过不懈的努力,终于搞出来了,主要对于这个数据库不是查询,而是dump,然后可以通过这些数据做自己想做的事情了,就可以脱离原来的那个软件了。

声明

能力有限,大佬轻喷。

直接上代码吧

# sql文件
CREATE TABLE `ipdb` (
  `startip` varchar(15) DEFAULT NULL,
  `endip` varchar(15) DEFAULT NULL,
  `startip_iton` varchar(11) DEFAULT NULL,
  `endip_iton` varchar(11) DEFAULT NULL,
  `address` varchar(256) DEFAULT NULL,
  KEY `s` (`startip`),
  KEY `e` (`endip`),
  KEY `l` (`address`),
  KEY `si` (`startip_iton`),
  KEY `ei` (`endip_iton`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
import requests
import re
import MySQLdb
import conf # 这个是我自己做的包
import zlib
import struct
import socket
import warnings
import os
database_info = conf.getdb() # 这里是获取mysql数据库的方法。
ver_url = 'http://update.cz88.net/ip/copywrite.rar'
db_url = 'http://update.cz88.net/ip/qqwry.rar'
db = MySQLdb.connect(database_info['ip'], database_info['user'], database_info['password'], database_info['db'], charset=database_info['charset']) # 连接到你的mysql数据库
headers={
    'User-agent':'Mozilla/3.0 (compatible; Indy Library)',
    'Accept':'text/html, */*',
    'Host': 'update.cz88.net'
} # 抓包而来的请求头


def int3(data, offset): # 此代码来源于 qqwry-py3中
    return data[offset] + (data[offset + 1] << 8) + \
           (data[offset + 2] << 16)


def int4(data, offset):# 此代码来源于 qqwry-py3中
    return data[offset] + (data[offset + 1] << 8) + \
           (data[offset + 2] << 16) + (data[offset + 3] << 24)
class wrydb: # 此代码来源于 qqwry-py3中,但是我做了一小部分的修改
    def __init__(self):
        self.clear()

    def clear(self):
        self.idx1 = None
        self.idx2 = None
        self.idxo = None
        self.data = None
        self.index_begin = -1
        self.index_end = -1
        self.index_count = -1
        self.__fun = None

    def load_file(self, filename, loadindex=False):
        self.clear()
        if type(filename) == bytes:
            self.data = buffer = filename
            filename = 'memory data'
        elif type(filename) == str:
            # read file
            try:
                with open(filename, 'br') as f:
                    self.data = buffer = f.read()
                    self.data2 = open(filename, 'rb')
            except Exception as e:
                print('[!] Open or load failed:', e)
                self.clear()
                return False

            if self.data == None:
                print('[!] %s load failed' % filename)
                self.clear()
                return False
        else:
            self.clear()
            return False

        if len(buffer) < 8:
            print('[!] %s load failed, file only %d bytes' %
                  (filename, len(buffer))
                  )
            self.clear()
            return False
        index_begin = int4(buffer, 0)
        index_end = int4(buffer, 4)
        if index_begin > index_end or \
                (index_end - index_begin) % 7 != 0 or \
                index_end + 7 > len(buffer):
            print('[!] %s index error' % filename)
            self.clear()
            return False
        self.index_begin = index_begin
        self.index_end = index_end
        self.index_count = (index_end - index_begin) // 7 + 1

        if not loadindex:
            print('[#] %s %s bytes, %d segments.' %
                  (filename, format(len(buffer), ','), self.index_count)
                  )
            self.__fun = self.raw_search
            return True

    def __get_addr(self, offset):
        mode = self.data[offset]
        if mode == 1:
            offset = int3(self.data, offset + 1)
            mode = self.data[offset]
        if mode == 2:
            off1 = int3(self.data, offset + 1)
            c = self.data[off1:self.data.index(b'\x00', off1)]
            offset += 4
        else:
            c = self.data[offset:self.data.index(b'\x00', offset)]
            offset += len(c) + 1
        if self.data[offset] == 2:
            offset = int3(self.data, offset + 1)
        p = self.data[offset:self.data.index(b'\x00', offset)]
        return str(c.decode('gb18030', errors='replace')) + " " + str(p.decode('gb18030', errors='replace'))

    def raw_search(self, ip):
        l = 0
        r = self.index_count
        while r - l > 1:
            m = (l + r) // 2
            offset = self.index_begin + m * 7
            new_ip = int4(self.data, offset)
            if ip < new_ip:
                r = m
            else:
                l = m

        offset = self.index_begin + 7 * l
        ip_begin = int4(self.data, offset)
        offset = int3(self.data, offset + 4)
        ip_end = int4(self.data, offset)

        if ip_begin <= ip <= ip_end:
            return self.__get_addr(offset + 4)
        else:
            return None

    def toip(self, hexip):
        return socket.inet_ntoa(struct.pack(">I", hexip))

    def dump(self): # 就是这个地方,写了好久,放弃这段又拾起这段反反复复,最后还是选择这个了。
        db = MySQLdb.connect(database_info['ip'], database_info['user'], database_info['password'],
                             database_info['db'], charset=database_info['charset'])
        old = ""
        l = 0
        r = self.index_count
        print("[!] Now will delete all record for ipdb table, please press [Y/y] to confirm, if you input [n/N] to exit")
        while True:
            cf = input("[@]y/n? : ")
            if cf == 'y' or cf == 'Y':
                break
            elif cf == 'n' or cf == 'N':
                return
            else:
                continue
        print("[#] progressing...")
        cursor = db.cursor()
        cursor.execute("delete from ipdb")
        db.commit()
        print("[#] Successfully deleted all records for ipdb table")
        for m in range(l, r): # 灵感来源于raw_search这个方法
            offset = self.index_begin + m * 7
            ip_begin = int4(self.data, offset)
            offset = int3(self.data, offset + 4)
            ip_end = int4(self.data, offset)
            address = self.__get_addr(offset + 4)
            startip = self.toip(ip_begin)
            endip = self.toip(ip_end)
            startip_iton = ip_begin
            endip_iton = ip_end
            cursor = db.cursor()
            sql = "insert into ipdb(startip, endip, startip_iton, endip_iton, address) values (%s,%s,%s,%s,%s)"
            cursor.execute(sql, (startip, endip, startip_iton, endip_iton, address))
            if m % 1000 ==0 or m == r-1:
                db.commit()
                print("\rNow Progress: "+str(m)+", Total number: "+str(r) + "  % : "+ str("%.3f" %(int(m)/int(r)*100)), end=" ")
        return old

def get_lasetst_version():  # 正则用不太好,见笑了。
    res = requests.get(ver_url, headers=headers, timeout=5, verify=False)
    res.encoding = "gbk"
    text = res.text
    year = re.findall(r'(?=20).+?(?=年)', text)[0]
    month = re.findall(r'(?=年).+.(?=月)', text)[0].split('年')[1]
    day = re.findall(r'(?=月).+.(?=日)', text)[0].split('月')[1]

    return (year+"-"+month+"-"+day)

def get_lasetst_db(filename):
    def get_content(url):
        res = requests.get(url,headers=headers,timeout=60).content
        return res
    print("[#] Downloading ver file...")
    data = get_content(ver_url)
    print("[#] Checking ver file...")
    if not data:
        warnings.warn("[#] Download verinfo failure", RuntimeWarning)
    if len(data) <= 24 or data[:4] != b'CZIP':
        warnings.warn("[#] Decode verinfo failure", RuntimeWarning)
    version, unknown1, size, unknown2, key = \
        struct.unpack_from('<IIIII', data, 4)
    if unknown1 != 1:
        warnings.warn("[#] Decode verinfo failure", RuntimeWarning)
    print("[#] Downloading db file...")
    data = get_content(db_url)
    print("[#] Checking db file...")
    if not data:
        warnings.warn("[#] Download dbinfo failure", RuntimeWarning)
    if size != len(data):
        warnings.warn("[#] File check failure", RuntimeWarning)
    head = bytearray(0x200)
    for i in range(0x200):
        key = (key * 0x805 + 1) & 0xff
        head[i] = data[i] ^ key
    data = head + data[0x200:]
    print("[#] Unpacking file...")
    try:
        data = zlib.decompress(data)
    except:
        warnings.warn("[#] Unpack failure", RuntimeWarning)
    if filename == None:
        return data
    elif type(filename) == str:
        print("[#] Saving file...")
        try:
            with open(filename, 'wb') as f:
                f.write(data)
            return len(data)
        except:
            warnings.warn("[#] File save failure", RuntimeWarning)
    else:
        warnings.warn("[#] File save failure", RuntimeWarning)
    return True


def update_mydb():
        fn = 'lastest.dat'
        q = wrydb()
        q.load_file(fn)
        q.dump()
        q.clear()


def autorun():
    if get_lasetst_db('lastest.dat'):
        update_mydb()
    else:
        print("[#] download failed")
autorun()

 类似资料: