并发包基石AQS和线程池实战
in JavaDevelop with 0 comment

并发包基石AQS和线程池实战

in JavaDevelop with 0 comment

并发包的基石 AQS 介绍

我们在上一章节讲了一些 JUC 包中的一些常见的并发工具类,我们以 CountDownLatch 和 ReentrantReadWriteLock 来看看它们的源码:

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer { ...} ...
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer { ...} ...
}

我们可以很明显地看到这些并发工具类都使用到了一个相同的类:AbstractQueuedSynchronizer,简称 AQS,实际上,JUC 包中的大部分并发类,都直接或间接的依赖了 AQS,而 AQS 也称为 Java 并发包的基石,学习 AQS 也是每一个 Java 开发人员的必经之路。

AQS 原理解析

接下来我们给大家阐述一下 AQS 的原理是怎样的,在 AQS 中,维护着一个表示共享资源加锁情况的变量 volatile int state ,以及一个 FIFO(先入先出)的线程阻塞队列(称为 CLH 队列)。当多个线程并发访问共享资源时,如果共享资源已经被某个线程加了锁,那么其他线程在访问此共享资源时就会被加入到 CLH 队列中,原理如下图所示:

image-1658453192010

state 表示共享资源被线程加锁的次数。例如,当 state 的值为 1 时,就表示共享资源被某个线程加了一次锁;当 state 的值为 0 时,就表示共享资源没有被加锁,随时可以访问。AQS 类中提供了三种访问 state 的方法,如下表所示。

方法 简介
int getState() 获取 state 值
void setState(int newState) 直接设置 state 值
compareAndSetState(int expect, int update) 使用 CAS 算法,设置 state 值

除了加锁次数以外,并发的线程在访问共享资源时都会使用以下一种或两种加锁方式。

(1)其中 Exclusive 方式的加锁与解锁,在 AQS 源码中对应的实现方法,如下表所示。

方法 简介
boolean tryAcquire(int arg) 尝试获取资源,如果成功,就给该资源加 arg 个数量的锁,并独占该资源。
boolean tryRelease(int arg) 尝试释放资源。如果成功,就释放该资源的 arg 个数量的锁。
boolean isHeldExclusively() 判断当前线程,是否正在独占共享资源。

在这里我们以 ReentrantLock 为例来说明:ReentrantLock 的 state 初始时为 0(即共享资源没有被加锁)。当某个线程 A 调用 lock()方法时,lock()会在底层触发 tryAcquire(1),把该资源的 state 修改为 1,表示给该资源加了一把锁,之后就可以独占使用该资源。之后,其他线程如果再调用 lock()方法,就会失败并进入阻塞状态(因为 ReentrantLock 是独占方式,同一时间只能被一个线程加锁)。只有在线程 A 调用 unlock()(unlock()会触发 tryRelease(1),表示将 state 的值减 1,即把 state 的值设置为 0),也就是在把资源的锁释放了以后,其他线程才能访问该资源。简言之,当 state=0 时,表示资源未被加锁,任何线程都可以访问;当 state>0 时(ReentrantLock 是可重入锁,即同一个线程可以对某一资源多次加锁,因此 state 可以是一个任意的正数),表示资源已被加了锁,其他线程就不能访问。

(2)共享方式的加锁与解锁,在 AQS 源码中对应的方法如下表所示:

方法 简介
int tryAcquireShared(int arg) 尝试给该资源加 arg 个数量的共享锁,并访问该资源。 注意返回值是一个 int 类型: 返回负数:加共享锁失败,当前线程会进入 CLH 等待。 返回 0:当前线程加共享锁成功,但后续其他线程无法再加共享锁。 返回正数:当前线程加共享锁成功,并且后续其他线程也可以再加共享锁。
boolean tryReleaseShared(int arg) 尝试释放该资源的 arg 个数量的锁。

在这里我们以 CountDownLatch 为例,CountDownLatch 的构造方法 CountDownLatch(int count)可以将 state 的初始值设置为 count,并交给 count 个子线程去并发执行,与此同时主线程会进入阻塞状态。当每个子线程执行完毕后,都会调用一次 countDown(),countDown()会在底层调用 tryReleaseShared(1),即把 state 减 1。因此,当所有子线程全部执行完毕后,state 的值就会变为 0。而当 state=0 时,就会唤醒主线程,从而实现了闭锁的功能。

综合上面介绍的内容我们总结,在 Java 并发包提供的同步类(或者我们自定义的同步类)中,如果想要提供独占的访问方式,就只需要实现 AQS 的 tryAcquire()和 tryRelease()方法;如果要提供共享的访问方式,就只需要实现 AQS 的 tryAcquireShared()和 tryReleaseShared()方法;如果既要提供独占方式,又要提供共享方式,也只需要将以上四个方法全部实现即可,如 JUC 中的 ReentrantReadWriteLock 类就同时实现了独占和共享两种方式,其源码如下所示。

/* ReentrantReadWriteLock的源码 */
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    final Sync sync;
    ...
abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        //独占方式解锁
        protected final boolean tryRelease(int releases) {  ... }
        //独占方式加锁
        protected final boolean tryAcquire(int acquires)  { ...}
        //共享方式解锁
        protected final boolean tryReleaseShared(int unused) {  ...  }
        //共享方式加锁
        protected final int tryAcquireShared(int unused) { ... }
            ...
     }
...
}

AQS 源码解读

之前介绍的 tryAcquire()、tryAcquireShared()等同步方法,在 AQS 中的源码如下所示:

protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
protected boolean isHeldExclusively() { throw new UnsupportedOperationException();}
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}

这几个方法的具体含义,已在前面做过介绍。在源码中,如果操作成功,则返回 true;如果操作失败,就抛出一个 UnsupportedOperationException 异常。这是因为 AQS 本身是作为同步类的基类,充当的是一个设计者的角色(相当于接口的概念),因此 AQS 中方法都比较抽象。如果某个同步类实现了 AQS,就需要重写这些方法。我们以 ReentrantLock 为例说明,以下是 ReentrantLock 对其中一部分方法的重写。

...
public class ReentrantLock implements Lock, java.io.Serializable {
    ...
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc <0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
         }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
   }

    ...
    static final class NonfairSync extends Sync {... }
    ...
    static final class FairSync extends Sync {
    ...
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                return true;
             }
         }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc <0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
     }
    }
    ...
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
   ...
}

独占模式源码解读

现在开始,我们来学习 AQS 中对于“独占方式”的加锁以及释放锁的源码。前面讲过,如果线程 A 在访问某个资源时,发现该资源已被其他线程加了锁,那么线程 A 将会被加入到 CLH 等待队列。“加入 CLH 等待队列”这个动作,就对应于 AQS 源码中的 addWaiter(Node)方法,如下所示。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //先尝试以一种“快速方式”将访问失败的线程加入到CLH队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//如果“快速方式”入队失败,再通过enq()将线程加入到CLH队尾。
    enq(node);
    return node;
}
...
private Node enq(final Node node) {
//如果出现冲突,就根据CAS算法"自旋",直到Node成功被加入队尾
    for (;;) {
        Node t = tail;
//如果CLH为空,则new一个新的Node,并将head和tail都指向该Node
    if (t == null) {
        if (compareAndSetHead(new Node()))
            tail = head;
        } else {
//如果CLH不为空,直接将node加入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上述代码中,Node 是对访问线程进行的封装,即 Node 包含了线程本身、线程在队列中的前驱节点/后继节点、线程在 CLH 中的等待状态等,我们可以看下 AbstractQueuedSynchronizer 的源码。

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
    static final class Node {
        //共享模式的Node
    static final Node SHARED = new Node();
        //独占模式的Node
    static final Node EXCLUSIVE = null;
    /*
Node的等待状态,共有CANCELLED、SIGNAL、CONDITION、PROPAGATE四种状态。初始时,
    waitStatus为0
*/
        volatile int waitStatus;
        /*
失效状态:如果在CLH中的线程等待超时或被中断,就需要从CLH中取消
该Node结点,并将该Node的waitStatus设置为CANCELLED。
注意,CANCELLED的值为1,也是所有状态中,唯一一个大于0的值。
*/
        static final int CANCELLED = 1;
        /*
可理解为“第二执行状态”:如果某一个Node的前驱结点正在加锁并占
用资源,当这个前驱结点释放锁后,就会将waitStatus=SIGNAL的Node中的
线程唤醒执行。也就是说,waitStatus=SIGNAL的Node,就是除了正在占用
资源的线程以外,第二个能够占用资源的Node。
*/
        static final int SIGNAL  = -1;
        /*
waitStatus=CONDITION的Node(记为Node-C),表示Node-C中的线程正在
等待某一个Condition;当其他线程调用了该Condition的signal()方法后,
就会将Node-C从等待队列转移到同步队列,等待获取同步锁。
*/
        static final int CONDITION = -2;
        //在共享模式中,waitStatus=PROPAGATE的Node中的线程处于可运行状态。
        static final int PROPAGATE = -3;
        //CLH中,当前等待Node的前驱节点
        volatile Node prev;
        //CLH中,当前等待Node的后继节点
        volatile Node next;
        //在CLH中,当前线程的Node
        volatile Thread thread;
        ...
}
    //指向CLH中的第一个结点,也就是正在占用资源的Node
    private transient volatile Node head;
    //指向CLH中的最后一个结点,也就是最近一个因为访问资源失败,而加入到CLH的Node
    private transient volatile Node tail;
    //资源状态,即资源被加锁的次数
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }
    ..
}

我们从上面的源码中可以看到,AQS 的源码提供了 acquireQueued()和 acquire()方法。如果线程 A 访问资源失败,就会以 Node 的形式(记为 Node-A)被加入到 CLH 队尾,进入等待状态。那么,Node-A 在 CLH 中是如何等待又如何前移到 CLH 队头的呢?这些就是由 acquireQueued()方法所定义的。总体的讲,acquireQueued()会先判断当前 Node 是不是 CLH 中的第二个节点(即之前讲的“第二执行状态”)。如果是,就通过“自旋”不断去尝试占用资源;如果不是,则安心的处于等待状态,直到自己前移到第二个位置,因为每次步骤判断中都会有判断当前节点是否是第二节点的过程。这就好比我们在超市购物后,在收银台前等待结账的动作:如果我们是排队付款队列中的第二个人,那么我们就会时刻准备在前面第一个人结束付款后立即占用收银台(占用资源),这种“时刻准备”就是上面的“自旋”状态;反之,如果我们不是排队付款的前两个人,而是队列中处于较后的位置,那么我们就可以放心的休息一会,直到自己移动到了第二个位置。

我们下面来看看具体的 acquireQueued()相关的源码:

final boolean acquireQueued(final Node node, int arg) {
//是否成功获取到资源。需要注意此变量表示的是“failed”:如果成功获取,则返回false;
如果失败,则返回true。
    boolean failed = true;
    try {
//在等待的过程中,是否被中断过
        boolean interrupted = false;
//自旋
        for (;;) {
//获取当前结点的前驱结点。
            final Node p = node.predecessor();
/*
如果前驱结点是head(即第一个结点,也就是正在占用资源的结点),
那么自己就是第二个结点。因此,此时就通过自旋不断的尝试获取
资源(tryAcquire(arg))
*/
            if (p == head && tryAcquire(arg)) {
/*
如果成功获取到资源,则表示此时自己已经是第一个结点了。此时,
就将自己设置为头结点。
                */
setHead(node);
/*
因为当前结点的前驱p已经将资源使用完毕了,因此可以断开它的
引用,便于GC回收。
*/
                p.next = null;
failed = false;
                return interrupted;
            }
//如果当前结点不在前两个位置,则放心的等待,直到被唤醒(unpark())
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
//如果当前线程在等待的过程中被中断过,就将interrupted标记为true
                interrupted = true;
        }
    } finally {
    if (failed)
            cancelAcquire(node);
    }
}
...
/*
shouldParkAfterFailedAcquire()总体是说:如果当前Node不在前两个位置,那么应该可以安心的等待了。但是要考虑一些特殊情况:例如,如果前面的某些Node是无效状态,那么当轮到这些无效Node占用资源时,这些Node将会放弃占用,因此CLH会迅速切换到下一个Node。还是以在超市排队结账为例:如果我们不是前两个排队的人,那么一般来讲就可以放心休息一会儿。但是,如果在前面排队的人中,有一些并不是真正要结账的人呢?例如,一家四口都在排队,但当这四个人前移到队头时,可能仅仅有一个人付款买单,其他三个人会迅速出队。因此,我们的“放心休息”还需要两个条件:①我们需要在前面排队的人中,找到一个会真正付款买单的人,然后移动到这个人的后面(相当于插队,插在了那些仅仅排队、但不买单的人前面);②告诉前面这个真正买单的人“如果轮到你买单了,告诉一下我(唤醒当前线程),我就可以开始准备了(自旋、不断尝试占用资源)”。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱结点的等待状态
    int ws = pred.waitStatus;
/*
如果前驱结点的状态是SIGNAL,就表示这个前驱结点会在自己前移到第一位
(即占用资源状态)时,告知自己一下。这样当前结点就可以放心休息了。
*/
    if (ws == Node.SIGNAL)
        return true;
/*
如果前驱结点的等待状态>0,即waitStatus为CANCELLED失效状态时,当前结点就
一直前移,一直移动到一个真正等待的结点后面。
*/
    if (ws >0) {
    do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus >0);
        pred.next = node;
    } else {
    /*
如果前驱结点是正常等待状态,就把前驱的状态设置成SIGNAL(即告诉前驱:
当你移动到第一位占用资源时,告诉我一声,以便我开始“自旋”尝试获取资源)
*/
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
...
//当前Node放心等待的方法
private final boolean parkAndCheckInterrupt() {
//调用park()方法,让当前线程进入waiting状态
    LockSupport.park(this);
/*
如果被唤醒,还要检查是否是被中断的。(线程会在以下两种情况下被唤醒:
1.正常调用unpark()被唤醒;2.被interrupt()方法中断)
*/
    return Thread.interrupted();
}

acquire()

接下来,我们再来学习一下 AQS 中独占模式下线程获取共享资源的顶层方法 acquire(),源码如下所示。

public final void acquire(int arg) {
/*
尝试独占资源的流程如下所示:
 ①如果执行tryAcquire(arg)后结果为true,则表示当前线程将资源独占成功,并直接结束此方法;
 ②addWaiter():如果独占失败,就将该线程的Node标记为独占模式,并加入到CLH的队尾;
 ③因为是独占模式,当前Node独占失败后就会在CLH中等待(即处于阻塞状态),直到等待结束后成功独占资源;
 ④如果当前Node独占失败,并且在CLH中等待的过程中出现了中断,就还需要执行selfInterrupt()。
*/
    if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire()方法的执行逻辑如下图所示:

image-1658453230912

大家可以尝试用此方法的逻辑,去理解 Lock 接口中的 lock()方法。acquire()是获取共享资源的顶层方法(即加锁操作),与之相反的 release()就是释放锁的顶层方法。例如每执行一次 release(1),就会将共享资源上加锁的次数减 1。如果减到 state=0,就说明共享资源被彻底释放了,此时就会唤醒 CLH 中下一个等待的线程。

release()在 AQS 中的源码如下所示:

public final boolean release(int arg) {
//通过tryRelease(arg),释放加在共享资源之上的arg个锁。如果释放成功,则返回true
    if (tryRelease(arg)) {
//获取CLH中的头结点
        Node h = head;
//唤醒CLH中的下一个结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

其中,用于唤醒 CLH 中下一个结点的 unparkSuccessor()方法的源码如下所示。

//唤醒CLH中下一个“正常等待状态的”结点
private void unparkSuccessor(Node node) {
    //获取正在占用资源的当前结点
    int ws = node.waitStatus;
//将结点的状态,通过CAS恢复成初始值0
    if (ws <0)
    compareAndSetWaitStatus(node, ws, 0);

    //获取下一个结点,也就是下一个即将被唤醒的结点
    Node s = node.next;
/*
如果下一个结点是null,或是失效状态(关闭状态即waitStatus=1,且是唯一一个>0的状态值)
就将下一个结点设置为null;并从尾部往前遍历,直到找到一个处于正常等待状态的结点,
进行唤醒。
*/
    if (s == null || s.waitStatus >0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
//waitStatus <= 0的节点,都是正常等待状态的节点
            if (t.waitStatus <= 0)
            s = t;
    }
    if (s != null)
//通过unpark()方法,唤醒下一个结点
        LockSupport.unpark(s.thread);
}

以上的这些内容就是我们对于独占模式加锁和解锁的源码解读。而对于共享模式,加锁的顶层方法是 acquireShared(int),解锁的顶层方法是 releaseShared(int),思路与独占模式大体相同,大家可以尝试自行阅读这些源码。

实战线程池

我们现在重点来学习线程池的概念,与数据库连接池的原理类似,线程池就是将多个线程对象放入一个池子中,之后就可以从该池子中获取、使用以及回收线程。在大家学习线程池之前,还需要先明确以下两点:

五种线程池的创建方式

在实际开发时,我们可以根据具体的业务需求通过 JUC 提供的 Executors 类,来创建各种类型的线程池。各个类型的线程池都有自己的独特之处,其中最常用的五种类型的线程池的创建方式如下所示。

以上方式所创建的线程池(即以上方法的返回值)可以分为两类:前三种方式会创建 ExecutorService 类型的线程池;后两种方式会创建 ScheduledExecutorService 类型的线程池(继承自 ExecutorService)。接下来我们具体讲述一下 ExecutorService 和 ScheduledExecutorService 的具体使用方法。

ExecutorService 线程池

我们可以通过 submit()方法向 ExecutorService 中提交 Callable 或 Runnable 类型的线程任务,任务提交后就会被线程池中的线程领取并执行。线程执行完毕后,会将结果返回到 Future 对象中,具体如下。

ExecutorService 在 JDK 中的源码如下所示。

public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T>task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
void shutdown();
List<Runnable> shutdownNow();
...
}

我们从源码中可以发现,在 ExecutorService 的源码中有 2 个用于关闭线程池的方法 shutDown()和 shutdownNow(),二者的区别如下。

ScheduledExecutorService 线程池

我们可以通过 schedule()方法向 ScheduledExecutorService 中提交 Callable 或 Runnable 类型的线程任务。特殊的是,用户可以设置此线程池中任务的执行时间。

综合上面的内容我们可以总结线程池的使用步骤就是:

常用线程池的应用实例与解析

下面我们通过使用一些经典的示例来演示线程池的具体使用,大家可以从中总结各种类型的线程池的各自应用场景。

首先我们可以创建一个固定数量的线程池实现线程池中的线程数量固定为 6,并且向此线程池中提交 6 个任务,每个任务都是计算 1-10 的和,最后将这 6 个任务的计算结果分别打印出来。

我们在进入线上环境之后先在终端中输入以下的命令创建一个项目 demo:

mkdir demo demo/bin demo/lib demo/src

然后在路径 src/ 下面创建 TestThreadPoolWithCallable.java 文件。

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestThreadPoolWithCallable {
    public static void  main(String[] args) throws InterruptedException, ExecutionException
    {
        ExecutorService pool = Executors.newFixedThreadPool(6);
        ArrayList<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
        for (int i = 0; i <6; i++) {
            //对submit(Callable<T> task)使用了jdk8提供的lambda表达式
            Future<Integer> result = pool.submit(() ->{
                //向线程池中提交6个任务(每个任务:求1-10之和)
                int sum = 0;
                for (int j = 0; j <= 10; j++) {
                    sum += j;
                }
                return sum;
            });
            /*
            在submit()中的线程返回结果前,futureList.add()会一直处于阻塞状态
            */
            futureList.add(result);
        }
        for (Future<Integer> future : futureList) {
            //获取并打印各个任务的结果
            System.out.println(future.get());
        }
        pool.shutdown();
    }
}

然后在终端中输入相应的指令编译并运行这个程序,运行的效果和指令过程如下图所示:

image-1658453267530

线程池的流程控制

这里我们再创建一个线程池,线程池中的线程数量固定为本机 CPU 核数,要求通过线程池实现以下需求。

我们还是在路径 src/ 下面新建 TestPool.java 文件。

import java.util.ArrayList;
import java.util.concurrent.*;

public class TestPool {
    public static void main(String[] args) throws Exception {
        Future<String> result = null;
        ScheduledExecutorService schedulPool
                = Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors());
        ArrayList<Future<String>> results = new ArrayList<Future<String>>();
        for (int i = 0; i <2; i++) {
        /*
           schedule(a,b,c)三个参数的含义:
           a:向线程池中提交的任务;
           b:该任务等待多长时间之后,才会被执行
           c:b的时间单位
        */
            result = schedulPool.schedule(new ThreadTask("thread"+i),
                    (int)(Math.random()*10), TimeUnit.SECONDS);
            //存储各个线程的执行结果
            results.add(result);
        }
        //打印结果
        for(Future<String> res: results){
            System.out.println(res.isDone() ? "已完成":"未完成");
            System.out.println("等待线程执行完毕后,返回的结果: "+ res.get());
        }
        schedulPool.shutdown();
    }
}

/* 线程任务类 */
class ThreadTask implements Callable<String> {
    private String tname;

    public ThreadTask(String tname) {
        this.tname = tname;
    }

    @Override
    public String call() throws Exception {
        //获取当前线程的名字
        String name = Thread.currentThread().getName();
        long currentTimeMillis = System.currentTimeMillis();
        System.out.println(name + " - 【"+ tname + "】 启动时间:"+ currentTimeMillis);
        //模拟线程执行
        Thread.sleep((long) Math.random() * 2000);
        System.out.println(name + " - 【"+ tname + "】 正在执行...");
        return name + " - 【"+ tname + "】";
    }
}

然后我们编译并运行这个文件:

image-1658453289386

得到运行的结果如下所示:

image-1658453300717

本程序有 3 个线程:main 线程、以及 for 循环中生成的 2 个 TaskCallable 类型的线程。产生此结果的一种可能原因:

我们不仅要会使用这些提供给我们的线程池,还要学会自定义的线程池的构建。

自定义线程池的构建原理与案例详解

如果已有的线程池都不能满足业务的需求,那么我们就可以通过 ThreadPoolExecutor 来自定义一种类型的线程池。具体来说,可以通过 ThreadPoolExecutor 的构造方法来创建一个自定义的线程池对象,并且通过 execute 向池中提交无返回值的任务(类似于 run()方法),或者使用 submit()向池中提交有返回值的任务(同样是用 Future 接收返回值)。ThreadPoolExecutor 在 JDK 中的部分源码如下。

public class ThreadPoolExecutor extends AbstractExecutorService {
//根据不同的参数个数,一共有四种用于创建ThreadPoolExecutor对象的构造方法
...
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable>workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        ...
}
//处理无返回值的任务
public void execute(Runnable command) {...}
//处理有返回值任务的submit()方法,继承自父类AbstractExecutorService
    ...
}

其中,构造方法中各个参数的含义如下所示。

package java.util.concurrent;
public interface RejectedExecutionHandler
{
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

该接口的 4 个实现类,就是 4 种拒绝策略,分别如下。

另外我们还可以自定义一个实现了 RejectedExecutionHandler 接口的类,即自定义拒绝策略。接下来我们通过示例演示一个自定义线程池的具体使用。

我们还是在路径 src/ 下面新建 TestMyThreadPool.java 文件。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestMyThreadPool {
    public static void main(String[] args) {
        /*
         ThreadPoolExecutor()的参数含义如下所示:
         ①核心线程:1,如果只有1个任务,会直接交给线程池中的这一个线程来处理
         ②最大线程数:2,如果任务的数量>(核心线程数1+workQueue.size()),且任务的
数量<=最大线程数2+workQueue.size()之和时,就将新提交的任务交给非核心线程处理。
        后文会结合程序详细解释。
         ③最大空闲时间:10
         ④最大空闲时间的单位:秒
         ⑤任务队列:有界队列ArrayBlockingQueue,该队列中可以存放3个任务
         ⑥拒绝策略:AbortPolicy(),当提交任务数>(最大线程数2+workQueue.size())时,
        任务会交给AbortPolicy()来处理
       */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10,
                TimeUnit.SECONDS,
                //new LinkedBlockingQueue<Runnable>()
                new ArrayBlockingQueue<Runnable>(3),
                //, new MyRejected()
                new ThreadPoolExecutor.AbortPolicy()
        );
        MyThread t1 = new MyThread("t1");
        MyThread t2 = new MyThread("t2");
        MyThread t3 = new MyThread("t3");
        MyThread t4 = new MyThread("t4");
        MyThread t5 = new MyThread("t5");
        MyThread t6 = new MyThread("t6");
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        pool.execute(t6);
        pool.shutdown();
    }
}
class MyThread implements Runnable {
    private String threadName;
    public MyThread(String threadName){
        this.threadName = threadName;
    }
    @Override
    public void run() {
        try {
            System.out.println("threadName :"+ this.threadName);
            System.out.println(threadName+"执行了1秒中...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public String getThreadName() {
        return threadName;
    }
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }
}

然后我们对这个程序类进行编译并运行,可以得到下面的运行结果,下图包含了操作的步骤即指令。

image-1658453373001

(1)如果执行 pool.execute(t1) ,此时仅仅提交了一个 t1 任务,线程池中正好有 1 个核心线程能够处理,因此会直接运行。

(2)如果从 pool.execute(t1) 执行到 pool.execute(t2) ,此时提交了一个 2 个任务,但线程池中仅有 1 个核心线程能够处理一个任务,因此另一个任务会被放入 ArrayBlockingQueue 中等待执行。t1 在执行完毕一秒中之后,t2 才会得到运行。

(3)如果从 pool.execute(t1) 执行到 pool.execute(t4) ,由于 ArrayBlockingQueue 中能同时容纳 3 个任务,因此此次的执行的逻辑和(2)相同,大家可以自行分析并尝试。

(4)如果从 pool.execute(t1) 执行到 pool.execute(t5) ,线程池在运行时:第一个入池线程任务 t1 会被交给唯一的核心线程立刻执行,t2 到 t4 会放入 ArrayBlockingQueue 队列中等待执行,而最后一个 t5 由于核心线程和 ArrayBlockingQueue 都已饱和、但最大线程数中还有一个非核心线程空闲,因此 t5 就交给了这个空闲线程立刻执行。也就是说,t1 和 t5 都会被线程立刻执行,而 t2、t3、t4 被放到了 ArrayBlockingQueue 中等待执行。

(5)如果从 pool.execute(t1) 执行到 pool.execute(t6) ,线程池在运行时,任务数(6 个)已经大于最大线程数(即 2)+ArrayBlockingQueue 长度(即 3)之和,因此会被拒绝策略 AbortPolicy 拒绝提交。

本次是将等待的任务放入了 ArrayBlockingQueue 中,ArrayBlockingQueue 是一个有界队列(队列的长度是有限制的,例如本次的长度是 3,表示最多能放 3 个任务)。此外,我们还可以使用长度为 Integer.MAX_VALUE 的 LinkedBlockingQueue(或无界队列),这样一来就可以存放足够多的等待着的任务,线程池也不会因为任务超额而触发拒绝策略。

无界队列改写 TestMyThreadPool.java

我们考虑来使用无界队列来改写 TestThreadPool 测试类,如果使用了无界队列,那么 ThreadPoolExecutor 构造方法中的参数“最大线程数”应该设置为多大?答案是只要比核心线程数大就可以,大家可以思考一下原因。

最后,我们再将本程序的 AbortPolicy()策略,替换为自定义拒绝策略,修改 TestMyThreadPool.java ,此时的 TestMyThreadPool.java 测试类如下所示:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestMyThreadPool {
    public static void main(String[] args) {
        /*
         ThreadPoolExecutor()的参数含义如下所示:
         ①核心线程:1,如果只有1个任务,会直接交给线程池中的这一个线程来处理
         ②最大线程数:2,如果任务的数量>(核心线程数1+workQueue.size()),且任务的
数量<=最大线程数2+workQueue.size()之和时,就将新提交的任务交给非核心线程处理。
        后文会结合程序详细解释。
         ③最大空闲时间:10
         ④最大空闲时间的单位:秒
         ⑤任务队列:有界队列ArrayBlockingQueue,该队列中可以存放3个任务
         ⑥拒绝策略:AbortPolicy(),当提交任务数>(最大线程数2+workQueue.size())时,
        任务会交给AbortPolicy()来处理
       */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                , new MyRejectPolicy()//使用自定义拒绝策略
        );

        MyThread t1 = new MyThread("t1");
        MyThread t2 = new MyThread("t2");
        MyThread t3 = new MyThread("t3");
        MyThread t4 = new MyThread("t4");
        MyThread t5 = new MyThread("t5");
        MyThread t6 = new MyThread("t6");
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        pool.execute(t6);
        pool.shutdown();
    }
}
class MyThread implements Runnable {
    private String threadName;
    public MyThread(String threadName){
        this.threadName = threadName;
    }
    @Override
    public void run() {
        try {
            System.out.println("threadName :"+ this.threadName);
            System.out.println(threadName+"执行了1秒中...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public String getThreadName() {
        return threadName;
    }
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }
}
class MyRejectPolicy implements RejectedExecutionHandler {
    public MyRejectPolicy() {
    }
    //自定义拒绝方法
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("被拒绝的线程名:"+ ((MyThread) r).getThreadName());
    }
}

然后我们编译并运行上面的改进过后的测试类,步骤即运行结果如下所示:

image-1658453391054

可见现在程序的运行结果执行的是我们自定义的拒绝方法,接下来我们介绍一下异步模型和时间驱动模型的知识点。

异步模型和事件驱动模型

在 Ajax、Netty 等异步框架中,经常会涉及异步模型和事件驱动模型,实际上这两个模型也是理解“异步”的根基,下面我们会针对这两个模型分别进行介绍。

异步模型

异步是和同步相对的,先来了解一下同步的概念。在 MVC 模式中,我们经常使用的 Controller 大多是同步的,例如 Servlet、Struts2、SpringMVC 等。这些 Controller 会在接收到请求后,立刻去处理这些请求并返回响应。在这里我们以 SpringMVC 为例,以同步方式处理请求的示例代码如下所示。

@RequestMapping("/queryStudent/{stuNo}")
public ModelAndView queryStudentByNo(@PathVariable("stuNo")Integer stuNo){
  Student student = studentService.queryStudentByNo(stuNo);
  ModelAndView mv = new ModelAndView("success");
  mv.addObject("student",student);
  return mv;
}

结果页 success.jsp 的源码如下。

${requestScope.student.stuNo }

以上,请求的一方向 Controller 发出请求(如 localhost:8080/项目名/queryStudent/9527),之后 Controller 的 queryStudentByNo()方法接收到该请求后,就会调用 studentService.queryStudentByNo(9527)方法处理该请求,直到该方法全部执行完毕后,再将结果通过 success 页面响应给了请求方。即请求方在发出请求后,会一直等待该请求的结果,只有等到本次请求的响应结果后,才能再发出其他请求。大家不难发现,这种同步方式的处理流程是自上而下顺序执行的。

image-1658453403611

而异步是指当请求方发出请求后,请求方不会去等待此次请求的响应结果,而是直接可以再发出其他请求。如果某次发出的请求被处理完毕,处理方法会借助于监听器等方式以“回调”的形式去通知请求方,如下图所示。

image-1658453416274

因此,异步方式不会去等待请求的响应结果,也就不会发生阻塞,更加适合高并发情形。

事件驱动模型

很多异步请求采用的都是事件驱动模型。简单的说,事件驱动就是预先设置一个个方法(通常为回调方法),然后将这些方法和一个个动作(即事件)一一对应起来。之后,如果发生了某个动作,就会自动触发相应的方法。例如 JavaScript 中的单击事件:当用户单击了鼠标之后,就会自动触发 onclick 所指定的方法。再比如在 jQuery AJAX 中,也预先定义了异步请求中各个不同阶段的不同事件方法,如 ajaxStart()、ajaxSend()、ajaxSuccess()、ajaxComplete()和 ajaxStop()等,当请求处于事件方法所定义的阶段时,就会自动触发相应的方法,如下图所示。

image-1658453427496

类似以上这些方法“当达到某个阶段时,就会执行预先设置的相应动作”就是事件驱动机制。本章节的知识点内容和实验讲解就是以上的这些内容,希望大家多加以理解和实践掌握。