python爬虫之json模块解析/多线程爬虫

柳英豪
2023-12-01

前情回顾

控制台抓包

打开方式及常用选项

1、打开浏览器,F12打开控制台,找到Network选项卡
2、控制台常用选项
   1、Network: 抓取网络数据包
        1、ALL: 抓取所有的网络数据包
        2、XHR:抓取异步加载的网络数据包
        3、JS : 抓取所有的JS文件
   2、Sources: 格式化输出并打断点调试JavaScript代码,助于分析爬虫中一些参数
   3、Console: 交互模式,可对JavaScript中的代码进行测试
3、抓取具体网络数据包后
   1、单击左侧网络数据包地址,进入数据包详情,查看右侧
   2、右侧:
       1、Headers: 整个请求信息
            General、Response Headers、Request Headers、Query String、Form Data
       2、Preview: 对响应内容进行预览
       3、Response:响应内容

有道翻译过程梳理

  1. 打开首页
  2. 准备抓包: F12开启控制台
  3. 寻找地址
  页面中输入翻译单词,控制台中抓取到网络数据包,查找并分析返回翻译数据的地址
  4. 发现规律
  找到返回具体数据的地址,在页面中多输入几个单词,找到对应URL地址,分析对比 Network - All(或者XHR) - Form Data,发现对应的规律
  5. 寻找JS文件
  右上角 ... -> Search -> 搜索关键字 -> 单击 -> 跳转到Sources,左下角格式化符号{}
  6、查看JS代码
  搜索关键字,找到相关加密方法
  7、断点调试
  8、完善程序

增量爬取思路

1、将爬取过的地址存放到数据库中
2、程序爬取时先到数据库中查询比对,如果已经爬过则不会继续爬取

动态加载网站数据抓取

1、F12打开控制台,页面动作抓取网络数据包
2、抓取json文件URL地址
# 控制台中 XHR :异步加载的数据包
# XHR -> Query String Parameters(查询参数)

数据抓取最终梳理

# 响应内容中存在
1、确认抓取数据在响应内容中是否存在
2、分析页面结构,观察URL地址规律
   1、大体查看响应内容结构,查看是否有更改 -- (百度视频案例)
   2、查看页面跳转时URL地址变化,查看是否新跳转 -- (民政部案例)
3、开始码代码进行数据抓取

# 响应内容中不存在
1、确认抓取数据在响应内容中是否存在
2、F12抓包,开始刷新页面或执行某些行为,主要查看XHR异步加载数据包
   1、GET请求: Request Headers、Query String Paramters
   2、POST请求:Request Headers、FormData
3、观察查询参数或者Form表单数据规律,如果需要进行进一步抓包分析处理
   1、比如有道翻译的 salt+sign,抓取并分析JS做进一步处理
   2、此处注意请求头中的cookie和referer以及User-Agent
4、使用res.json()获取数据,利用列表或者字典的方法获取所需数据

今日笔记

豆瓣电影数据抓取案例

  • 目标
1、地址: 豆瓣电影 - 排行榜 - 剧情
2、目标: 电影名称、电影评分
  • F12抓包(XHR)
1、Request URL(基准URL地址) :https://movie.douban.com/j/chart/top_list?
2、Query String(查询参数)

# 抓取的查询参数如下:
type: 13 # 电影类型
interval_id: 100:90
action: ''
start: 0  # 每次加载电影的起始索引值 0 20 40 60 
limit: 20 # 每次加载的电影数量
  • 代码实现 - 全站抓取 - 完美接口 - 指定类型所有电影信息
import re

import requests, json
from fake_useragent import UserAgent


class DouBan:
    def __init__(self):
        self.url = "https://movie.douban.com/j/chart/top_list?"
        self.i = 1

    def parse_html(self, params):
        html = requests.get(
            url=self.url,
            params=params,
            headers={"User-Agent": UserAgent().random}
        ).text
        # json.loads():json数据->python数据类型
        html = json.loads(html)
        # print(html)
        item = {}
        for film in html:
            item["name"] = film["title"]
            item["score"] = film["score"]
            self.i += 1
            print(item)

    def run(self):

        type_dic = self.get_name_type()
        name = input("\n请输入类型:")
        typ = type_dic[name]
        totle = self.get_totle(typ)
        for page in range(0, totle, 20):
            params = {
                "type": typ,
                "interval_id": "100:90",
                "action": "",
                "start": str(page),
                "limit": "20"
            }
            self.parse_html(params)
        print(self.i)

    def get_totle(self, typ):
        url = 'https://movie.douban.com/j/chart/top_list_count?type={}&interval_id=100%3A90'.format(typ)
        #返回json类型
        html = requests.get(url=url, headers={"User-Agent": UserAgent().random}).json()
        totle = html["total"]

        return totle

    def get_name_type(self):
        #获取电影类型及其代码
        url = 'https://movie.douban.com/chart'
        html = requests.get(url=url, headers={"User-Agent": UserAgent().random}).text
        p = re.compile('<a.*?type_name=(.*?)&type=(.*?)&interval_id.*?</a>', re.S)
        r_list = p.findall(html)
        type_dict = {}
        print("电影类型:")
        for r in r_list:
            type_dict[r[0]] = r[1]
            print(r[0],end=" ")

        return type_dict


if __name__ == '__main__':
    d = DouBan()
    d.run()

json解析模块

json.loads(json)

  • 作用
把json格式的字符串转为Python数据类型
  • 示例
html_json = json.loads(res.text)
In [1]: import json 
In [2]: d={"app":"QQ","link":"www.weixin.com"}
In [6]: json.loads(a)
Out[6]: {'app': 'QQ', 'link': 'www.weixin.com'}

json.dumps(python)

  • 作用
把 python 类型 转为 json 类型
  • 示例
import json

# json.dumps()之前
item = {'name':'QQ','app_id':1}
print('before dumps',type(item)) # dict
# json.dumps之后
item = json.dumps(item)
print('after dumps',type(item)) # str
In [4]: json.dumps(d)
Out[4]: '{"app": "QQ", "link": "www.weixin.com"}'

json.load(f)

作用

将json文件读取,并转为python类型

示例

import json

with open('D:\\spider_test\\xiaomi.json','r') as f:
    data = json.load(f)
    
print(data)

json.dump(python,f,ensure_ascii=False)

  • 作用
把python数据类型 转为 json格式的字符串
# 一般让你把抓取的数据保存为json文件时使用
  • 参数说明
第1个参数: python类型的数据(字典,列表等)
第2个参数: 文件对象
第3个参数: ensure_ascii=False # 序列化时编码(默认为True,设置为false时可以编译中文)
  • 示例1
import json

item = {'name':'QQ','app_id':1}
with open('小米.json','a') as f:
  json.dump(item,f,ensure_ascii=False)
  • 示例2
import json

item_list = []
for i in range(3):
  item = {'name':'QQ','id':i}
  item_list.append(item)
    
with open('xiaomi.json','a') as f:
	json.dump(item_list,f,ensure_ascii=False)

json模块总结

# 爬虫最常用
1、数据抓取 - json.loads(html)
   将响应内容由: json 转为 python
2、数据保存 - json.dump(item_list,f,ensure_ascii=False)
   将抓取的数据保存到本地 json文件

# 抓取数据一般处理方式
1、txt文件
2、csv文件
3、json文件
4、MySQL数据库
5、MongoDB数据库
6、Redis数据库

腾讯招聘数据抓取

  • 确定URL地址及目标
1、URL: 百度搜索腾讯招聘 - 查看工作岗位
2、目标: 职位名称、工作职责、岗位要求
  • 要求与分析
1、通过查看网页源码,得知所需数据均为 Ajax 动态加载
2、通过F12抓取网络数据包,进行分析
3、一级页面抓取数据: 职位名称
4、二级页面抓取数据: 工作职责、岗位要求
  • 一级页面json地址(index在变,timestamp未检查)
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn
  • 二级页面地址(postId在变,在一级页面中可拿到)
https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn
  • 代码实现
import random
import time
from urllib import parse

import requests, json
from fake_useragent import UserAgent


class Tencent:
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp={}&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword={}&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        # self.two_url=''
        self.headers = {"User-Agent": UserAgent().random}
        self.i = 0

    def parse_html(self, one_url):
        one_html = requests.get(url=one_url, headers=self.headers).json()
        # 提取postid,拼接二级页面地址
        # print(one_html)
        for one in one_html["Data"]["Posts"]:
            # postid,two_url
            two_html = requests.get( url="https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp={}&postId={}&language=zh-cn".format(time.time(), one["PostId"]), headers=self.headers).json()
            name = two_html["Data"]['RecruitPostName']
            Responsibility = two_html["Data"]['Responsibility']
            Requirement = two_html["Data"]['Requirement']
            # print("岗位:", name)
            # print("职责:", Responsibility)
            # print("要求:", Requirement)
            # print("*" * 60)
            self.i += 1
            print("第%d条爬取成功!" % self.i)
            with open("job.json", "a") as f:
                json.dump({"岗位": name, "职责": Responsibility, "要求": Requirement}, f, ensure_ascii=False)

    # def get_data(self, two_url):

    def run(self):
        key_word = input("请输入关键字:")
        #关键字转为编码值
        keyword = parse.quote(key_word)
        total = self.sotal_page(keyword)
        for page in range(0, total + 1):
            one_url = self.one_url.format(time.time(), keyword, page)
            self.parse_html(one_url)
            time.sleep(random.randint(1, 2))

    def sotal_page(self, keyword):
        #获取总页数,在请求地址中json的count中
        url = self.one_url.format(time.time(), keyword, 1)
        html = requests.get(url=url, headers=self.headers).json()
        count = html["Data"]["Count"]
        if count % 10 == 0:
            total = count // 10
        else:
            total = (count // 10) + 1
        return total


if __name__ == '__main__':
    t = Tencent()
    t.run()

多线程爬虫

应用场景

1、多进程 :CPU密集程序
2、多线程 :爬虫(网络I/O)、本地磁盘I/O

知识点回顾

  • 队列
# 导入模块
from queue import Queue
# 使用
q = Queue()
q.put(url)
q.get() # 当队列为空时,阻塞
q.empty() # 判断队列是否为空,True/False

#先判空,在执行函数
写法1:
    while True:
        if not q.empty():
            function()
        else:
            break
#设置响应过期时间           
写法2:
while True:
      try: 
            url=q.get(timeout=3)
           	function()
      except Exception as e:
    		print(e)
        	
  • 线程模块
# 导入模块
from threading import Thread

# 使用流程  
t = Thread(target=函数名) # 创建线程对象
t.start() # 创建并启动线程
t.join()  # 阻塞等待回收线程

# 如何创建多线程??????
t_list=[]
for i in range(5):
    t=Thread(target=xxx)
    t_list.append(t)
    t.start()
for t in t_list:
    t.join()
线程锁:
n=200

def f1():
    for i in range(5000):
        n+=1
    
def f1():
    for i in range(5000):
        n-=1

#线程锁
from threading inport Lock
lock=Lock()
lock.acquire()  #加锁

#python代码
lock.release()  #释放锁
  • 进程模块
 #队列必须使用进程模块中的Queue,支持跨进程通信
 #标准库模块queue中的Queue不支持进程间通信
 from multiprocessing import Process,Lock,Queue
 t_list=[]
 for i in range(5):
     t=Process(target=xxx)
     t_list.append(t)
     t.start()
 for t in t_list:
     t.join()

小米应用商店抓取(多线程)

目标

1、网址 :百度搜 - 小米应用商店,进入官网
2、目标 :所有应用分类
   应用名称
   应用链接

实现步骤

  • 1、确认是否为动态加载
1、页面局部刷新
2、右键查看网页源代码,搜索关键字未搜到
# 此网站为动态加载网站,需要抓取网络数据包分析
  • 2、F12抓取网络数据包
1、抓取返回json数据的URL地址(Headers中的Request URL)
   http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30
        
2、查看并分析查询参数(headers中的Query String Parameters)
   page: 1
   categoryId: 2
   pageSize: 30
   # 只有page在变,0 1 2 3 ... ... ,这样我们就可以通过控制page的值拼接多个返回json数据的URL地址
  • 将抓取数据保存到csv文件
# 注意多线程写入的线程锁问题
from threading import Lock
lock = Lock()
# 加锁
lock.acquire()
python语句
# 释放锁
lock.release()
  • 整体思路
1、在 __init__(self) 中创建文件对象,多线程操作此对象进行文件写入
2、每个线程抓取1页数据后将数据进行文件写入,写入文件时需要加锁
3、所有数据抓取完成关闭文件
  • 代码实现
#正则匹配类别[(),()]
#<a  href="/category/(.*?)">(.*?)</a>
import random
import re

import requests, json, time, csv
from threading import Thread, Lock
from queue import Queue

from fake_useragent import UserAgent


class XiaoMi:
    def __init__(self):
        self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId={}&pageSize=30'
        self.headers = {"User-Agent": UserAgent().random}
        # 创建url队列.存放所有待爬取的Url地址
        self.q = Queue()
        self.f = open("xiaomi.csv", "w")
        self.l = Lock()
        self.writer = csv.writer(self.f)
        self.i = 0

    # url入队列
    def url_in(self, code):
        # 获取总页数
        total = self.get_total(code)
        for page in range(total):
            url = self.url.format(page, code)
            self.q.put(url)

    # 线程事件函数
    def parse_html(self):
        #多线程一直抓取,直至队列中没有执行的对象
        while True:
            if not self.q.empty():
                url = self.q.get()
                html = requests.get(url=url, headers=self.headers).json()
                app_list = []
                for app in html["data"]:
                    name = app["displayName"]
                    self.i += 1
                    print(name)
                    app_list.append((name,))
                print(self.i)

                # 加锁
                with self.l:
                    self.writer.writerows(app_list)
                    time.sleep(random.uniform(0, 1))
            else:
                break

    # 入口函数
    def run(self):
        # url地址入队列
        type_code = self.get_all()
        typ = input("\n请输入类别:")
        self.url_in(type_code[typ])
        t_list = []
        for i in range(1):
            t = Thread(target=self.parse_html)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()

    # 获取app类别以及code
    def get_all(self):
        url = "http://app.mi.com/"
        html = requests.get(url=url, headers=self.headers).text
        # with open("a.html", "w") as f:
        #     f.write(html)
        p = re.compile('<a.*?href="/category/(\d*?)">(.*?)</a>', re.S)
        r_list = p.findall(html)
        # print(r_list)
        item = {}
        print("类别:")
        for r in r_list:
            item[r[1]] = r[0]
            # 遍历一个类别将此类别链接put到队列
            print(r[1], end=" ")
        # print(item)
        print("*" * 90)
        return item

    # 获取总页数
    def get_total(self, code):
        url = self.url.format(0, code)
        html = requests.get(url=url, headers=self.headers).json()
        count = html["count"]
        if count % 30 == 0:
            total = count // 30
        else:
            total = (count // 30) + 1
        return total


if __name__ == '__main__':
    start = time.time()
    x = XiaoMi()
    x.run()
    end = time.time()
    print("执行时间:", end - start)

cookie模拟登录

适用网站及场景

抓取需要登录才能访问的页面

cookie和session机制

# http协议为无连接协议
cookie: 存放在客户端浏览器
session: 存放在Web服务器

人人网登录案例

  • 方法一 - 登录网站手动抓取Cookie
1、先登录成功1次,获取到携带登录信息的Cookie
   登录成功 - 个人主页 - F12抓包 - 刷新个人主页 - 找到主页的包(profile)
2、携带着cookie发请求
   ** Cookie
   ** User-Agent
# 1、将self.url改为 个人主页的URL地址
# 2、将Cookie的值改为 登录成功的Cookie值
import requests
from lxml import etree

class RenrenLogin(object):
  def __init__(self):
    self.url = 'xxxxxxx'
    self.headers = {
      'Cookie':'xxxxxx',
      'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36'
    }

  def get_html(self):
    html = requests.get(url=self.url,headers=self.headers).text
    self.parse_html(html)

  def parse_html(self,html):
    parse_html = etree.HTML(html)
    r_list = parse_html.xpath('//*[@id="operate_area"]/div[1]/ul/li[1]/span/text()')
    print(r_list)

if __name__ == '__main__':
  spider = RenrenLogin()
  spider.get_html()
  • 方法二

原理

1、把抓取到的cookie处理为字典
2、使用requests.get()中的参数:cookies

处理cookie为字典

    def get_cookies(self):
        cooks='anonymid=k1u1oow6-wd1j9m; depovince=HEN; _r01_=1; JSESSIONID=abcklrIeEKWWt9_YNzx3w; ick_login=cee7832b-2cb4-4d32-9ef0-2f3ac386b351; t=deb5d8758bd2a7d08a064a0316df44e83; societyguester=deb5d8758bd2a7d08a064a0316df44e83; id=972496253; xnsid=2893498e; jebecookies=2a36ede3-d9ca-4d73-87ae-295e9604951e|||||; ver=7.0; loginfrom=null; wp_fold=0'
        cookd={}
        for cook in cooks.split("; "):
            cookl=cook.split("=")
            cookd[cookl[0]]=cookl[1]
        print(cookd)
        return cookd

  • 方法三 - requests模块处理Cookie

原理思路及实现

# 1. 思路
requests模块提供了session类,来实现客户端和服务端的会话保持

# 2. 原理
1、实例化session对象
   session = requests.session()
2、让session对象发送get或者post请求
   res = session.post(url=url,data=data,headers=headers)
   res = session.get(url=url,headers=headers)

# 3. 思路梳理
浏览器原理: 访问需要登录的页面会带着之前登录过的cookie
程序原理: 同样带着之前登录的cookie去访问 - 由session对象完成
1、实例化session对象
2、登录网站: session对象发送请求,登录对应网站,把cookie保存在session对象中
3、访问页面: session对象请求需要登录才能访问的页面,session能够自动携带之前的这个cookie,进行请求

具体步骤

1、寻找Form表单提交地址 - 寻找登录时POST的地址
   查看网页源码,查看form表单,找action对应的地址: http://www.renren.com/PLogin.do

2、发送用户名和密码信息到POST的地址
   * 用户名和密码信息以什么方式发送? -- 字典
     键 :<input>标签中name的值(email,password)
     值 :真实的用户名和密码
     post_data = {'email':'','password':''}

session = requests.session()        
session.post(url=url,ata=data)

今日任务

1、多线程改写 - 腾讯招聘案例

import random

import requests, time, csv
from fake_useragent import UserAgent
from threading import Thread,Lock
#队列必须使用进程模块中的Queue,支持跨进程通信
#标准库模块queue中的Queue不支持进程间通信
from queue import Queue
# from multiprocessing import Process,Lock,Queue


class Tengxun:
    def __init__(self):
        self.headers = {"User-Agent": UserAgent().random}
        self.q = Queue()
        self.url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp={}&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.l = Lock()
        self.writer = csv.writer(open("腾讯.csv", "w"))
        self.i = 0

    # 请求页面
    def get_html(self, url):
        html = requests.get(url=url, headers=self.headers).json()
        return html

    # 解析
    def parse_html(self):
        while True:
            if not self.q.empty():
                url = self.q.get()
                one_list = self.get_html(url)
                # 爬取数据
                self.spider_data(one_list)
                time.sleep(5)
            else:
                break

    #  爬取数据
    def spider_data(self, one_list):
        for r in one_list["Data"]["Posts"]:
            two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp={}&postId={}&language=zh-cn'
            l_dict = self.get_html(two_url.format(time.time(), r["PostId"]))
            self.i += 1
            print("第%d页爬取成功!" % self.i)
            # 将数据写入文件
            self.write(l_dict)

    # 写入文件
    def write(self, l_dict):
        with self.l:
            self.writer.writerows([
                (l_dict["Data"]["RecruitPostName"],
                 l_dict["Data"]["Responsibility"],
                 l_dict["Data"]["Requirement"])
            ])

    # 将爬取地址入队列
    def queue_in(self):
        total = self.get_total()
        for page in range(total):
            url = self.url.format(time.time(), page)
            self.q.put(url)

    # 获取总页数
    def get_total(self):
        url = self.url.format(time.time(), 1)
        html = self.get_html(url)
        count = html["Data"]["Count"]
        if count % 10 == 0:
            total = count // 10
        else:
            total = (count // 10) + 1
        return total

    # 创建多线程执行函数
    def threading(self, func):
        t_list = []
        for i in range(5):
            t = Thread(target=func)
            t_list.append(t)
            t.start()
            time.sleep(2)
        for t in t_list:
            t.join()

    # 启动函数
    def run(self):
        self.queue_in()
        self.threading(self.parse_html)


# 函数入口
if __name__ == '__main__':
    t = Tengxun()
    t.run()



2、多线程改写 - 链家二手房案例
3、尝试破解百度翻译
 类似资料: