当前位置: 首页 > 工具软件 > Pholcus > 使用案例 >

golang爬虫项目Pholcus源码分析(四)

冀耀
2023-12-01

download部分

该部分负责按照请求(Request)下载网页数据,是爬虫的下载器模块。

1.example文件

用于测试项目中两种下载内核(surf 和 PhantomJS)的下载效果,分别测试了 GET 和 POST 方法:GET 对应不需要登录的网站,POST 用于模拟需要登录的网站。

https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/surfer/example/example.go

package main

import (
	"github.com/henrylee2cn/pholcus/app/downloader/surfer"
	"io/ioutil"
	"log"
	"time"
)

// main exercises both download engines (Surf and PhantomJS) with one GET
// request and one form POST each, logging the status, headers and body of
// every response. Any download error terminates the program via log.Fatal.
func main() {
	// Form-encoded login credentials sent by both POST tests.
	const loginForm = "username=123456@qq.com&password=123456&login_btn=login_btn&submit=login_btn"

	// run prints banner, downloads req, and logs the response under the
	// label site. The body is fully read and then closed.
	run := func(banner, site string, req *surfer.DefaultRequest) {
		log.Println(banner)

		resp, err := surfer.Download(req)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("%s resp.Status: %s\nresp.Header: %#v\n", site, resp.Status, resp.Header)

		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		log.Printf("%s resp.Body: %s\nerr: %v", site, body, err)
	}

	// Surf engine: used when DownloaderID is left at its zero value.
	run("********************************************* surf内核GET下载测试开始 *********************************************",
		"baidu",
		&surfer.DefaultRequest{Url: "http://www.baidu.com/"})
	run("********************************************* surf内核POST下载测试开始 *********************************************",
		"lewaos",
		&surfer.DefaultRequest{Url: "http://accounts.lewaos.com/", Method: "POST", PostData: loginForm})

	// PhantomJS engine: selected explicitly with DownloaderID 1.
	run("********************************************* phantomjs内核GET下载测试开始 *********************************************",
		"baidu",
		&surfer.DefaultRequest{Url: "http://www.baidu.com/", DownloaderID: 1})
	run("********************************************* phantomjs内核POST下载测试开始 *********************************************",
		"lewaos",
		&surfer.DefaultRequest{DownloaderID: 1, Url: "http://accounts.lewaos.com/", Method: "POST", PostData: loginForm})

	surfer.DestroyJsFiles()

	// Pause for 10 seconds before exiting.
	time.Sleep(10 * time.Second)
}

2.下载器分析

Downloader 接口

https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/downloader.go#L11

// Downloader fetches the page described by a request on behalf of a spider.
//
// Implementations provide a single method, Download, which returns a
// *spider.Context carrying the result downloaded for the given request.
type Downloader interface {
	Download(sp *spider.Spider, req *request.Request) *spider.Context
}

Downloader 接口只有一个 Download 方法:第一个参数是 *spider.Spider,第二个参数是 *request.Request,返回值是承载下载结果的 *spider.Context。

位置 https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/request/request.go#L17

// Request describes everything a downloader needs to perform one download.
type Request interface {
	// GetUrl returns the target URL.
	GetUrl() string
	// GetMethod returns the method: GET, POST, POST-M or HEAD.
	GetMethod() string
	// GetPostData returns the POST values.
	GetPostData() string
	// GetHeader returns the HTTP headers to send.
	GetHeader() http.Header
	// GetEnableCookie reports whether HTTP cookies are enabled.
	GetEnableCookie() bool
	// GetDialTimeout returns the connect timeout ("dial tcp: i/o timeout").
	GetDialTimeout() time.Duration
	// GetConnTimeout returns the download timeout ("WSARecv tcp: i/o timeout").
	GetConnTimeout() time.Duration
	// GetTryTimes returns the maximum number of download attempts.
	GetTryTimes() int
	// GetRetryPause returns how long to pause before retrying.
	GetRetryPause() time.Duration
	// GetProxy returns the proxy host used for the download.
	GetProxy() string
	// GetRedirectTimes returns the maximum number of redirects.
	GetRedirectTimes() int
	// GetDownloaderID selects the Surf or the PhantomJS downloader.
	GetDownloaderID() int
}

// DefaultRequest is the stock implementation of Request.
type DefaultRequest struct {
	// Url is the target URL (required).
	Url string
	// Method is GET, POST, POST-M or HEAD (GET by default).
	Method string
	// Header holds the HTTP headers to send.
	Header http.Header
	// EnableCookie turns cookie handling on; it follows the Spider's
	// EnableCookie setting.
	EnableCookie bool
	// PostData holds the POST values.
	PostData string
	// DialTimeout bounds connecting to the server ("dial tcp: i/o timeout").
	DialTimeout time.Duration
	// ConnTimeout bounds the download ("WSARecv tcp: i/o timeout").
	ConnTimeout time.Duration
	// TryTimes is the maximum number of download attempts.
	TryTimes int
	// RetryPause is how long to pause before retrying.
	RetryPause time.Duration
	// RedirectTimes is the maximum number of redirects:
	// 0 means unlimited, a negative value means no redirects at all.
	RedirectTimes int
	// Proxy is the proxy host used for the download.
	Proxy string

	// DownloaderID picks the downloader:
	// 0 is Surf (high concurrency, full set of controls);
	// 1 is PhantomJS (better at getting past anti-crawling defences,
	// but slow and low concurrency).
	DownloaderID int

	// once guarantees that prepare runs only once.
	once sync.Once
}

// Downloader IDs: which engine a request is fetched with.
const (
	SurfID      = 0 // Surf downloader
	PhomtomJsID = 1 // PhantomJS downloader (identifier spelling kept for compatibility)
)

// Default values for request settings.
const (
	DefaultMethod      = "GET"           // request method
	DefaultDialTimeout = 2 * time.Minute // timeout for connecting to the server
	DefaultConnTimeout = 2 * time.Minute // timeout for the download itself
	DefaultTryTimes    = 3               // maximum number of download attempts
	DefaultRetryPause  = 2 * time.Second // pause before a retry
)

该文件中的 DefaultRequest 是 Request 接口的默认实现,上面 example 中正是这样使用的:

surfer.Download(&surfer.DefaultRequest{
        Url: "http://www.baidu.com/",
    })

再来看 surfer 包中的 Download 函数,它根据 DownloaderID 选择具体的下载器:

https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/surfer/surfer.go#L35

// Downloader singletons, each created lazily on first use by Download and
// guarded by its own sync.Once.
var (
	surf         Surfer
	once_surf    sync.Once
	phantom      Surfer
	once_phantom sync.Once
)

// Filesystem locations used by the PhantomJS downloader.
var (
	// Directory for the generated JS files.
	tempJsDir = "./tmp"
	// Path of the PhantomJS executable. An earlier default pointed into GOPATH:
	// phantomjsFile = filepath.Clean(path.Join(os.Getenv("GOPATH"), `/src/github.com/henrylee2cn/surfer/phantomjs/phantomjs`))
	phantomjsFile = "./phantomjs"
)

// cookieJar is the cookie store shared by both downloaders. The error is
// ignored: cookiejar.New with nil options does not fail.
var cookieJar, _ = cookiejar.New(nil)

// Download fetches req with the downloader selected by req.GetDownloaderID().
//
// PhomtomJsID selects the PhantomJS downloader. SurfID selects the Surf
// downloader, and so does any unrecognized ID. Each downloader is created on
// first use (at most once, guarded by sync.Once) and reused by later calls.
// Both downloaders share the package-level cookieJar.
func Download(req Request) (resp *http.Response, err error) {
	switch req.GetDownloaderID() {
	case PhomtomJsID:
		// PhantomJS engine: browser emulation, slower but harder to block.
		once_phantom.Do(func() { phantom = NewPhantom(phantomjsFile, tempJsDir, cookieJar) })
		resp, err = phantom.Download(req)
	default:
		// SurfID and any unknown ID both use the default Surf engine
		// (mirroring NewParam's fallback to GET for unknown methods).
		// Without this fallback an unknown ID matched no case and the
		// function returned (nil, nil), so callers that only checked err
		// dereferenced a nil response.
		once_surf.Do(func() { surf = New(cookieJar) })
		resp, err = surf.Download(req)
	}
	return
}

这里用 sync.Once 保证 surf 和 phantom 下载器都只初始化一次,即线程安全的(懒加载)单例模式。sync.Once 的实现分析可参考:https://studygolang.com/articles/7270

1)surf内核

// 如上块代码,初始化一个surf下载器并下载
once_surf.Do(func() { surf = New(cookieJar) })
resp, err = surf.Download(req)


// Surf is the default downloader. It sends requests through net/http and
// keeps cookies in CookieJar.
// https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/surfer/surf.go#L39
type Surf struct {
	CookieJar *cookiejar.Jar
}

// New creates a Surf downloader.
//
// If jar[0] is supplied and non-nil, it becomes the cookie store. Otherwise a
// new empty jar is created. A nil jar is treated as absent because a nil
// *cookiejar.Jar is unusable: its methods dereference the receiver and panic.
func New(jar ...*cookiejar.Jar) Surfer {
	s := new(Surf)
	if len(jar) != 0 && jar[0] != nil {
		s.CookieJar = jar[0]
	} else {
		// Error deliberately ignored: cookiejar.New with nil options does not fail.
		s.CookieJar, _ = cookiejar.New(nil)
	}
	return s
}

// Download implements the surfer downloader interface with a net/http client.
//
// It converts req into a Param, forces "Connection: close", builds the
// *http.Client for the request and performs it.
//
// If the response declares a gzip, deflate or zlib Content-Encoding,
// resp.Body is replaced by a decompressing reader whose Close also closes the
// original network body (the decompressors' own Close methods do not). If
// the decompressor cannot be created, the original body is closed and that
// error is returned.
//
// When NewParam succeeds, the result always passes through param.writeback.
// The returned response is therefore non-nil and carries the method, headers
// and host that were sent, even when err is non-nil.
func (self *Surf) Download(req Request) (resp *http.Response, err error) {
	param, err := NewParam(req)
	if err != nil {
		return nil, err
	}

	// Use short-lived connections so crawling does not pile up sockets in
	// CLOSE_WAIT.
	param.header.Set("Connection", "close")
	param.client = self.buildClient(param)
	resp, err = self.httpRequest(param)

	if err == nil {
		var decoder io.ReadCloser
		switch resp.Header.Get("Content-Encoding") {
		case "gzip":
			var gz *gzip.Reader
			// Assign only on success: a nil *gzip.Reader stored in an
			// io.ReadCloser would not compare equal to nil.
			if gz, err = gzip.NewReader(resp.Body); err == nil {
				decoder = gz
			}
		case "deflate":
			// NOTE(review): RFC 9110 defines the "deflate" coding as
			// zlib-wrapped data, whereas flate.NewReader expects raw DEFLATE.
			// Kept as-is for servers that send raw streams; verify before
			// switching to zlib.
			decoder = flate.NewReader(resp.Body)
		case "zlib":
			decoder, err = zlib.NewReader(resp.Body)
		}

		switch {
		case err != nil:
			// The body cannot be decoded. Release the connection now, because
			// callers usually skip Close when an error is returned.
			resp.Body.Close()
		case decoder != nil:
			resp.Body = &decodedBody{decoder: decoder, raw: resp.Body}
		}
	}

	// Record the request that was sent on the response (see writeback).
	resp = param.writeback(resp)

	return
}

// decodedBody is the Body installed on a compressed response. Reads come
// from the decompressor. Close shuts down the decompressor and then the
// underlying network body, which the decompressor alone would leave open.
type decodedBody struct {
	decoder io.ReadCloser
	raw     io.ReadCloser
}

// Read reads decompressed bytes.
func (b *decodedBody) Read(p []byte) (int, error) {
	return b.decoder.Read(p)
}

// Close closes the decompressor and the raw body. It reports the first
// error encountered.
func (b *decodedBody) Close() error {
	derr := b.decoder.Close()
	rerr := b.raw.Close()
	if derr != nil {
		return derr
	}
	return rerr
}

// Param is the normalized, downloader-ready form of a Request, built by
// NewParam.
// https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/surfer/param.go#L46
type Param struct {
	// Target.
	method string   // HTTP method to send (POST-M is sent as POST)
	url    *url.URL // encoded target URL
	proxy  *url.URL // proxy URL; nil when no proxy is configured

	// Payload.
	body   io.Reader   // request body; only set for POST and POST-M
	header http.Header // headers to send

	// Behaviour.
	enableCookie  bool
	dialTimeout   time.Duration // clamped to >= 0 by NewParam
	connTimeout   time.Duration
	tryTimes      int
	retryPause    time.Duration
	redirectTimes int
	client        *http.Client // filled in by the downloader before sending
}

// NewParam converts req into the Param used by the downloaders.
//
// It URL-encodes the target URL, parses the optional proxy URL, and prepares
// the method, headers and body:
//   - GET and HEAD are used as-is.
//   - POST sends req.GetPostData() as the body. The Content-Type defaults to
//     application/x-www-form-urlencoded unless the caller already set one.
//   - POST-M parses req.GetPostData() as a query string and re-encodes it as
//     a multipart/form-data body, sent with method POST.
//   - Any other method falls back to GET.
//
// Headers are copied from req.GetHeader(), so the caller's header map is
// never modified. A User-Agent is chosen when none is set: the first
// "common" agent when cookies are enabled (presumably to keep a cookie
// session on a stable agent), otherwise a random one. Negative dial timeouts
// are clamped to 0. The remaining settings are copied unchanged.
//
// An error is returned if the URL or proxy cannot be parsed, or if the
// multipart body cannot be finalized.
func NewParam(req Request) (param *Param, err error) {
	param = new(Param)
	param.url, err = UrlEncode(req.GetUrl())
	if err != nil {
		return nil, err
	}

	if req.GetProxy() != "" {
		if param.proxy, err = url.Parse(req.GetProxy()); err != nil {
			return nil, err
		}
	}

	// Copy the caller's headers instead of aliasing them. Previously the
	// defaults added below (and "Connection: close" added by Surf.Download)
	// were written into the Request itself, so downloading the same Request
	// again accumulated duplicate header values.
	src := req.GetHeader()
	param.header = make(http.Header, len(src))
	for k, vs := range src {
		param.header[k] = append([]string(nil), vs...)
	}

	switch method := strings.ToUpper(req.GetMethod()); method {
	case "GET", "HEAD":
		param.method = method
	case "POST":
		param.method = method
		// Respect a Content-Type chosen by the caller (e.g. JSON) instead of
		// sending two conflicting values.
		if param.header.Get("Content-Type") == "" {
			param.header.Set("Content-Type", "application/x-www-form-urlencoded")
		}
		param.body = strings.NewReader(req.GetPostData())
	case "POST-M":
		param.method = "POST"
		body := &bytes.Buffer{}
		writer := multipart.NewWriter(body)
		// ParseQuery returns every well-formed pair even when it reports an
		// error. Malformed pairs are deliberately skipped (best effort).
		values, _ := url.ParseQuery(req.GetPostData())
		for k, vs := range values {
			for _, v := range vs {
				// Writes go to an in-memory buffer, so they cannot fail.
				writer.WriteField(k, v)
			}
		}
		if err = writer.Close(); err != nil {
			return nil, err
		}
		// The body is only valid with this exact boundary, so this value
		// replaces any Content-Type the caller may have set.
		param.header.Set("Content-Type", writer.FormDataContentType())
		param.body = body

	default:
		param.method = "GET"
	}

	param.enableCookie = req.GetEnableCookie()

	if len(param.header.Get("User-Agent")) == 0 {
		if param.enableCookie {
			param.header.Add("User-Agent", agent.UserAgents["common"][0])
		} else {
			l := len(agent.UserAgents["common"])
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			param.header.Add("User-Agent", agent.UserAgents["common"][r.Intn(l)])
		}
	}

	param.dialTimeout = req.GetDialTimeout()
	if param.dialTimeout < 0 {
		param.dialTimeout = 0
	}

	param.connTimeout = req.GetConnTimeout()
	param.tryTimes = req.GetTryTimes()
	param.retryPause = req.GetRetryPause()
	param.redirectTimes = req.GetRedirectTimes()
	return
}

// writeback records the request that was sent on resp and returns resp.
//
// A nil resp is replaced by an empty response, and a missing Request or
// Header is allocated, so the result is always safe to inspect. The
// request's Method, Header and Host are then overwritten with the values
// held in the Param.
func (self *Param) writeback(resp *http.Response) *http.Response {
	if resp == nil {
		resp = &http.Response{}
	}
	if resp.Request == nil {
		resp.Request = &http.Request{}
	}
	if resp.Header == nil {
		resp.Header = http.Header{}
	}

	sent := resp.Request
	sent.Method = self.method
	sent.Header = self.header
	sent.Host = self.url.Host
	return resp
}

2)PhantomJS内核

// 下载代码
once_phantom.Do(func() { phantom = NewPhantom(phantomjsFile, tempJsDir, cookieJar) })
resp, err = phantom.Download(req)

// PhantomJS-backed downloader types.
// https://github.com/henrylee2cn/pholcus/blob/master/app/downloader/surfer/phantom.go#L62

// Phantom is a downloader built on PhantomJS that complements Surf. Because
// it emulates a browser it is much slower than Surf, but it copes better with
// anti-crawling defences. It supports UserAgent, TryTimes, RetryPause and a
// custom JS script.
type Phantom struct {
	PhantomjsFile string            // full path of the PhantomJS executable
	TempJsDir     string            // directory for temporary JS files
	jsFileMap     map[string]string // JS files already created, keyed by name
	CookieJar     *cookiejar.Jar
}

// Response is the decoded form of the JSON that the PhantomJS script prints.
type Response struct {
	Cookies []string
	Body    string
	Error   string
	Header  []struct {
		Name  string
		Value string
	}
}

// Cookie is the JSON shape in which cookies are handed to PhantomJS.
type Cookie struct {
	Name   string `json:"name"`
	Value  string `json:"value"`
	Domain string `json:"domain"`
	Path   string `json:"path"`
}

// NewPhantom creates a PhantomJS-backed downloader.
//
// phantomjsFile is the PhantomJS executable and tempJsDir is the directory
// for generated JS files. Relative paths are converted to absolute ones; if
// the conversion fails, the relative path is kept instead of being replaced
// by "".
//
// If jar[0] is supplied and non-nil, it becomes the cookie store. Otherwise a
// new empty jar is created. A nil *cookiejar.Jar is unusable (its methods
// dereference the receiver), and Download calls CookieJar.Cookies when
// cookies are enabled.
//
// tempJsDir is created if needed, and createJsFile is then called to
// generate the "js" script. If the directory cannot be created, the error is
// logged and the downloader is returned without generating the script.
func NewPhantom(phantomjsFile, tempJsDir string, jar ...*cookiejar.Jar) Surfer {
	phantom := &Phantom{
		PhantomjsFile: phantomjsFile,
		TempJsDir:     tempJsDir,
		jsFileMap:     make(map[string]string),
	}
	if len(jar) != 0 && jar[0] != nil {
		phantom.CookieJar = jar[0]
	} else {
		// Error deliberately ignored: cookiejar.New with nil options does not fail.
		phantom.CookieJar, _ = cookiejar.New(nil)
	}
	// filepath.Abs only fails when the working directory cannot be
	// determined, and it then returns "". Keep the relative path instead.
	if !filepath.IsAbs(phantom.PhantomjsFile) {
		if p, err := filepath.Abs(phantom.PhantomjsFile); err == nil {
			phantom.PhantomjsFile = p
		}
	}
	if !filepath.IsAbs(phantom.TempJsDir) {
		if p, err := filepath.Abs(phantom.TempJsDir); err == nil {
			phantom.TempJsDir = p
		}
	}
	// Create the temp directory, or reuse it if it already exists.
	if err := os.MkdirAll(phantom.TempJsDir, 0777); err != nil {
		log.Printf("[E] Surfer: %v\n", err)
		return phantom
	}
	phantom.createJsFile("js", js)
	return phantom
}

// Download implements the surfer downloader interface by running an external
// PhantomJS process (one per attempt) with the generated "js" script.
//
// The charset parameter of req's Content-Type header (default "utf-8") is
// forwarded to the script, and Content-Type is then deleted from req's
// header. When cookies are enabled, the cookies held in self.CookieJar for
// the URL are passed to the script as JSON, and the cookies it reports back
// are stored in the jar.
//
// The script's stdout must be a JSON-encoded Response. Up to param.tryTimes
// attempts are made (always at least one), sleeping param.retryPause between
// them.
//
// If NewParam fails, (nil, err) is returned. Otherwise resp is non-nil:
//   - On success it has status 200 and the body reported by PhantomJS.
//   - On failure StatusCode is 502 (Bad Gateway), Status holds the error
//     text, Body is empty, and err is the last error encountered.
func (self *Phantom) Download(req Request) (resp *http.Response, err error) {
	var encoding = "utf-8"
	if _, params, err := mime.ParseMediaType(req.GetHeader().Get("Content-Type")); err == nil {
		if cs, ok := params["charset"]; ok {
			encoding = strings.ToLower(strings.TrimSpace(cs))
		}
	}

	req.GetHeader().Del("Content-Type")

	param, err := NewParam(req)
	if err != nil {
		return nil, err
	}

	cookie := ""
	if req.GetEnableCookie() {
		httpCookies := self.CookieJar.Cookies(param.url)
		if len(httpCookies) > 0 {
			surferCookies := make([]*Cookie, len(httpCookies))

			for n, c := range httpCookies {
				surferCookie := &Cookie{Name: c.Name, Value: c.Value, Domain: param.url.Host, Path: "/"}
				surferCookies[n] = surferCookie
			}

			c, err := json.Marshal(surferCookies)
			if err != nil {
				log.Printf("cookie marshal error:%v", err)
			}
			cookie = string(c)
		}
	}

	resp = param.writeback(resp)
	resp.Request.URL = param.url

	var args = []string{
		self.jsFileMap["js"],
		req.GetUrl(),
		cookie,
		encoding,
		param.header.Get("User-Agent"),
		req.GetPostData(),
		strings.ToLower(param.method),
		// Use the value clamped by NewParam so a negative timeout never
		// reaches the script.
		fmt.Sprint(int(param.dialTimeout / time.Millisecond)),
	}
	if req.GetProxy() != "" {
		args = append([]string{"--proxy=" + req.GetProxy()}, args...)
	}

	// With a non-positive try count the loop would be skipped entirely and
	// the response reported as a success with no body.
	tries := param.tryTimes
	if tries < 1 {
		tries = 1
	}

	for i := 0; i < tries; i++ {
		if i != 0 {
			time.Sleep(param.retryPause)
		}

		cmd := exec.Command(self.PhantomjsFile, args...)
		if resp.Body, err = cmd.StdoutPipe(); err != nil {
			continue
		}
		if err = cmd.Start(); err != nil {
			continue
		}
		var b []byte
		b, err = ioutil.ReadAll(resp.Body)
		// Wait reaps the child process (otherwise every attempt leaves a
		// zombie) and closes the stdout pipe. It must come after all reads.
		// The exit status is deliberately ignored: the JSON on stdout is the
		// authoritative result.
		_ = cmd.Wait()
		if err != nil {
			continue
		}
		retResp := Response{}
		if err = json.Unmarshal(b, &retResp); err != nil {
			continue
		}

		if retResp.Error != "" {
			log.Printf("phantomjs response error:%s", retResp.Error)
			// Record the failure, so running out of retries is reported as
			// an error rather than as a 200 with an empty body.
			err = fmt.Errorf("phantomjs response error: %s", retResp.Error)
			continue
		}

		// Copy the headers reported by PhantomJS.
		for _, h := range retResp.Header {
			resp.Header.Add(h.Name, h.Value)
		}

		// Copy the cookies, and store them in the jar when enabled.
		for _, c := range retResp.Cookies {
			resp.Header.Add("Set-Cookie", c)
		}
		if req.GetEnableCookie() {
			if rc := resp.Cookies(); len(rc) > 0 {
				self.CookieJar.SetCookies(param.url, rc)
			}
		}
		resp.Body = ioutil.NopCloser(strings.NewReader(retResp.Body))
		break
	}

	if err == nil {
		resp.StatusCode = http.StatusOK
		resp.Status = http.StatusText(http.StatusOK)
	} else {
		// Never hand back a drained/closed pipe or a nil Body.
		resp.Body = ioutil.NopCloser(strings.NewReader(""))
		resp.StatusCode = http.StatusBadGateway
		resp.Status = err.Error()
	}
	return
}

两种下载器的区别在于:surf 直接发送 HTTP 请求,速度快、支持高并发;PhantomJS 驱动无头浏览器模拟真实访问,突破反爬的能力更强,但速度慢、并发低。

 类似资料: