JVM 核心概念以及 JVM 对高并发的支持
大家在学习 Java 的技术途径中应该都听过 JVM 虚拟机的概念,JVM 是安装在操作系统之上的 Java 虚拟机,Java 代码直接操作的对象就是 JVM(而不是操作系统)。不论是哪种操作系统,只要安装了 JVM 就能屏蔽各种来自操作系统的差异性,而以一种相同规范的“虚拟机”形式来和 Java 字节码交互。因此 Java 就可以实现“一次编写,到处运行”的平台无关性。在这里给出一张 JVM 与其他系统的关系图,接下来我们看看 JVM 的内存区域和内存模型。
JVM 内存区域
JVM 在运行时,会将其管理的内存区域划分为方法区、堆、虚拟机栈、本地方法栈和程序计数器等五个区域。其中方法区和堆是所有线程可以共享的区域,而虚拟机栈、本地方法栈、程序计数器是各个线程私有的,我们把 JVM 的内存划分用下面的图进行展示。
JVM 内存模型
Java Memory Model 简称 JMM 即 JVM 的内存模型,用于定义程序中变量的访问规则:在 JVM 中如何将变量存储到内存,以及如何从内存中获取变量(此处的变量是指能被所有线程共享的变量,不含线程私有的局部变量和方法参数)。与 JVM 内存区域不同,JMM 是从另一个角度对内存进行了划分,分为主内存和工作内存。
JMM 规定所有的变量都存储在主内存中,每个线程还拥有自己独立的工作内存。主内存中的变量会通过复制留给线程的工作内存一个副本,供各个线程独立使用。也就是说我们需要注意的是,线程对变量的所有读写操作都是在工作内存中进行的,工作内存中的副本变量会通过 JMM 与主内存中的原变量保持同步。
除此以外,各个线程在运行期间必须遵循以下规定:
- 只能访问自己工作内存中的变量;
- 无法直接访问其他线程工作内存中的变量。
若想要访问其他线程中的变量可以通过主内存,间接访问其他线程工作内存中的变量。例如,假设线程 B 要访问线程 A 中的变量 a,经历的大致步骤是:
- 等待线程 A 将工作内存中变量 a 的副本更新到主内存中。
- 从主内存中,将更新后的变量 a 拷贝到线程 B 的工作内存中。
不同的线程之间在进行数据交互地时候完整的要经历以下的路线:
- Lock:把主内存中的变量,标识为一条线程独占的状态。
- Read:把主内存中的变量读取到工作内存中。
- Load:将该变量放入变量副本中。
- Use:把变量副本传递给线程使用。
- Assign:把线程正在使用的变量,传递到工作内存中的变量副本中。
- Store:把工作内存中的变量副本传递到主内存中。
- Write:将该变量副本作为一个变量放入主内存中。
- Unlock:解除线程的独占状态。
JVM 要求了以上的八个步骤的操作都是原子性的,但是对于 64 位的数据类型却有着非原子性协议:JVM 允许 64 位的 long 和 double 在执行 load、store、read、和 write 操作时,分为两次 32 位的原子性操作。这就意味着,如果多个线程共享一个 long 或 double 类型时,某一个线程理论上可能读到半个 long 或 double 值。如果真的遇到这种错误情况,读者就可以使用 volatile 关键字来避免 JVM 的这种误操作。接下来我们就来看看 volatile 是怎样解决可见性和重排序的问题的。
使用 volatile 解决可见性和重排序的问题
volatile 是 JVM 提供的一个轻量级的同步机制,除了能够“避免 JVM 对 long 或者 double 的误操作”以外,还有以下两个作用。
volatile 修饰的变量可以对所有线程立即可见。前面讲过,不同线程如果要访问同一个变量,就必须借助主内存进行传递;但是如果给变量加了 volatile 关键字,则该变量的值就可以被所有线程及时感知(即某一个线程对 volatile 变量进行的任何操作,都会第一时间同步到其他线程中)。
volatile 可以禁止指令进行“重排序”优化。
理解原子性
我们需要知道的是,在 Java 中并不是所有语句都是原子性的,如果已经存在变量 num,那么对 num 的赋值语句 num=10
就是一个原子性操作,但是如果不存在 age,声明并赋值 age 的语句 int age = 23
就不是一个原子性的操作,该语句会在最终执行的时候分成先后执行 int age;
和 age = 23;
这两句的代码。
重排序是指 JVM 为了提高执行效率,会对我们编写的代码进行一些额外的优化。例如,会对我们已经写完的代码指令重新进行排序。重排序所实现的优化不会影响单线程程序执行结果。
1. int height = 10 ;
2. int width ;
3. width = 20 ;
4. int area = height * width ;
上面的这段代码实际执行的顺序可以是 1、2、3、4,也可以是 2、3、1、4,还可以是 2、1、3、4 等,因为这几种可能的最终执行结果都是相同的。
单例模式大家应该都了解过,下面给大家一个基于双重检查方式的懒汉式单例模式。
1.public class Singleton {
2. private static Singleton instance = null;//多个线程共享instance
3. private Singleton() {}
4. public static Singleton getInstance() {
5. if (instance == null){
6. synchronized(Singleton.class){
7. if (instance == null)
8. instance = new Singleton();
9. }
10. }
11. return instance;
12. }
13. }
上述代码的第 8 行也不是一个原子性操作,JVM 会在执行时将这条语句大致拆分为以下 3 步。
- 分配内存地址、空间地址
- 使用构造方法实例化对象
- 将 instance 赋值为第一步分配好的内存地址
由于重排序,第 8 行的内部执行顺序可能是按照上面的顺序来执行的,也可能是分配内存地址、空间地址,将 instance 赋值为第一步分配好的内存地址,使用构造方法实例化对象的顺序。如果是后者,我们思考一下,当某一个线程 X 正在执行第 8 行,具体是刚执行完 c 但还没执行 b 时(即 instance 虽然已被赋了值、不再为 null,但还没有实例化),另一个线程 Y 正好此时抢占了 CPU 并且执行到第 5 行、判断 instance 不为 null,因此线程 Y 会直接返回 instance 对象,但此 instance 却是线程 X 还没有实例化的对象,所以后续在使用 instance 时就会出错。
为了避免这种因为 JVM 重排序而造成的问题,我们就可以给 instance 加上 volatile 关键字,即:
private volatile static Singleton instance = null;
这样一来,就算是真正意义地实现了单例模式。大家现在总结一下单例模式这个概念了综合上面所讲的这些内容。
内存屏障
volatile 是通过“内存屏障”来防止指令重排序的,具体的实现步骤如下所示:
- 在 volatile 写操作前,插入一个 StoreStore 屏障;
- 在 volatile 写操作后,插入一个 StoreLoad 屏障;
- 在 volatile 读操作前,插入一个 LoadLoad 屏障;
- 在 volatile 读操作后,插入一个 LoadStore 屏障。
所以 volatile 就是通过这样子的内存屏障的方式来实现防止指令重排的。另外我们还需要注意的一点就是虽然 volatile 修饰的变量具有可见性,但是并不具备原子性,因此单独使用 volatile 不是线程安全的,大家要理解这点就得将“原子性”和“重排序”的概念区分开来,后面我们会通过一个具体的实验测试来说明一下 volatile 不具备线程安全性的原因并且如何改造成线程安全的,然后说明一下在 Java 的并发包中提供的哪些原子性变量对象。
接下来我们就利用线上环境进行本章节的实验吧
volatile 的非原子性
本次实验课程我们使用 VS Code 环境来进行相关的演示操作,若要了解更多关于 VS Code 的内容,请参阅 VS Code 环境使用指南。
首先我们还是进入到 WebIDE 的云上环境中去,此时工作的目录会切到 peoject 路径下面,此时我们需要新建一个模板项目,我们可以在这个终端里面输入以下的命令:
mkdir demo demo/src demo/bin demo/lib
我们在 src 文件夹下右键新建 TestVolatile_1.java
文件,可以查看此时的项目结构:
编写 TestVolatile_1.java
public class TestVolatile_1 {
public static volatile int num = 0;
public static void main(String[] args) throws Exception {
for (int i = 0; i <100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <20000; i++) {
num++;//num++不是一个原子性操作
}
}
}).start();
}
Thread.sleep(3000);//休眠3秒,确保创建的100个线程都已执行完毕
System.out.println(num);
}
}
然后我们编译这个文件并运行:
javac -d bin/ src/TestVolatile_1.java
cd bin
java TestVolatile_1
可以看到第一次运行的结果:
然后我们再运行一次:
java TestVolatile_1
我们发现第一次的 num 的结果是 1180586,而第二次的运行结果是 1087036,很明显两次是不一样的。
初始 num=0
时,我们创建了 100 个线程,并且每个线程都会执行 20000 次 num++s
。因此如果 volatile 是线程安全的,那么最终应该打印 2000000,但实际结果并非如此,从运行结果可以发现,volatile 并不能将所修饰的 num 设置为原子性操作(如 num++
就不是原子性操作),因此会造成 num++
被多个线程同时执行,最终导致了漏加的线程不安全情况(即最终的结果值远小于 2000000)。
如果要将本程序改进为线程安全,就可以使用 java.util.concurrent.atomic
包中提供的原子类型,我们在 src 目录下面继续创建 TestVolatile_2.java
文件。
TestVolatile_2.java
我们从 TestVolatile_2.java
来看 atomic 原子性。
import java.util.concurrent.atomic.AtomicInteger;
public class TestVolatile_2 {
public static AtomicInteger num = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
for (int i = 0; i <100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <20000; i++) {
num.incrementAndGet() ;// num自增,功能上相当于int类型的num++操作
}
}
}).start();
}
Thread.sleep(3000);//休眠3秒,确保创建的100个线程都已执行完毕
System.out.println(num);
}
}
因为刚执行完上个文件,我们的工作目录在 bin 下面,我们现在编译并执行 TestVolatile_2.java
。
cd ..
javac -d bin/ src/TestVolatile_2.java
cd bin
java TestVolatile_2
可见此时的运行结果是 2000000 和预期一样,除了本例使用的 AtomicInteger 以外,在 java.util.concurrent.atomic
包中还提供了形如“AtomicXxx”的其他原子性变量对象。观察 AtomicInteger 的源代码,可以看到一个 compareAndSet()方法,其源码如下所示。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
此方法就是实现原子性操作的关键,它实现了 CAS 算法,而 CAS 算法就能够保证变量的原子性操作。
同步机制-并发售票问题
解决并发环境下线程安全问题的最基本策略就是使用“锁”。在这里我们将向大家介绍如何用各种方式的“锁”来实现同步机制,从而保障共享资源在高并发环境中的线程安全性。
synchronized 解决并发售票问题与死锁演示
当多个线程同时访问同一个资源(对象、方法或代码块)时,经常会出现一些“不安全”的情况。例如,假设有 100 张火车票,同时在被 t1 和 t2 两个站点售卖,就可能会出现火车票数据的“不安全”情况,代码如下:
public class ThreadDemo01 implements Runnable {
//100张火车票
private int tickets = 100;
@Override
public void run() {
while (true) {
sellTickets();//调用售票方法
}
}
//售票方法
public void sellTickets() {
if (tickets >0) {
/*打印线程名和剩余票数(假设剩余票数就是该车票的编号,例如剩余票数为100,
就表示此时正在售卖的票的编号为100)*/
System.out.println(Thread.currentThread().getName() + tickets);
tickets--;
}
}
public static void main(String[] args) {
ThreadDemo01 t = new ThreadDemo01();
//创建两个线程并执行
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.setName("t1售票站点");
t2.setName("t2售票站点");
t1.start();
t2.start();
}
}
我们在 src 文件夹下新建 ThreadDemo01.java
文件,并把上面的代码复制进去。按照上一个实例的运行办法我们这里依次输入以下的命令:
# 确保当前的工作路径在/demo/下面
cd ..
javac -d bin/ src/ThreadDemo01.java
cd bin
java ThreadDemo01.java
课件运行效果如下:
可以发现,t1 和 t2 两个线程同时销售了编号为 100 的车票,显然是不对的。造成这种错误的原因是 t1 和 t2 在争夺资源(即变量 tickets)时,同时执行了 sellTickets()方法,如下:
public void sellTickets(){
if(tickets > 0){
System.out.println(Thread.currentThread().getName + tickets);
tickets--;
}
}
初始时 tickets=100
,我们假设 t2 刚刚执行完打印的过程但还没有执行 tickets--
时,t1 也去执行了打印的过程,就会出现重复打印 ticket=100
的情况。“非线程安全”就是指这种由于线程的异步特性而造成的并发问题。为了解决这种问题,我们就可以使用 synchronized 关键字给共享的资源加锁。
synchronized 关键字
具体的讲,可以使用 synchronized 来给方法或代码块加锁,语法如下所示。
(1)给方法加锁
访问修饰符 synchronized 返回值 方法名() {
...
}
(2)给代码块加锁
synchronized(任意对象){
...
}
对于上锁的理解,我们可以给出一个比较有趣的场景作对比来理解给代码块上锁时传入的任意对象:
例如有多个人(多线程)去使用卫生间,如果某一个人(线程)已经占用了卫生间,那么他就可以给卫生间门口挂个牌子“有人”、或者他也可以把卫生间门口的提示灯打开表示有人。因此,无论“牌子”、“提示灯”还是其他物体都没关系,只要能告知其他人(其他线程)该卫生间已被占用就可以了。同样的,在多线程看来,无论用什么对象,都可以实现加锁的目的。
共享资源加锁或者解锁的时机
(1)加锁
当某一个线程开始访问时某个资源时,该线程就会对这个资源加锁,之后就会独占使用该资源。
(2)解锁
当满足以下任一条件时,独占该资源的线程就会对该资源进行解锁:线程将资源访问完毕时、线程访问资源出现了异常时。
我们只需要给 sellTickets()方法加上 synchronized,就可以保证“线程安全”,如下所示。
public synchronized void sellTickets() {
if (tickets >0) {
System.out.println(Thread.currentThread().getName() + tickets);
tickets--;
}
}
所以我们在这里总结一下什么时候加 synchronized 关键字。
- 当某一个资源被共享时,就可以考虑给该资源加上 synchronized,确保线程安全;但如果某资源不是共享资源(不会被多个线程共用),就不需要加。
- 当被加了锁的资源在执行过程中出现异常时,锁也会被释放掉。因此,在并发程序中一定要将异常及时处理,否则会影响并发的逻辑。
- 如果给某个资源加了锁,在多线程共享时要注意避免死锁的逻辑。例如,有两个共享资源 resource1 和 resource2,如果某一时刻线程 1 给 resource1 加了锁并同时等待使用 resource2,而与此同时,线程 2 也给 resource2 加了锁并在等待使用 resource1,这样便形成了死锁,两个线程会一直处于等待状态(都在等待对方释放资源)。
产生死锁的原因
产生死锁的根本原因有两个:
- 系统资源有限,例如,如果有多个 resource,那么线程 1 和线程 2 各自就能够获取一个 resource,自然就不会出现死锁;
- 多个线程(或进程)之间的执行顺序不合理。我们可以通过“打破死锁的四个必要条件”、“银行家算法”等方式来避免死锁的产生。
使用线程通信、队列及线程池模拟生产消费者场景
线程通信模拟生产者消费者
多个线程在争夺同一个资源时,为了让这些线程协同工作、提高 CPU 利用率,就可以让线程之间进行通信,具体可以通过 wait()和 notify()(或 notifyAll())实现,这些方法的含义如下所示。
- wait():使当前线程处于等待状态(阻塞),直到其他线程调用此对象的 notify()或 notifyAll()方法。
- notify(): 唤醒在此对象监视器上等待的单个线程;如果有多个线程同时在监视器上等待,则随机唤醒一个。
- notifyAll(): 唤醒在对象监视器上等待的所有线程。
简言之,wait()会使线程阻塞,而 notify()或 notifyAll()可以唤醒线程、使之成为就绪状态。
此外,在实际使用这些方法时,还要注意以下几点。
- 这三个方法都是在 Object 类中定义的 native 方法,而不是 Thread 类提供的。这是因为 Java 提供的锁是对象级的,而不是线程级的。
- 这三个方法都必须在 synchronized 修饰的方法(或代码块)中使用,否则会抛异常
java.lang.IllegalMonitorStateException
。 - 在使用 wait()时,为了避免并发带来的问题,通常建议将 wait()方法写在循环的内部。JDK 在定义此方法时,也对此增加了注释说明,如下是 Object 类的部分源码。
接下来我们在 src 文件夹中创建 ProducerAndConsumer.java
文件。
//car的库存
class CarStock {
//最多能存放20辆车
int cars;
//通知生产者去生产车
public synchronized void productCar() {
try {
if (cars <20) {
System.out.println("生产车...."+ cars);
Thread.currentThread().sleep(100);
cars++;
//通知正在监听CarStock并且处于阻塞状态的线程(即处于wait()状态的消费者)
notifyAll();
} else {//超过了最大库存20
/*使自己(当前的生产者线程)处于阻塞状态,等待消费者
消执行car--(即等待消费者调用notifyAll()方法)*/
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void consumerCar() {//通知消费者去消费车
try {
if (cars >0) {
System.out.println("销售车...."+ cars);
Thread.currentThread().sleep(100);
cars--;
notifyAll();
//通知正在监听CarStock并且处于阻塞状态的线程(即处于wait()状态的生产者)
} else {
/*使自己(当前的消费者线程)处于阻塞状态,等待消费
者消执行car++(即等待生产者调用notifyAll()方法)*/
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//生产者
class CarProducter implements Runnable {
CarStock carStock;
public CarProducter(CarStock clerk) {
this.carStock = clerk;
}
@Override
public void run() {
while (true) {
carStock.productCar(); //生产车
}
}
}
//消费者
class CarConsumer implements Runnable {
CarStock carStock;
public CarConsumer(CarStock carStock) {
this.carStock = carStock;
}
@Override
public void run() {
while (true) {
carStock.consumerCar();//消费车
}
}
}
//测试方法
public class ProducerAndConsumer {
public static void main(String[] args) {
CarStock carStock = new CarStock();
//注意:生产者线程和消费者线程,使用的是同一个carStock对象
CarProducter product = new CarProducter(carStock);
CarConsumer consumer = new CarConsumer(carStock);
//2个生产者,2个消费者
Thread tProduct1 = new Thread(product);
Thread tProduct2 = new Thread(product);
Thread tConsumer1 = new Thread(consumer);
Thread tConsumer2 = new Thread(consumer);
tProduct1.start();
tProduct2.start();
tConsumer1.start();
tConsumer2.start();
}
}
然后我们在终端中按照上面介绍的方法运行这个类,大家注意空间的切换,首先保证此时的工作空间目录在 demo 下面。
javac -d bin/ src/ProducerAndConsumer.java
cd bin
java ProducerAndConsumer
我们在这里来说明一下上面的程序的执行逻辑:
(1)生产者(CarProducter)不断地向共享缓冲区中增加数据(本例用 cars++模拟)。
(2)同时,消费者也不断地从共享缓冲区中消费数据(cars–)。
(3)共享缓冲区有固定大小的容量(本例为 20)。
(4)当容量达到最大值(20)时,生产者无法再继续生产,生产者的线程就会通过 wait()使自己处于阻塞状态;直到有消费者减少了容量后(<20),再通过 notify()或 notifyAll()唤醒生产者去继续生产。
(5)当容量为 0 时,消费者无法再继续消费,消费者线程就通过 wait()使自己处于阻塞状态;直到有生产者增加了容量后(>0),再通过 notify()或 notifyAll()唤醒消费者去继续消费。这样一来,生产者就会和消费者在共享缓冲区 0-20 的范围内,达成一种动态平衡,下面是运行的效果图。
以上,是一个非常简单的生产者消费者程序,生产者和消费者之间仅仅共享了一个 int 变量,接下来,我们使用队列、线程池等技术对本程序进行改进,并且此次共享的数据是一个 BlockingQueue 队列,该队列中最多可以保存 100 个 CarData 对象,所以每次生产者和消费者就依次在这个共享队列中存数据和取数据。我们是把汽车的实体类放到共享缓冲区的,而这个队列共享缓冲区在汽车库存类里面。
队列及线程池模拟生产者消费者
在有了上一个利用线程之间的通信来模拟生产者和消费者的实例之后,我们现在可以利用队列和线程池来模拟生产者和消费者。
我们还是在上线的 demo 的文件夹里面创建一个新的 TestProducerAndConsumer.java
文件。
import java.util.concurrent.*;
/*CarData汽车实体类*/
class CarData {
private int id ;
public int getId(){
return id;
}
public void setId(int id){
this.id = id;
}
}
/*汽车库存*/
class CarStock {
//统计一共生产了多少辆车
private static int count = 0;
//存放CarData对象的共享缓冲区
private BlockingQueue<CarData> queue;
public CarStock(BlockingQueue<CarData> queue) {
this.queue = queue;
}
//生产车
public synchronized void productCar() {
try {
CarData carData = new CarData();
//向CarData队列增加一个CarData对象
boolean success = this.queue.offer(carData, 2, TimeUnit.SECONDS);
if (success) {
int id = ++count;
carData.setId(id);
System.out.println("生产CarData,编号:"+ id + ",库存:"+ queue.size());
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
} else {
System.out.println("生产CarData失败....");
}
if (queue.size() <100) {
} else {
System.out.println("库存已满,等待消费...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费车
public synchronized void ConsumeCar() {
try {
// 从CarData队列中,拿走一个CarData对象
CarData carData = this.queue.poll(2, TimeUnit.SECONDS);
if (carData != null) {
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
System.out.println("消费CarData,编号:"+ carData.getId() + ",库存: "+ queue.size());
} else {
System.out.println("消费CarData失败....");
}
if (queue.size() >0) {
} else {
System.out.println("库存为空,等待生产...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*生产者实体类*/
class CarProducter implements Runnable {
//共享缓存区
private CarStock carPool;
//多线程的执行状态,用于控制线程的启停
private volatile boolean isRunning = true;
public CarProducter(CarStock carPool) {
this.carPool = carPool;
}
@Override
public void run() {
while (isRunning) {
carPool.productCar();
}
}
//停止当前线程
public void stop() {
this.isRunning = false;
}
}
/*消费者实体类*/
class CarConsumer implements Runnable {
//共享缓存区:CarData队列
private CarStock carPool;
public CarConsumer(CarStock carPool) {
this.carPool = carPool;
}
@Override
public void run() {
while (true) {
carPool.ConsumeCar();
}
}
}
public class TestProducerAndConsumer {
public static void main(String[] args) throws Exception {
//共享缓存区:CarData队列
BlockingQueue<CarData> queue = new LinkedBlockingQueue<CarData>(100);
//CarData库存,包含了queue队列
CarStock carStock = new CarStock(queue);
//生产者
CarProducter carProducter1 = new CarProducter(carStock);
CarProducter carProducter2 = new CarProducter(carStock);
//消费者
CarConsumer carConsumer1 = new CarConsumer(carStock);
CarConsumer carConsumer2 = new CarConsumer(carStock);
//将生产者和消费者加入线程池运行
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(carProducter1);
cachePool.execute(carProducter2);
cachePool.execute(carConsumer1);
cachePool.execute(carConsumer2);
// carProducter1.stop();停止p1生产
// cachePool.shutdown();//关闭线程池
}
}
创建之后我们需要注意的是要避免环境监测程序代码中存在的类冲突,我们可以将上一个或者导致冲突的类文件先删除掉。
我们新建的这个类里面包含的有汽车库存类、汽车实体类、生产者实体类、消费者实体类。
我们的共享缓存队列在 CarStock 这个类里面,即包含的消费车子,生产车子的方法都在汽车库存类里面。而消费实体类和生产实体类都是实现了 Runable 接口,并把汽车库存类 CarStock 加载进去的。每次生产车我们都会新 new 一个汽车实体类的对象出来,并且把它放进队列中,将统计车子数量的 count++
,并将此时的车子的数量值作为当前新生产出来的这辆车子的 id 编号。消费车就是生产车里面定义的逻辑的逆向过程,最后在 main 方法里面我们设置了两个消费者和两个生产者,并向它们传入进了相同额共享缓存区和包含了这个 queue 的 CarStock 即汽车库存。
我们确保目录在当前的项目 demo 下面,然后我们编译当前的这个 Java 类文件,将编译过后的文件放到 bin 目录下面。
javac -d bin/ src/TestProducerAndConsumer.java
cd bin
java TestProducerAndConsumer
可见运行的效果如下所示:
这里我们做一个补充,我们还可以使用 Lock 来重构生产消费者及线程通信。
在前面的两个生产者消费者程序中,我们都是使用 synchronized 给生产或消费的方法加锁,然后通过 wait()和 notifyAll()进行线程通信。除此以外,我们还可以使用 Lock 给方法加锁,然后使用 Condition 接口提供的 await()和 signalAll()进行线程通信,这里就不具体展开了。synchronized 和 lock 的一个很明显的释放锁的方式不同在于 synchronized 修饰的方法或者代码执行完毕即可释放,而 lock 必须使用 unlock()方法来手动释放,所以 synchronized 的锁的状态是不可以判断的,而 lock 的状态是可以判断的。加锁的方式我们会担心产生死锁的问题,下面我们再给大家介绍一下 CAS 无锁算法。
CAS 无锁算法
给大家抛出一个问题,为了保证共享的资源在并发访问时的数据安全,是否必须对共享的资源加锁(synchronized,Lock),其实并不是这样的,我们还可以使用 CAS(Compare and Swap)无锁算法。
实际上,加锁是处理并发最简单的方式,但对系统性能的损耗也是巨大的。例如,加锁、释放锁会导致系统多次进行上下文切换(内核态与用户态之间的切换),以及造成调度延迟等情况。因此,为了减少加锁对性能的损耗,我们还可以通过一种算法来保证数据的安全即 CAS 算法。
可以将加锁的方式理解为一种悲观的策略:总是假设对数据的访问是不安全的(一个线程访问数据的同时,其他线程也会修改此数据),因此总是会对要访问的数据加锁,然后独占式访问。
与之相反,CAS 算法是一种乐观的策略:总是假设对数据的访问是安全的(一个线程访问数据的同时,其他线程不会操作此数据),因此每次会直接访问数据,访问时可能出现两种情况:
- 如果要访问的数据已被其他线程修改了(即数据不安全),就会放弃此次访问,再重新获取最新的数据(即被其他线程修改后的数据);如果重新获取最新数据时,又被另外的线程修改了刚刚“最新”的数据,就再次放弃此次访问,再重新获取最新的数据。
- 如果要访问的数据没有冲突(从上次访问以后,没有其他线程对该数据进行修改),就直接访问该数据。不难发现,因为 CAS 算法没有加锁操作,因此不会出现死锁。
Java 新特性
大家要知道在 JDK8 出现以前,我们无法将方法作为参数传递给另一个方法,也无法将方法作为另一个方法的返回值。而在 JavaScript、Scala 等函数式语言中,将方法作为另一个方法的参数或返回值是非常常见,并且非常有价值的做法。而 JDK8 新增加的 Lambda 就提供了这一点,实现了 JAVA 向函数式编程风格的迈进。
JDK8 提供的 Lambda 表达式可以让代码的编写变得更简洁、紧凑,并且 Lambda 可以作为一段可传递的代码(可用于传递一种行为,而不仅仅是值)。我们可以将 Lambda 理解成一种匿名函数,即没有访问修饰符、返回值类型、方法名的函数。但严格来讲,在 Python、Scala 等语言中,Lambda 表达式的类型的确是函数,但是在 Java 中 Lambda 语法上属于一种对象类型。
一个 Lambda 表达式由以下三部分组成。
- 用逗号分隔的参数列表
- 箭头符号–>
- 方法体(表达式或代码块)
Lambda 表达式的一个很重要的特性就是简化,我们下面通过具体的一个实例来看看它具体是怎样简化的吧。
public class HelloWorld {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Hello World");
}
}) .start();
}
}
在这个类里面,main 方法中我们新建了一个线程类,并重写了它的 run 方法来实现打印"Hello World"的逻辑。接下来用 Lambda 表达式来简化如下:
public class HelloWorld {
public static void main(String[] args) {
new Thread( () ->System.out.println("Hello World") ) .start();
}
}
我们将没有用的话删除之后就只剩下了下面两部分:
- ():方法的参数列表
- 方法体
也就是说,Lambda 表达式只留下了方法最重要的参数列表和方法体,而将其余的代码全部删除。因此 Lambda 表达式可以看作这样的格式:方法的参数列表 -> 方法体
再来分析,上面提到的 new Thread(x)中,x 应该是一个 Runnable 类型的对象,而简化后的代码直接将 () ->System.out.println("Hello World")
作为一个对象传入到了 x 中。这就是之前所说的:Lambda 可以作为一段可传递的代码,即 Lambda 可以作为一个对象传递到方法的参数中。
我们简单地讨论一下函数式接口的风格,JDK8 将这种“只包含一个抽象方法的接口”,称为函数式接口,并且可以用 @FunctionalInterface
进行标注。如果某个接口只包含一个抽象方法,但没有标注 @FunctionalInterface
的话 JDK 仍然还是会将此接口看作是函数式接口,又或者一个接口中虽然包含的有三个抽象方法但是只有其中一个是后面新定义的,其他的都是和 Obejct 中的完全相同的,也会被 JDK 人作为是一个函数式接口。我们以后在使用 Lambda 表达式作为对象的时候,就必须先准备好一个函数式接口,这个接口可以使我们自定义的接口,只要该接口中有且仅有一个抽象方法即可。
Lambda 表达式包含有两种风格:
- 函数式接口名 引用名 = Lambda 表达式;
- 将 Lambda 表达式所代表的的函数式接口对象作为一个方法的入参。
Stream 流式处理
我们在这里讲解的 Stream 是指 JDK8 中提供的 java.util.stream
包中的流,而不是 IO 操作中的流。
Lambda 和 Stream 的出现,使得 Java 向函数式编程迈出了重要的一步。Stream 主要对集合、数组等批量数据提供了非常便利的操作。需要注意,Stream 指的是一种“操作”,因此它不会存储数据。
使用 Stream 需要经历以下 3 个步骤:
- 生成 Stream
- 转换 Stream
- 终端操作
下面是 Stream 的执行流程:
生成 Stream
Stream 操作的是“大批量”数据,因此生成 Stream 实际就是如何将集合等“大批量”类型转为 Stream 类型。常见的有以下几种方式。
(1)通过集合、数组提供的方法:
- 通过 Collection 提供的 stream()和 parallelStream()方法。而且既然 Collection 中有创建 Stream 的方法,那么它的子接口 List、Set 等也拥有这些方法。
- 通过 Arrays 提供的各种重载的 stream(…)方法。但要这里我们需要注意,stream(…)的参数只能是 double[]、int[]、long[]和对象[]等四种类型,并不是任意类型的数组都可以通过 stream(…)方法转为 Stream。例如,stream(…)就不能将 char[]、boolean[]等类型的数组转为 Stream。
(2)通过 Stream 接口提供的 of()、iterate()、generate()方法,以及 Stream 的内部接口中的 build()方法
转换 Stream
“转换 Stream”就是对“生成的 Stream”进行的各种操作,如:filter()过滤、limit()限制等。可以对同一个 Stream 进行多次转换操作。
转换方法 | 功能 |
---|---|
filter(Predicate) | 筛选出符合条件的元素。 |
map(Function<T, U>) | 将流中的各个元素,统一的进行某种转换操作。map()是一对一的操作。例如,假设流中有 10 个元素,那么可以通过 map()将这 10 个元素全部变为大写。 |
flatMap(Function<T, Stream> | 与 map()类似,但不同的是 flatMap()是一对多的操作。 |
distinct() | 删除流中重复的元素。 |
sorted() | 将流中的元素,按 Comparable 中的 compareTo()排序(称为内部排序)。 一般可用于比较的类,都已经实现了 Comparable 接口,例如 String 的定义:public final class String implements Comparable,… |
Sorted(Comparator) | 将流中的元素,按自定义比较器 Comparator 中的 compare()排序(称为外部排序)。自定义比较器,需要根据业务需求自己编写。 |
limit(long) | 将流中的元素,截取成指定个数的元素。 |
skip(long) | 跳过流中前 n 个元素,即从第 n+1 个元素开始使用 |
需要注意的是,转换操作是惰性的:转换操作只会改变流管道中的元素,不会立刻执行任何操作。
终端操作
终端操作就是对转换后的 Stream 进行的操作,每进行一次终端操作,就会结束一个 Stream 对象。因此,以后如果还想对 Stream 进行转换操作,就必须再重新生成。常见的终端操作如下表所示:
终端操作 | 功能 |
---|---|
forEach(Consumer action) | 遍历操作流中的每个元素。 |
toArray() | 将流的元素转为一个数组。 |
reduce(BinaryOperator )reduce(T identity, BinaryOperator )reduce(U,BiFunction<U, T, U>,BinaryOperator) | 将流中的元素进行规约(聚合),即将多个元素值按照某种约定,汇聚成一个值。 |
collect(Collector<T, A, R> collector) | reduce 是将流中的元素聚合成一个值;而 collect()是将流的元素聚合成一个集合(或一个值)。集合(或值)的类型,由 collect()的参数指定。 |
min(Comparator) | 通过自定义比较器,返回流中最小的元素。 |
max(Comparator) | 通过自定义比较器,返回流中最大的元素。 |
count() | 返回流中元素的个数。 |
boolean allMatch(Predicate)boolean anyMatch(Predicate)boolean noneMatch(Predicate) | 判断流中的元素是否全部、存在一个、没有元素与 Predicate 条件一致。 |
findFirst() | 返回流中的第一个元素。 |
findAny() | 根据特定算法,返回流中的某一个元素。 |
JDK 推出的这些新特性都是为了了编码和性能上的规范与提升,本章节想要介绍的内容就是这些,希望大家多多梳理和总结一下本章节的所有知识点和 demo 实验。
本文由 liyunfei 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Aug 15,2022