编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

从crawler4j源码中看wait与notify,Java程序员的进阶学习之路

wxchong 2024-08-08 00:54:53 开源技术 11 ℃ 0 评论

引言

crawler4j 是一个开源的 Java 爬虫框架,且拥有4k多个 star ,相信其源码值得我去研究,所以才写下这篇文章。如有错误欢迎联系我指正!

其实本文的重点不在于研究 crawler4j 源码中的各种逻辑、细节等,主要还是以 crawler4j 这个例子来看 Java 中 waitnotify 的使用,看看热门开源项目里是如何使用如何编码的。

想快速了解的话,你可以直接看核心逻辑部分,也可以直接看究极简单版wait/notify使用

正文

crawler4j 中最重要的两个类莫过于 CrawlControllerWebCrawler 了,一个是用于设置与开启爬虫,而另一个则是爬虫的核心实现类。这里讨论的代码基本都在 CrawlController 类中。

熟悉的同学都知道,在开启 controller 时一般有两个用法,如下:

// 用法1:阻塞式,当爬虫线程都结束后才会执行这行以后的代码
controller.start(factory, numberOfCrawlers);
复制代码
// 用法2:非阻塞式,在 start 以后, waitUntilFinish 以前的代码都会立刻执行,在 waitUntilFinish 处阻塞
controller.startNonBlocking(factory, numberOfCrawlers);
// 这中间的代码都会异步执行
controller.waitUntilFinish();

这个的源码部分就是本文重点要讨论的 waitnotify 的使用。

两个重要变量

首先,在 CrawlController 中定义了两个这个功能需要的重要变量:

/**
* Is the crawling of this session finished?
*/
protected boolean finished;

protected final Object waitingLock = new Object();
  • finished 用于判断此次爬取是否已结束
  • waitingLock 则是用于加锁

阻塞式 start 方法

为了只关注重要内容,其他部分代码我以注释的形式带过。

我们调用 start 方法的入口在这里

/**
* Start the crawling session and wait for it to finish.
*
* @param crawlerFactory
*            factory to create crawlers on demand for each thread
* @param numberOfCrawlers
*            the number of concurrent threads that will be contributing in
*            this crawling session.
* @param <T> Your class extending WebCrawler
*/
public <T extends WebCrawler> void start(WebCrawlerFactory<T> crawlerFactory,
                                        int numberOfCrawlers) {
    this.start(crawlerFactory, numberOfCrawlers, true);
}

它会去调用另一个有更多参数的 start 方法,多的参数就是 isBlocking ,这个参数表示是否需要阻塞,具体作用在下面这个 start 方法的注释中给出

protected <T extends WebCrawler> void start(final WebCrawlerFactory<T> crawlerFactory, final int numberOfCrawlers, boolean isBlocking) {

    // 根据提供的工厂类 crawlerFactory 构造指定数量的线程并使它们开始运行

    // 创建一个监控线程 monitorThread 如下
    Thread monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                synchronized (waitingLock) {

                    while (true) {
                        // 设置的监控循环周期
                        sleep(config.getThreadMonitoringDelaySeconds());
                        boolean someoneIsWorking = false;
                        
                        //  第一部分: 
                        //      观察每个 爬虫线程 是否正常运行,若没有正常运行则采取相应措施
                        //      第一部分的代码省略,有兴趣可以去 github 看
                    
                        //  第二部分: 
                        //      查看是否还有正在工作的线程,若没有则准备退出并关闭资源
                        //      这个部分也是我们经常看到的 "It looks like no thread is working, waiting for ..." 等 打印日志的所在源码部分
                        //      在关闭时会调用 notifyAll
                        if (!someoneIsWorking && shutOnEmpty) {
                            // 再次确保无线程工作且队列中无 URL 等待爬取

                            // 释放资源

                            waitingLock.notifyAll();

                            // 释放资源
                        }
                    }
                }
            } catch (Throwable e) {
                if (config.isHaltOnError()) {
                    // 发生了某个错误
                    setError(e);
                    synchronized (waitingLock) {
                        // 释放资源

                        waitingLock.notifyAll();

                        // 释放资源
                    }
                } else {
                    logger.error("Unexpected Error", e);
                }
            }
        }

    });

    monitorThread.start();

    // 如果需要阻塞,那么就调用 waitUntilFinish 这个方法,代码执行到这就会阻塞住
    if (isBlocking) {
        waitUntilFinish();
    }
}

从代码中可以看到,阻塞的地方在最后几行,也就是监控线程开启后的 waitUntilFinish 方法。

监控线程在监控到线程都运行完后,调用 waitingLock.notifyAll() 从而使这里的阻塞结束,那么这是如何做到的呢,我们再来看 waitUntilFinish 方法。

waitUntilFinish 方法如何使 start 阻塞

这个方法的源码很短,我直接放出来。

/**
* Wait until this crawling session finishes.
*/
public void waitUntilFinish() {
    while (!finished) {
        synchronized (waitingLock) {
            if (config.isHaltOnError()) {
                Throwable t = getError();
                if (t != null && config.isHaltOnError()) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException)t;
                    } else if (t instanceof Error) {
                        throw (Error)t;
                    } else {
                        throw new RuntimeException("error on monitor thread", t);
                    }
                }
            }
            if (finished) {
                return;
            }
            try {
                // 主动让出并等待锁资源
                waitingLock.wait();
            } catch (InterruptedException e) {
                logger.error("Error occurred", e);
            }
        }
    }
}

首先,在 start 方法和 waitUntilFinish 方法中都有 synchronized 来修饰关键的代码块,并且争夺的都是同一个锁 waitingLock。这意味着一方执行,就会有另一方被阻塞。我们希望的是 waitUntilFinish 一直被阻塞,直到爬虫线程都执行完(也就是 start 方法中对应的 synchronized 方法块里的内容)后,再让 waitUntilFinish 方法结束。这也就是源码中对这部分的处理,同时也是 wait 和 notify 使用的思想。

核心逻辑

再来理一遍源码这块的逻辑:

  1. monitorThread 的 run 方法中使用 synchronized 获取锁 waitingLock,并循环检查是否所有爬虫线程、爬虫任务都执行完毕。
  2. waitUntilFinish 使用 synchronized 获取锁 waitingLock,并根据变量 isFinished 检查爬取过程是否结束,若结束则直接返回;若未结束,则调用 wait 方法让出资源给 monitorThread 的 run 方法。
  3. 即使 waitUntilFinish 在调用 wait 方法后又获取到了锁 waitingLock ,它也会根据爬取是否结束 isFinished 来判断是否要再次进入循环调用 wait 方法。
  4. monitorThread 在检查到所有爬虫线程、爬虫任务都执行完毕后,调用 notifyAll 方法(和 notify 一样,只是对所有竞争锁资源的线程都发送通知)来让 waitUntilFinish 继续从 wait 处执行下去
  5. waitUntilFinish 获得锁资源,并从调用 wait 方法后的代码继续向下执行,在循环判断 isFinished 时发现爬取过程结束了,则直接返回,整个过程结束

其中也还有很多细节没有提及,如延时的设置、循环监控周期、资源的释放等等,由于不是本文关注的重点内容,可以自己参照源码理解一下。

透过现象看本质

不难看出其实本质就是某个线程调用 wait 方法主动让出锁给另一个线程,然后另一个线程在执行完任务后调用 notify/notifyAll 来通知它执行完了以让它开始抢占锁

其中还有一些细节:

  • 调用 notify 后,调用 wait 的线程并不会马上获得锁资源,而是等调用 notify 的那个线程释放锁资源后它才能获得,也就是说即使线程调用了 notify 方法,可能也要等到他退出 synchronized 代码块后,其他线程才能获得锁资源
  • 调用 wait 释放锁,又重新获得锁后,代码会从 wait 方法下面的那一行继续向下执行,而不会去回到 synchronized 代码块开始的地方执行,这也是为什么源码中要使用 while 循环去重复获取锁资源。因为如果没有这层循环而该线程在释放锁后重新获取锁时其实爬取过程还没结束(也就是 isFinished 是 False),那 waitUntilFinish 就会直接结束
  • wait 其实可以设置超时时长 wait(long timeout),在 timeout 时间后唤醒自己,这就相当于 timeout 时间后有人来通知他可以去抢锁资源了

究极简易版实现

为了加深理解,自己动手实现一下 crawler4j 这个机制的究极简易版如下(注意只是实现 wait/notify 机制):

package thread_practice;

public class WaitNotify {

    private final Object waitingLock = new Object();
    private boolean isFinished = false;

    public void start() {
        synchronized (waitingLock) {
            isFinished = false;
            System.out.println("doing sth...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done.");
            isFinished = true;
            waitingLock.notifyAll();
        }
    }

    public void waitUntilFinish() {
        synchronized (waitingLock) {
            if (isFinished) return;

            try {
                waitingLock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify();
        new Thread(() -> wn.start()).start();
        wn.waitUntilFinish();
        System.out.println("continue another thing...");
    }
}

执行程序5秒之内:

执行程序5秒之后:

可以看到主线程确实是阻塞在了 wn.waitUntilFinish() 这个地方,在5秒之后才继续执行下去。 其逻辑和前面几节我的解释一样,是只提取了核心部分的简化版。

总结

本文结合 crawler4j 中实际使用的例子对实际场景中如何使用 wait 与 notify 进行了介绍与讨论,也根据 crawler4j 中的场景实现了一个简易版功能。线程之间的通信离不开 wait 与 notify ,当然也不止 wait 与 notify ,我会在以后对这方面做更深入的研究。

本文有什么错误欢迎联系我指正。

扩展

这里只是单个线程通知单个线程任务执行完毕,如果是多个线程通知单个线程的场景怎么处理呢?

  • 如果是多个线程都执行完了,才通知某个线程,那可以参照 crawler4j 的方式,使用一个监控线程去循环检查所有线程是否执行完
  • 如果是多个线程中的某个执行完了就要通知,如何实现?

欢迎大家在评论区留言讨论,喜欢的朋友记得关注我哦!

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表