这系列文章主要分析分析webmagic框架,没有实战内容,如有实战问题可以讨论,也可以提供技术支持。
欢迎加群313557283(刚创建),小白互相学习~
package us.codecraft.webmagic;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.CollectorPipeline;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.pipeline.Pipeline;
import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.Scheduler;
import us.codecraft.webmagic.thread.CountableThreadPool;
import us.codecraft.webmagic.utils.UrlUtils;
import us.codecraft.webmagic.utils.WMCollections;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Entrance of a crawler.<br>
* A spider contains four modules: Downloader, Scheduler, PageProcessor and
* Pipeline.<br>
* Every module is a field of Spider. <br>
* The modules are defined in interface. <br>
* You can customize a spider with various implementations of them. <br>
* Examples: <br>
* <br>
* A simple crawler: <br>
* Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* "http://my.oschina.net/*blog/*")).run();<br>
* <br>
* Store results to files by FilePipeline: <br>
* Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* "http://my.oschina.net/*blog/*")) <br>
* .pipeline(new FilePipeline("/data/temp/webmagic/")).run(); <br>
* <br>
* Use FileCacheQueueScheduler to store urls and cursor in files, so that a
* Spider can resume the status when shutdown. <br>
* Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* "http://my.oschina.net/*blog/*")) <br>
* .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run(); <br>
*
* @author code4crafter@gmail.com <br>
* @see Downloader
* @see Scheduler
* @see PageProcessor
* @see Pipeline
* @since 0.1.0
*/
//实现两个接口一个Runnable,一个Task
/**
* 对于每个任务返回一个
* @return uuid
*//*
public String getUUID();
*//**
* 返回site对象
* @return site
*//*
public Site getSite();*/
public class Spider implements Runnable, Task {
//下载器
protected Downloader downloader;
//流管道,处理数据
protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
//处理模块,解析页面,抽取有用信息
protected PageProcessor pageProcessor;
//请求集合
protected List<Request> startRequests;
//site 放请求信息
protected Site site;
//uuid
protected String uuid;
//负责管理待抓取的URL,以及一些去重的工作
protected Scheduler scheduler = new QueueScheduler();
//日志
protected Logger logger = LoggerFactory.getLogger(getClass());
//自己的线程池
protected CountableThreadPool threadPool;
//Executor直接的扩展接口,也是最常用的线程池接口,我们通常见到的线程池定时任务线程池都是它的实现类
protected ExecutorService executorService;
//线程数
protected int threadNum = 1;
//初始化提供原子操作
protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
//处理完全标志位
protected boolean exitWhenComplete = true;
//初始化
protected final static int STAT_INIT = 0;
//运行
protected final static int STAT_RUNNING = 1;
//结束
protected final static int STAT_STOPPED = 2;
//不清楚
protected boolean spawnUrl = true;
//销毁标志位
protected boolean destroyWhenExit = true;
//重入锁(ReentrantLock)是一种递归无阻塞的同步机制
private ReentrantLock newUrlLock = new ReentrantLock();
//条件
private Condition newUrlCondition = newUrlLock.newCondition();
//爬虫监听集合 存放请求是否成功
private List<SpiderListener> spiderListeners;
//初始化提供原子操作
private final AtomicLong pageCount = new AtomicLong(0);
//开始时间
private Date startTime;
//空闲休眠30秒
private int emptySleepTime = 30000;
//通过加入PageProcessor创建返回Spider
/**
* create a spider with pageProcessor.
*
* @param pageProcessor pageProcessor
* @return new spider
* @see PageProcessor
*/
public static Spider create(PageProcessor pageProcessor) {
return new Spider(pageProcessor);
}
//默认要初始化site
/**
* create a spider with pageProcessor.
*
* @param pageProcessor pageProcessor
*/
public Spider(PageProcessor pageProcessor) {
this.pageProcessor = pageProcessor;
this.site = pageProcessor.getSite();
}
/**
* Set startUrls of Spider.<br>
* Prior to startUrls of Site.
*
* @param startUrls startUrls
* @return this
*/
public Spider startUrls(List<String> startUrls) {
//检查是否已经运行
checkIfRunning();
//将url 转换成request
this.startRequests = UrlUtils.convertToRequests(startUrls);
return this;
}
/**
* Set startUrls of Spider.<br>
* Prior to startUrls of Site.
*
* @param startRequests startRequests
* @return this
*/
public Spider startRequest(List<Request> startRequests) {
//检查是否已经运行
checkIfRunning();
//如果是request 不转换
this.startRequests = startRequests;
return this;
}
/**
* Set an uuid for spider.<br>
* Default uuid is domain of site.<br>
*
* @param uuid uuid
* @return this
*/
public Spider setUUID(String uuid) {
//设置任务
this.uuid = uuid;
return this;
}
/**
* set scheduler for Spider
*
* @param scheduler scheduler
* @return this
* @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)
*/
@Deprecated
public Spider scheduler(Scheduler scheduler) {
return setScheduler(scheduler);
}
/**
* set scheduler for Spider
*
* @param scheduler scheduler
* @return this
* @see Scheduler
* @since 0.2.1
*/
public Spider setScheduler(Scheduler scheduler) {
//检查是否运行
checkIfRunning();
Scheduler oldScheduler = this.scheduler;
this.scheduler = scheduler;
//可以理解 如果老的scheduler 还有请求没有去fetch 新的会把剩余加进去
//老队列吐出一个请求 新队列加一个
if (oldScheduler != null) {
Request request;
while ((request = oldScheduler.poll(this)) != null) {
this.scheduler.push(request, this);
}
}
return this;
}
/**
* add a pipeline for Spider
*
* @param pipeline pipeline
* @return this
* @see #addPipeline(us.codecraft.webmagic.pipeline.Pipeline)
* @deprecated
*/
public Spider pipeline(Pipeline pipeline) {
return addPipeline(pipeline);
}
/**
* add a pipeline for Spider
*
* @param pipeline pipeline
* @return this
* @see Pipeline
* @since 0.2.1
*/
public Spider addPipeline(Pipeline pipeline) {
checkIfRunning();
this.pipelines.add(pipeline);
return this;
}
/**
* set pipelines for Spider
*
* @param pipelines pipelines
* @return this
* @see Pipeline
* @since 0.4.1
*/
public Spider setPipelines(List<Pipeline> pipelines) {
checkIfRunning();
//加入批量数据持久化
this.pipelines = pipelines;
return this;
}
/**
* clear the pipelines set
*
* @return this
*/
public Spider clearPipeline() {
pipelines = new ArrayList<Pipeline>();
return this;
}
/**
* set the downloader of spider
*
* @param downloader downloader
* @return this
* @see #setDownloader(us.codecraft.webmagic.downloader.Downloader)
* @deprecated
*/
public Spider downloader(Downloader downloader) {
return setDownloader(downloader);
}
/**
* set the downloader of spider
*
* @param downloader downloader
* @return this
* @see Downloader
*/
public Spider setDownloader(Downloader downloader) {
checkIfRunning();
this.downloader = downloader;
return this;
}
//初始化
protected void initComponent() {
if (downloader == null) {
this.downloader = new HttpClientDownloader();
}
if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline());
}
//默认一个线程
downloader.setThread(threadNum);
//生成线程池
if (threadPool == null || threadPool.isShutdown()) {
if (executorService != null && !executorService.isShutdown()) {
threadPool = new CountableThreadPool(threadNum, executorService);
} else {
threadPool = new CountableThreadPool(threadNum);
}
}
//批量加入初始链接
if (startRequests != null) {
for (Request request : startRequests) {
addRequest(request);
}
startRequests.clear();
}
//初始化开始时间
startTime = new Date();
}
//同步
@Override
public void run() {
//检查状态
checkRunningStat();
//初始化
initComponent();
logger.info("Spider {} started!",getUUID());
//已经运行并且测试线程 Thread 对象 是否已经是中断状态,但不清楚状态标志。
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
//默认是初始化队列scheduler 把任务放进去
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
} else {
//异步
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
//处理
processRequest(request);
//计数 多少页下载一般最后下完完毕都可以看到
onSuccess(request);
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
//总计
pageCount.incrementAndGet();
//等待新的url
signalNewUrl();
}
}
});
}
}
//完成就设置状态over
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}
//失败计数
protected void onError(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)) {
for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onError(request);
}
}
}
//成功计数
protected void onSuccess(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)) {
for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onSuccess(request);
}
}
}
//检查初始化状态
private void checkRunningStat() {
while (true) {
int statNow = stat.get();
if (statNow == STAT_RUNNING) {
throw new IllegalStateException("Spider is already running!");
}
// CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
if (stat.compareAndSet(statNow, STAT_RUNNING)) {
break;
}
}
}
//停止方法
public void close() {
destroyEach(downloader);
destroyEach(pageProcessor);
destroyEach(scheduler);
for (Pipeline pipeline : pipelines) {
destroyEach(pipeline);
}
threadPool.shutdown();
}
//对象销毁
private void destroyEach(Object object) {
if (object instanceof Closeable) {
try {
((Closeable) object).close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Process specific urls without url discovering.
*
* @param urls urls to process
*/
public void test(String... urls) {
initComponent();
if (urls.length > 0) {
for (String url : urls) {
processRequest(new Request(url));
}
}
}
//处理请求
private void processRequest(Request request) {
Page page = downloader.download(request, this);
if (page.isDownloadSuccess()){
onDownloadSuccess(request, page);
} else {
onDownloaderFail(request);
}
}
//下载成功
private void onDownloadSuccess(Request request, Page page) {
//包含响应码
if (site.getAcceptStatCode().contains(page.getStatusCode())){
//处理
pageProcessor.process(page);
//不知道
extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
//多pipeline 处理数据
pipeline.process(page.getResultItems(), this);
}
}
} else {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
//site 设值 休眠
sleep(site.getSleepTime());
return;
}
//下载失败
private void onDownloaderFail(Request request) {
//放弃
if (site.getCycleRetryTimes() == 0) {
sleep(site.getSleepTime());
} else {
// for cycle retry
//重试
doCycleRetry(request);
}
}
//重试
private void doCycleRetry(Request request) {
//重试参数放在 request extra key='_cycle_tried_times'
Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
if (cycleTriedTimesObject == null) {
addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
} else {
int cycleTriedTimes = (Integer) cycleTriedTimesObject;
cycleTriedTimes++;
if (cycleTriedTimes < site.getCycleRetryTimes()) {
addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
}
}
sleep(site.getRetrySleepTime());
}
protected void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
logger.error("Thread interrupted when sleep",e);
}
}
protected void extractAndAddRequests(Page page, boolean spawnUrl) {
if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
for (Request request : page.getTargetRequests()) {
addRequest(request);
}
}
}
//加入请求
private void addRequest(Request request) {
//如果没有域名自动给你加域名
if (site.getDomain() == null && request != null && request.getUrl() != null) {
site.setDomain(UrlUtils.getDomain(request.getUrl()));
}
//加入scheduler 默认是队列
scheduler.push(request, this);
}
//检查状态
protected void checkIfRunning() {
if (stat.get() == STAT_RUNNING) {
throw new IllegalStateException("Spider is already running!");
}
}
//异步启动
public void runAsync() {
Thread thread = new Thread(this);
thread.setDaemon(false);
thread.start();
}
/**
* Add urls to crawl. <br>
*
* @param urls urls
* @return this
*/
//url 转换成request
//例如 addurl(url1).addurl(url2)
public Spider addUrl(String... urls) {
for (String url : urls) {
addRequest(new Request(url));
}
signalNewUrl();
return this;
}
/**
* Download urls synchronizing.
*
* @param urls urls
* @param <T> type of process result
* @return list downloaded
*/
public <T> List<T> getAll(Collection<String> urls) {
destroyWhenExit = false;
spawnUrl = false;
if (startRequests!=null){
startRequests.clear();
}
for (Request request : UrlUtils.convertToRequests(urls)) {
addRequest(request);
}
CollectorPipeline collectorPipeline = getCollectorPipeline();
pipelines.add(collectorPipeline);
run();
spawnUrl = true;
destroyWhenExit = true;
return collectorPipeline.getCollected();
}
protected CollectorPipeline getCollectorPipeline() {
return new ResultItemsCollectorPipeline();
}
public <T> T get(String url) {
List<String> urls = WMCollections.newArrayList(url);
List<T> resultItemses = getAll(urls);
if (resultItemses != null && resultItemses.size() > 0) {
return resultItemses.get(0);
} else {
return null;
}
}
/**
* Add urls with information to crawl.<br>
*
* @param requests requests
* @return this
*/
//批量加入
public Spider addRequest(Request... requests) {
for (Request request : requests) {
addRequest(request);
}
signalNewUrl();
return this;
}
//等待新的url
private void waitNewUrl() {
newUrlLock.lock();
try {
//double check
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
return;
}
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e);
} finally {
newUrlLock.unlock();
}
}
private void signalNewUrl() {
try {
newUrlLock.lock();
///唤醒 await()等待队列中所有的线程
newUrlCondition.signalAll();
} finally {
newUrlLock.unlock();
}
}
//=runAsync() 异步启动
public void start() {
runAsync();
}
//停止爬虫
public void stop() {
//状态加1
if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
logger.info("Spider " + getUUID() + " stop success!");
} else {
logger.info("Spider " + getUUID() + " stop fail!");
}
}
/**
* start with more than one threads
*
* @param threadNum threadNum
* @return this
*/
//线程数
public Spider thread(int threadNum) {
checkIfRunning();
this.threadNum = threadNum;
if (threadNum <= 0) {
throw new IllegalArgumentException("threadNum should be more than one!");
}
return this;
}
/**
* start with more than one threads
*
* @param executorService executorService to run the spider
* @param threadNum threadNum
* @return this
*/
//重载方法 加入executorService 自定义线程池
public Spider thread(ExecutorService executorService, int threadNum) {
checkIfRunning();
this.threadNum = threadNum;
if (threadNum <= 0) {
throw new IllegalArgumentException("threadNum should be more than one!");
}
this.executorService = executorService;
return this;
}
//是否完全
public boolean isExitWhenComplete() {
return exitWhenComplete;
}
/**
* Exit when complete. <br>
* True: exit when all url of the site is downloaded. <br>
* False: not exit until call stop() manually.<br>
*
* @param exitWhenComplete exitWhenComplete
* @return this
*/
public Spider setExitWhenComplete(boolean exitWhenComplete) {
this.exitWhenComplete = exitWhenComplete;
return this;
}
public boolean isSpawnUrl() {
return spawnUrl;
}
/**
* Get page count downloaded by spider.
*
* @return total downloaded page count
* @since 0.4.1
*/
public long getPageCount() {
return pageCount.get();
}
/**
* Get running status by spider.
*
* @return running status
* @see Status
* @since 0.4.1
*/
//爬虫状态获取
public Status getStatus() {
return Status.fromValue(stat.get());
}
//状态枚举内部类
public enum Status {
Init(0), Running(1), Stopped(2);
private Status(int value) {
this.value = value;
}
private int value;
int getValue() {
return value;
}
public static Status fromValue(int value) {
for (Status status : Status.values()) {
if (status.getValue() == value) {
return status;
}
}
//default value
return Init;
}
}
/**
* Get thread count which is running
*
* @return thread count which is running
* @since 0.4.1
*/
//获取存活线程数
public int getThreadAlive() {
if (threadPool == null) {
return 0;
}
return threadPool.getThreadAlive();
}
/**
* Whether add urls extracted to download.<br>
* Add urls to download when it is true, and just download seed urls when it is false. <br>
* DO NOT set it unless you know what it means!
*
* @param spawnUrl spawnUrl
* @return this
* @since 0.4.0
*/
public Spider setSpawnUrl(boolean spawnUrl) {
this.spawnUrl = spawnUrl;
return this;
}
@Override
public String getUUID() {
if (uuid != null) {
return uuid;
}
if (site != null) {
return site.getDomain();
}
uuid = UUID.randomUUID().toString();
return uuid;
}
public Spider setExecutorService(ExecutorService executorService) {
checkIfRunning();
this.executorService = executorService;
return this;
}
@Override
public Site getSite() {
return site;
}
public List<SpiderListener> getSpiderListeners() {
return spiderListeners;
}
public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
this.spiderListeners = spiderListeners;
return this;
}
public Date getStartTime() {
return startTime;
}
public Scheduler getScheduler() {
return scheduler;
}
/**
* Set wait time when no url is polled.<br><br>
*
* @param emptySleepTime In MILLISECONDS.
*/
public void setEmptySleepTime(int emptySleepTime) {
this.emptySleepTime = emptySleepTime;
}
}