python多线程mysql导出100万_Python多线程读写MySQL,python

唐元凯
2023-12-01

import pymysql

import threading

import datetime

import random

import requests

import json

import re

import time

class DB(object):
    """Hand out one MySQL connection per thread.

    Connections are created lazily with ``pymysql.connect`` and cached in
    ``self.pool`` under the name of the thread that requested them, so a
    thread that calls :meth:`get_instance` repeatedly keeps getting the same
    connection object.
    """

    def __init__(self, host=None, username=None, pwd=None, dbname=None):
        """Store the connection settings; no connection is opened here.

        Args:
            host: MySQL server host.
            username: Account used to log in.
            pwd: Password for ``username``.
            dbname: Database (schema) selected after connecting.
        """
        # Maps thread name -> connection created for that thread.
        self.pool = {}
        self.host = host
        self.username = username
        self.pwd = pwd
        self.dbname = dbname

    def get_instance(self):
        """Return the calling thread's connection, creating it if needed.

        Returns:
            The pymysql connection cached for the current thread name.
        """
        name = threading.current_thread().name
        conn = self.pool.get(name)
        # Reconnect when there is no cached connection, or when the cached one
        # was closed by its user (pymysql exposes that as ``Connection.open``).
        if conn is None or not getattr(conn, "open", True):
            # Keyword arguments: PyMySQL 1.0+ no longer accepts positional ones.
            conn = pymysql.connect(
                host=self.host,
                user=self.username,
                password=self.pwd,
                database=self.dbname,
            )
            self.pool[name] = conn
        return conn

class Bilibili(object):
    """Refresh seven-day-old ``bilibili_search`` rows and copy them locally.

    The job has three phases, all started from ``__init__`` via :meth:`main`:

    1. read the rows uploaded seven days ago from the server database;
    2. ask the bilibili stat API for each video's current view and danmaku
       counts and store them as the ``7_days_later_*`` values (also filling
       in ``aid`` / ``bvid`` when the row lacks them);
    3. insert the enriched rows into the local database from worker threads.
    """

    # Upper bound on worker threads used for the local insert phase.
    THREAD_COUNT = 150
    # Number of rows a worker claims per trip to the shared index.
    BATCH_SIZE = 50
    # Seconds to wait for the stat API before giving up on a request.
    REQUEST_TIMEOUT = 10
    # Seconds to pause after the API answers "请求被拦截" (request blocked).
    BLOCKED_BACKOFF = 30

    STAT_URL = "https://api.bilibili.com/x/web-interface/archive/stat?aid={0}"

    # Static request headers; "user-agent" is added per request. The scheme /
    # path / method / authority entries are sent as ordinary header fields.
    STAT_HEADERS = {
        "scheme": "https",
        "path": "/x/web-interface/archive/stat?aid=497867194",
        "method": "GET",
        "authority": "api.bilibili.com",
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Cache-Control": "max-age=0",
        "Cookie": "_uuid=3CBD8F2A-B448-E234-9306-0A787599BD1723208infoc; buvid3=921B1D5A-488B-40AE-8ABD-A78CA71D2120155831infoc; LIVE_BUVID=AUTO4415737227306757; CURRENT_FNVAL=16; stardustvideo=1; laboratory=1-1; sid=i7593k0h; PVID=1; bsource=seo_baidu; bfe_id=1bad38f44e358ca77469025e0405c4a6",
        "origin": "https://www.bilibili.com",
        "referer": "https://www.bilibili.com/video/BV1VQ4y1N7ZC?from=search&seid=5783695562986839813",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
    }

    # Columns written by insert_data, in statement order.
    INSERT_COLUMNS = (
        "game_name",
        "title",
        "title_url",
        "des",
        "watch_num",
        "hide",
        "upload_time",
        "up_name",
        "insert_data",
        "bvid",
        "aid",
        "typename",
        "7_days_later_watch_num",
        "7_days_later_hide",
    )

    # Parameterized so quotes in scraped text cannot break or alter the SQL.
    INSERT_SQL = "insert into bilibili_search({0}) values({1});".format(
        ", ".join(INSERT_COLUMNS),
        ", ".join(["%s"] * len(INSERT_COLUMNS)),
    )

    SELECT_SQL = (
        "select game_name, title, title_url, des, watch_num, hide, upload_time, "
        "up_name, insert_data, bvid, aid, typename from bilibili_search "
        "where upload_time = %s and 7_days_later_watch_num <= 1;"
    )

    def __init__(self):
        """Set up both database handles and immediately run the whole job."""
        self.server_db = DB("...", "...", "...", "...")
        self.local_db = DB("...", "...", "...", "...")
        # Shared cursor into the row list: workers claim [start_index, ...)
        # ranges under self.lock until start_index reaches end_index.
        self.start_index = 0
        self.end_index = 0
        self.lock = threading.Lock()
        # ISO date string for the day seven days before today.
        self.last7day = str(datetime.date.today() - datetime.timedelta(days=7))
        self.UserAgent = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/18.17763",
            "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
            "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
            "Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
            "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB7.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)",
        ]
        self.main()

    def main(self):
        """Run fetch -> enrich -> threaded insert, and wait for the workers."""
        bilibili_dict = self.get_bilibili_data()
        bilibili_dict = self.update_bilibili_data(bilibili_dict)

        # No point starting more workers (and DB connections) than batches.
        batch_count = -(-self.end_index // self.BATCH_SIZE)
        worker_count = max(1, min(self.THREAD_COUNT, batch_count))

        threads = [
            threading.Thread(target=self.insert_data, args=(bilibili_dict,))
            for _ in range(worker_count)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def get_bilibili_data(self):
        """Read the rows uploaded on ``self.last7day`` from the server database.

        Only rows whose ``7_days_later_watch_num`` is still <= 1 are selected.
        The server connection is closed afterwards and dropped from the pool
        so a later call does not receive the closed connection.

        Returns:
            The result of ``fetchall()`` on a ``DictCursor`` (one dict per row).
        """
        server_db = self.server_db.get_instance()
        try:
            server_cursor = server_db.cursor(cursor=pymysql.cursors.DictCursor)
            try:
                server_cursor.execute(self.SELECT_SQL, (self.last7day,))
                return server_cursor.fetchall()
            finally:
                server_cursor.close()
        finally:
            server_db.close()
            self.server_db.pool.pop(threading.current_thread().name, None)

    @staticmethod
    def _extract_aid(title_url, aid=None):
        """Return the numeric av id as a string, or None when none is known.

        Prefers the ``av<digits>`` path segment of ``title_url``; falls back
        to the row's stored ``aid`` for URLs that carry no av id.
        """
        match = re.search(r"(?:^|/)av(\d+)", title_url or "")
        if match:
            return match.group(1)
        if aid:
            return str(aid)
        return None

    @staticmethod
    def _keep_original_counts(data):
        """Use the row's original counts as its seven-day counts."""
        data["7_days_later_watch_num"] = data["watch_num"]
        data["7_days_later_hide"] = data["hide"]

    @staticmethod
    def _message_of(js_obj):
        """Return the API reply's ``message`` field, or None if unavailable."""
        return js_obj.get("message") if isinstance(js_obj, dict) else None

    def _fetch_stat(self, url):
        """GET ``url`` with a random User-Agent and return the decoded JSON.

        Returns None (after printing the error) when the request fails, times
        out, or the body is not valid JSON.
        """
        headers = dict(self.STAT_HEADERS)
        headers["user-agent"] = random.choice(self.UserAgent)
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.encoding = 'utf8'
            return json.loads(response.text)
        except (requests.RequestException, ValueError) as e:
            print(e)
            return None

    def update_bilibili_data(self, bilibili_dict=None):
        """Fill in the seven-day view and danmaku counts for every row.

        Each row dict is updated in place with ``7_days_later_watch_num`` and
        ``7_days_later_hide``. When the API cannot supply them (no usable av
        id, "访问权限不足", a failed request, or an unexpected payload) the
        row's original ``watch_num`` / ``hide`` are used. Missing ``aid`` /
        ``bvid`` are taken from the API reply. ``self.end_index`` is set to
        the number of rows.

        Args:
            bilibili_dict: Sequence of row dicts from :meth:`get_bilibili_data`.

        Returns:
            The same ``bilibili_dict`` object, with its rows updated.

        Raises:
            Exception: If ``bilibili_dict`` is empty or None.
        """
        if not bilibili_dict:
            raise Exception("未检索到当天数据,请检查")

        for index, data in enumerate(bilibili_dict):
            url_aid = self._extract_aid(data["title_url"], data["aid"])
            if url_aid is None:
                # Nothing to query the API with; keep the original numbers.
                self._keep_original_counts(data)
                continue

            url = self.STAT_URL.format(url_aid)
            js_obj = self._fetch_stat(url)
            message = self._message_of(js_obj)
            print("第{0}个视频,url:{1}, message:{2}".format(index, url, message))

            if message == "请求被拦截":
                # Blocked by the API: back off, then give this video one retry.
                time.sleep(self.BLOCKED_BACKOFF)
                js_obj = self._fetch_stat(url)
                message = self._message_of(js_obj)

            if message == "访问权限不足":
                self._keep_original_counts(data)
                continue

            try:
                js_data = js_obj["data"]
                view = js_data["view"]
                danmaku = js_data["danmaku"]
                if not data["aid"]:
                    data["aid"] = js_data["aid"]
                if not data["bvid"]:
                    data["bvid"] = js_data["bvid"]
            except (KeyError, TypeError) as e:
                # Reply was missing, or lacked the expected "data" fields.
                print(e)
                print(js_obj)
                self._keep_original_counts(data)
                continue

            data["7_days_later_watch_num"] = view
            data["7_days_later_hide"] = danmaku

        self.end_index = len(bilibili_dict)
        return bilibili_dict

    def _claim_batch(self):
        """Atomically reserve the next ``(start, end)`` row range, or None.

        Reading and advancing ``start_index`` under one lock acquisition
        guarantees no two workers receive overlapping ranges.
        """
        with self.lock:
            start = self.start_index
            if start >= self.end_index:
                return None
            end = min(start + self.BATCH_SIZE, self.end_index)
            self.start_index = end
        return start, end

    def insert_data(self, bilibili_dict):
        """Worker body: insert claimed batches of rows into the local database.

        Each row is committed on its own. A failed insert is rolled back and
        the exception re-raised, which ends this worker; rows it had claimed
        but not yet written are not retried. An empty ``typename`` is stored
        as "暂无". The worker's connection is always closed and removed from
        the pool on exit.

        Args:
            bilibili_dict: Row dicts produced by :meth:`update_bilibili_data`.
        """
        thread_name = threading.current_thread().name
        local_db = self.local_db.get_instance()
        cursor = local_db.cursor()
        try:
            while True:
                batch = self._claim_batch()
                if batch is None:
                    break
                for i in range(*batch):
                    # Copy so the default typename does not leak into the input.
                    record = dict(bilibili_dict[i])
                    if not record["typename"]:
                        record["typename"] = "暂无"
                    params = tuple(record[col] for col in self.INSERT_COLUMNS)
                    try:
                        cursor.execute(self.INSERT_SQL, params)
                        local_db.commit()
                    except Exception:
                        local_db.rollback()
                        print(thread_name, ': ', record["title_url"], ':failed')
                        raise
                    print(thread_name, ': ', record["title_url"], ': success')
        finally:
            cursor.close()
            local_db.close()
            self.local_db.pool.pop(thread_name, None)

if __name__ == '__main__':
    # Constructing Bilibili runs the whole job once: its __init__ calls main().
    Bilibili()

    # Disabled daily scheduler, kept for reference. It polls the clock once a
    # second and starts a run when the time string equals 00:00:01.
    # NOTE(review): an exact-second match may be skipped if a poll is delayed
    # past that second - confirm before re-enabling.
    # while True:
    #     if time.strftime("%H:%M:%S", time.localtime()) == "00:00:01":
    #         Bilibili()
    #     else:
    #         time.sleep(1)

 类似资料: