0%

webmagic源码解析

webmagic简介

webmagic是一个开源的Java爬虫框架,中文文档:http://webmagic.io/docs/zh。

​ 一个好的框架必然凝聚了领域知识。WebMagic的设计参考了业界最优秀的爬虫Scrapy,而实现则应用了HttpClient、Jsoup等Java世界最成熟的工具,目标就是做一个Java语言Web爬虫的教科书般的实现。
​ 如果你是爬虫开发老手,那么WebMagic会非常容易上手,它几乎使用Java原生的开发方式,只不过提供了一些模块化的约束,封装一些繁琐的操作,并且提供了一些便捷的功能。
​ 如果你是爬虫开发新手,那么使用并了解WebMagic会让你了解爬虫开发的常用模式、工具链、以及一些问题的处理方式。熟练使用之后,相信自己从头开发一个爬虫也不是什么难事。
​ 因为这个目标,WebMagic的核心非常简单——在这里,功能性是要给简单性让步的。

虽然简单,但是完成所有功能,还是要花很多时间的。这里之分析webmagic-core。

webmagic架构图

webmagic有四个组件:Downloader、PageProcessor、Scheduler、Pipline。

三个数据对象:Request、Page、ResultItems。

spider的实例

1
2
3
4
5
6
7
8
9
Spider.create(new XXXProcessor())
// 保存当前爬去的链接队列,和程序指针
// 会建立.urls.txt和.cursor.txt两个文件
.setScheduler(new FileCacheQueueScheduler("D:\\webcrawler_test"))
.addUrl("https://github.com/code4craft").addPipeline(new LinuxidcPipline())
// 开启5个线程抓取
.thread(5)
// 启动爬虫
.run();

webmagic线程模型

webmagic是典型的生产者消费者模式,spider实现了Runnable接口,作为生产线程,从Scheduler中取出request,提交给线程池CountableThreadPool运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void run() {
//通过一个AtomicInteger标识符,判断如果已经运行,抛出异常,如果没有运行,设置标识符为运行状态。
checkRunningStat();
//初始化dowloader pipelines threadPool addRequest startTime
initComponent();
logger.info("Spider {} started!",getUUID());
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final Request request = scheduler.poll(this);//调用BlockingQueue.poll(),若队列为空,返回null。
//System.out.println("spider请求"+((MonitorableScheduler)scheduler).getLeftRequestsCount(this));
if (request == null) {
//第一种情况,队列中没有新的url了,并且线程池中没有正在下载的队列,也就是说不会产生新的url了。如果设置了完成后退出,就会退出
//第二种情况,队列中没有新的url了,但线程池还有下载队列,可能产生新的url,就在这里wait。等待线程池的signal。
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {//exitWhenComplete默认为true,也就是不支持动态加入新的url。
break;
}
// wait until new url added
// 等待被唤醒也对应上面两种情况。
//第一种情况,通过方法添加新的url,addUrl(String... urls)中会调用signal。
//第二种情况,线程池线程完成,会调用signal。
System.out.println("spider wait");
waitNewUrl();//否则等待新的url加入,当然是spider运行中动态加入的了。默认30秒自动苏醒
} else {
//是否会产生这个生产者线程不断提交Runnable,造成内存溢出的情况呢?
//不会的,在threadPool.execute()内部,提交这个Runnable前做了判断,如果当前threadPool运行线程大于等于配置的threadNum
//会进行wait,也就是这个生产者提交的Runnable当大于threadNum就会进行wait,顶多有一个Runnable进行wait,不会内存溢出
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
//下载页面,分析页面,添加从页面提取的继续爬取的url到scheduler的BlockingQueue,对应方法queue.add(request),
//只有当超出queue的默认长度,也就是Integer.MAX_VALUE 2147483647才会抛出异常
processRequest(request);//无论页面下载成功与否,都会顺利执行下面的onSuccess。
onSuccess(request);
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}
  1. spider的停止逻辑是:第一种情况,队列中没有新的url了,并且线程池中没有正在下载的队列,也就是说不会产生新的url了。如果设置了完成后退出,就会退出。第二种情况,队列中没有新的url了,但线程池还有下载队列,可能产生新的url,就在这里wait。等待线程池中线程下载完后signal。exitWhenComplete控制spider是否爬完就退出,如果不退出,spider会在队列空后等待,然后每隔一段时间唤醒,检查队列是否为空。添加新url也会唤醒。

  2. spider在while循环中不断从队列中获得request,封装成Runnable交给线程池,如果队列很大,会不会封装很多的Runnable,从而造成内存不足呢?答案是不会的,作者封装了一个CountableThreadPool,spider初始化它的实例的时候,设置了一个threadNum,如果线程池满了,spider的生产线程要wait。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//只有spider类中的run()一个线程不断的调用它
public void execute(final Runnable runnable) {

if (threadAlive.get() >= threadNum) {//没有足够的线程数
System.out.println("进来了");
try {
reentrantLock.lock();
while (threadAlive.get() >= threadNum) {
try {
//调用这个方法的线程-spider类中的run()-会在这里等待。
condition.await();//等待,释放锁。相当于object.wait()
} catch (InterruptedException e) {
}
}
} finally {
reentrantLock.unlock();
}
}
threadAlive.incrementAndGet();//上面判断完成,立马增加计数。
executorService.execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
try {
reentrantLock.lock();
threadAlive.decrementAndGet();
//它只会唤醒一个等待线程,也就是spider的run()对应的生产线程
condition.signal();//通知await(),但不会释放锁。相当于object.notify()
} finally {
reentrantLock.unlock();
}
}
}
});
}
  1. 多个线程下载,spider中只有一个downloader实例。查看spider run()中的initComponent()
1
2
3
4
protected void initComponent() {
if (downloader == null) {
this.downloader = new HttpClientDownloader();
}

原因是downloader使用PoolingHttpClientConnectionManager,默认就支持多线程,传入的poolSize就是spider中的threadNum,线程池的大小也是threadNum,所以没有问题。

1
2
3
4
5
6
7
public HttpClientGenerator setPoolSize(int poolSize) {
//setMaxTotal()方法用来设置连接池的最大连接数,即整个池子的大小;
//setDefaultMaxPerRoute()方法来设置每一个路由的最大连接数,这里的路由是指IP+PORT,
//例如连接池大小(MaxTotal)设置为300,路由连接数设置为200(DefaultMaxPerRoute),对于www.a.com与www.b.com两个路由来说,发起服务的主机连接到每个路由的最大连接数(并发数)不能超过200,两个路由的总连接数不能超过300。
connectionManager.setMaxTotal(poolSize);
return this;
}

webmagic组件

组件没什么可说的,就是各种设计模式。downloader组件封装了Apache HttpClient,考虑了ssl,cookie,redirect。可以借鉴一下。