java.util.concurrent

Posted by 余腾 on 2019-05-02
Estimated Reading Time 46 Minutes
Words 10.5k In Total
Viewed Times

一、进程和线程

根本区别:进程是操作系统资源分配的基本单位,而线程是 CPU 任务调度和执行的基本单位

  • 一个程序(进程)同时执行多个任务,每一个任务称为一个线程。
  • 进程是线程的容器,不存在没有线程的进程的。

在开销方面:

  • 每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;
  • 线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小。

所处环境:

  • 在操作系统中能同时运行多个进程(程序);
  • 而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)

内存分配方面:

  • 进程: 系统在运行的时候会为每个进程分配不同的内存空间;
  • 线程: 除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源。

1.1、多进程与多线程区别

  • 本质区别在于每个进程拥有自己的一套变量,而线程则共享数据。
  • 共享变量使线程之间的通信比进程之间的通信更有效,更容易。
  • 线程更加“轻量级”,创建、撤销一个线程比启动新进程的开销小得多。

1.2、守护线程与非守护线程

  • 程序运行完毕,jvm会等待非守护线程完成后关闭,但是jvm不会等待守护线程。

  • 守护线程最典型的例子就是GC线程。

守护线程

  • 垃圾回收线程:GC线程
  • 当主线程运行的时候,垃圾回收线程一起运行。当主线程销毁,会和主线程一起销毁。

非守护线程(用户线程)

  • 非守护线程即我们手动创建的线程
  • 如果主线程销毁,用户线程继续运行且互不影响。

创建守护线程 👇

  • 必须在 t1.start()之前设置守护线程 t1.setDaemon(true);
  • 不能把正在运行的常规线程设置为守护线程,否则报IllegalThreadStateException异常。
  • 主线程结束之后并没有在继续运行守护线程,守护线程也跟着结束,一起销毁停止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DaemonThread {
public static void main(String[] args) throws InterruptedException {

Thread t1 = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);

} catch (Exception e) {
e.printStackTrace();
}
System.out.println("我是子线程(用户线程)");
}
});

// TODO 标识当前方法为守护线程,一定要在启动线程前设置为守护线程
t1.setDaemon(true);
t1.start();

//相当与主线程
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("main:i:" + i);
}
System.out.println("主线程执行完毕...");
}
}

// TODO
main:i:0
main:i:1
main:i:2
我是子线程(用户线程)
main:i:3
main:i:4
main:i:5
我是子线程(用户线程)
main:i:6
main:i:7
main:i:8
我是子线程(用户线程)
main:i:9
主线程执行完毕...

创建非守护线程:👇

  • 如果主线程销毁,用户线程继续运行且互不影响。

  • 当主线程销毁停止,非守护线程(用户线程)并没有结束,而是一直在执行,与主线程互不影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class NotDaemonThread {
public static void main(String[] args) {

Thread t1 = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);

} catch (Exception e) {
e.printStackTrace();
}
System.out.println("我是子线程(用户线程)");
}
});
// TODO 启动线程 没有开启守护线程
t1.start();

//相当与主线程
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("main:i:" + i);
}
System.out.println("主线程执行完毕...");
}
}

// TODO
main:i:0
main:i:1
main:i:2
我是子线程(用户线程)
main:i:3
main:i:4
main:i:5
我是子线程(用户线程)
main:i:6
main:i:7
main:i:8
我是子线程(用户线程)
main:i:9
主线程执行完毕...
我是子线程(用户线程)
我是子线程(用户线程)
我是子线程(用户线程)
......

二、volatile

volatile 关键字:

  • 是Java虚拟机提供的轻量级的同步机制。当多个线程进行操作共享数据时,可以保证内存中的数据可见。提供一个免锁机制,相较于 synchronized 是一种较为轻量级的同步策略。

注意:

  • volatile 保证内存可见性

  • volatile 不能保证变量的“原子性”; 不可分割,完整性,执行过程中不可以被加塞。

  • volatile 不具备“互斥性”;(互斥性,就是一个线程持有锁,另外线程则必须等待)

  • 禁止指令重排。

2.1、测试 volatile 内存可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* @author Yu
*/
public class TestVolatile {

public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
if (td.isFlag()) {
System.out.println("End");
break;
}
}
}
}

class ThreadDemo implements Runnable {
// voatile 比锁效率要高
private volatile boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
}
catch (InterruptedException e) {
}

flag = true;
System.out.println("flag=" + isFlag());
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}

// TODO
flag=true
End

2.2、测试 volatile 原子性

i++ 的原子性问题:

  • i++ 的操作实际上分为三个步骤“读-改-写“ 字节码如下:
    • aload_0
    • dup
    • getfield
    • iconst_1
    • iadd
    • putfield

在中间某一步如果CPU切换调度,则无法保证数据的准确性。

原子变量:在 java.util.concurrent.atomic 包下提供了一些原子变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* @author Yu
*/
class MyData {

volatile int num = 0;

public void numTo60() {
this.num = 60;
}

public void addPulsPuls() {
num++;
}

AtomicInteger atomicInteger = new AtomicInteger();

public void MyAtomicInteger() {
atomicInteger.incrementAndGet();
}
}

public class VolatileDemo {

public static void main(String[] args) {
VolatileKeyWord();
System.out.println("-----------");
AtomicIntegerDemo();
}

private static void AtomicIntegerDemo() {
MyData myData = new MyData();

for (int i = 1; i <= 20; i++) {
new Thread(() -> {
for (int j = 1; j <= 1000; j++) {
myData.addPulsPuls();
myData.MyAtomicInteger();
}
}, String.valueOf(i)).start();
}

//当上面 20个线程运行完毕后,用 Main线程看最后结果是多少
// main & GC
while (Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(Thread.currentThread().getName() + " Finally num :" + myData.num);
System.out.println(Thread.currentThread().getName() + " Finally num :" + myData.atomicInteger);
}

/**
* volatile int num = 0;
* 1、证明 Volatile 内存可见性
*/
private static void VolatileKeyWord() {

MyData myData = new MyData();

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Enter");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
myData.numTo60();
System.out.println(Thread.currentThread().getName() + " Updata num" + myData.num);
}, "Thread-Son").start();

while (myData.num == 0) {

}
System.out.println(Thread.currentThread().getName() + "End:" + myData.num);
}
}

2.3、CAS

  • CAS(Compare-And-Swap) 算法保证数据变量的原子性
  • CAS 算法是硬件对于并发操作的支持 效率比同步锁高
  • CAS 包含了三个操作数:
    • ①内存值 V
    • ②预估值 A
    • ③更新值 B
    • 当且仅当 V == A 时,—> V = B; B 赋值给 V 否则,不会执行任何操作。
  • 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新值,这个过程是原子的。

CAS 并发原语体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的CAS方法,JVM 会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。由于CAS是一 种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题


Unsafe 类

1
2
3
4
5
6
7
8
9
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
atomicReference.compareAndSet(100, 101);

public final boolean compareAndSet(V expect, V update) {
//TODO Unsafe
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

Unsafe 是 CAS 的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native) 方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe 类存在于 rt.jar sun.misc 包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中 CAS 操作的执行依赖于 Unsafe 类的方法。

  • 注:Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务

CAS 的缺点

  • 循环时间长,开销大。如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。
  • 只能保证一个共享变量的原子操作;对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就需要锁来保证原子性。
1
2
3
4
5
6
7
8
9
/**
* Atomically increments by one the current value.
* @return the previous value
* this 当前对象
* valueOffset 内存偏移量,内存地址
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// var1 AtomicInteger对象本身
// var2 该对象值的引用地址
// var4 需要变动的值 -> 1
// var5 通过var1、var2找出主内存中真实的值
// 用该对象当前的值与var5比较:
// 相同,更新var5+var4并返回true
// 不同,继续取值然后再比较,直到更新完成
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

ABA 问题 👇

CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。

  • 比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,:然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CASDemo {

static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);

public static void main(String[] args) {
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "t1").start();

new Thread(() -> {
try {
// 暂停1秒钟t2线程,保证上面的t1线程完成了一次ABA操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "->" + atomicReference.compareAndSet(100, 200) + "\t" + atomicReference.get());
}, "t2").start();
}
}

// TODO
输出 true 200

解决 ABA 原子引用

  • AtomicStampedReference
  • AtomicReference
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

public class CASTest {

static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
System.out.println("------ABA问题的解决--------");

new Thread(() -> {
// 初始版本号
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第 1 次版本号" + stamp);
try {
// 暂停1秒钟t3线程
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第 2 次版本号" + atomicStampedReference.getStamp());

atomicStampedReference.compareAndSet(101, 100,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第 3 次版本号" + atomicStampedReference.getStamp());
}, "t3").start();

new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第 1 次版本号" + stamp);
try {
// 暂停3秒钟t4线程,保证上面的t3线程完成了一次ABA操作
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
//这里必须重新获取版本号 才可以完成修改
// int newStamp = atomicStampedReference.getStamp();
boolean result = atomicStampedReference.compareAndSet(100, 200, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t修改成功否:" + result + "\t当前最新实际版本号:" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t当前实际最新值:" + atomicStampedReference.getReference());
}, "t4").start();
}
}
// TODO
------ABA问题的解决--------
t3 第一次版本号1
t4 第一次版本号1
t3 第二次版本号2
t3 第三次版本号3
t4 修改成功否:false 当前最新实际版本号:3
t4 当前实际最新值:100

模拟 CAS 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TestCompareAndSwap {

public static void main(String[] args) {
final CompareAndSwap cas = new CompareAndSwap();

for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {

@Override
public void run() {
int expectedValue = cas.get();
boolean b = cas.compareAndSet(expectedValue, (int)(Math.random() * 101));
System.out.println(b);
}
}).start();
}
}
}

class CompareAndSwap{
private int value;
//获取内存值
public synchronized int get(){
return value;
}
//比较
public synchronized int compareAndSwap(int expectedValue, int newValue){
int oldValue = value;

if(oldValue == expectedValue){
this.value = newValue;
}
return oldValue;
}
//设置
public synchronized boolean compareAndSet(int expectedValue, int newValue){
return expectedValue == compareAndSwap(expectedValue, newValue);
}
}

为什么用 CAS 不用 Synchronized

在Java并发中,我们最初接触的应该就是synchronized关键字了,但是synchronized属于重量级锁,很多时候会引起性能问题,volatile也是个不错的选择,但是volatile不能保证原子性,只能在某些场合下使用。像synchronized这种独占锁属于悲观锁,它是在假设一定会发生冲突的,那么加锁恰好有用。除此之外,还有乐观锁,乐观锁的含义就是假设没有发生冲突,那么我正好可以进行某项操作,如果要是发生冲突呢,那我就重试直到成功,乐观锁最常见的就是CAS


2.4、禁止指令重排

指令重排序 : 处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

JVM底层优化,指令重排序(乱序执行,增加效率),使用volatile修饰后就不能重排序了。

  • 有volatile修饰的变量,赋值后多执行了一个“load addl $0x0, (%esp)”操作,这个操作相当于一个内存屏障(指令重排序时不能把后面的指令重排序到内存屏障之前的位置),只有一个CPU访问内存时,并不需要内存屏障;(什么是指令重排序:是指CPU采用了允许将多条指令不按程序规定的顺序分开发送给各相应电路单元处理)。

2.5、volatile 使用场景

单例设计模式 DCL双检查锁机制

  • 因为有指令重排序的存在, DCL不一定安全。但是加了 volatile 可以禁止指令重排序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Singleton {

// 将自身实例化对象设置为一个属性,并用static修饰
//TODO volatile 关键字
private volatile static Singleton instance;

// 构造方法私有化
private Singleton() {}

// 静态方法返回该实例
public static Singleton getInstance() {
// 第一次检查instance是否被实例化出来,如果没有进入if块
if(instance == null) {
synchronized (Singleton.class) {
// 某个线程取得了类锁,实例化对象前第二次检查instance是否已经被实例化出来
// 如果没有,才最终实例出对象
if (instance == null) {
instance = new Singleton();//这一行代码可以分为 三步。有可能指令重排序
}
}
}
return instance;
}
}

三、ConcurrentHashMap

if (key == null || value == null) throw new NullPointerException();

  • key-value都不可为空!
1
2
3
4
5
6
7
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();

ConcurrentHashMap 可以支持并发的读写

HashMap:非线程安全,即任一时刻可以有多个线程同时写HashMap,可能会导致数据的不一致。

  • HashMap 做put 操作的时候,假如A线程和B线程同时对同一个数组位置调用addEntry,两个线程会同时得到现在的头结点,然后A写入新的头结点之后,B也写入新的头结点,那B的写入操作就会覆盖A的写入操作造成A的写入操作丢失。同理,当多线程对同一数组位置进行remove操作时也会产生覆盖。因此如果不进行额外的外同步操作,HashMap 是非线程安全的。样必然导致效率低下,而且竞争越激烈,效率越低下。严重情况,两个线程使用HashMap进行put操作会引起死循环(比如put的两个对象 引起扩容,可能出现同时在同一数组下用链表表示,结果在get时会出现死循环),导致CPU利用率接近100%

HashTable:遗留类,线程安全。 很多映射的常用功能与HashMap类似,不同的是它承自Dictionary类,并且是线程安全的,任一时间只有一个线程能进入Hashtable。

  • HashTable 只有一把锁,当一个线程访问 HashTable 的同步方法时,会将整张 table 锁住,当其他线程也想访问 HashTable 同步方法时,就会进入阻塞或轮询状态。也就是确保同一时间只有一个线程对同步方法的占用,避免多个线程同时对数据的修改,确保线程的安全性。但是 HashTable 对 get,put,remove 方法都使用了同步操作,这就造成如果两个线程都只想使用get 方法去读取数据时,因为一个线程先到进行了锁操作,另一个线程就不得不等待,这样必然导致效率低下,而且竞争越激烈,效率越低下。

3.1、JDK 1.7 ConcurrentHashMap

ReentrantLock + Segment + HashEntry

JDK1.7 之前的 ConcurrentHashMap 数组+链表 使用分段锁机制实现。(JDK 1.5—JDK1.7)

  • ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个Hash表划分为多个分段;而每个Segment 元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个 Segment,然后对该Segment加锁即可。

3.2、JDK 1.8 ConcurrentHashMap

Synchronized + CAS + HashEntry + Red-Black Tree

JDK 1.8 已经抛弃了Segment 的概念,虽然源码里面还保留了,也只是为了兼容性的考虑。
JDK1.8 则使用数组+链表+红黑树数据结构、并发控制使用Synchronized和CAS原子操作实现ConcurrentHashMap

ConcurrentHashMap 的put方法实现

实现思路:

  • 1.如果没有初始化就先调用 initTable() 方法来进行初始化过程。
  • 2.如果没有hash冲突就直接CAS插入。
  • 3.如果还在进行扩容操作就先进行扩容。
  • 4.如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入
  • 5.最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
  • 6.如果添加成功就调用 addCount() 方法统计size,并且检查是否需要扩容

ConcurrentHashMap 的get方法实现

实现思路:

  • 1.计算hash值,定位到该table索引位置,如果是首节点符合就返回
  • 2.如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
  • 3.以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

3.3、JDK1.8 Synchronized代替ReentrantLock

JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock:

  1. 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized 并不比 ReentrantLock 差,在粗粒度加锁中ReentrantLock 可能通过 Condition 来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition 的优势就没有了;
  2. JVM的开发团队从来都没有放弃 synchronized,而且基于 JVM 的 synchronized 优化空间更大,使用内嵌的关键字比使用API更加自然;
  3. 在大量的数据操作下,对于JVM的内存压力,基于 API 的 ReentrantLock 会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据。

四、UnSafeCollection

4.1、UnSafeList

ArrayList 故障现象👇

  • java.util.ConcurrentModificationException
1
2
3
4
5
6
7
8
9
10
11
public class UnSafeList {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list.toString());
}, String.valueOf(i)).start();
}
}
}

ArrayList 故障导致原因👇

  • 并发争抢修改导致。

ArrayList 故障解决方案👇

  • List<String> list = new Vector();//已经废弃
  • List<String> list = Collections.synchronizedList(new ArrayList<>());
    • List<String\> list = new CopyOnWriteArrayList<>();add 源码如下:👇

CopyOnWriteArrayList 即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器 Object[ ] 添加,而是先将当前容器 Object[ ] 进行 Copy,复制出一个新的容器 Object[ ] newElements, 然后新的容器 Object[ ] newElements 里添加元素,添加完元素之后,再将原容器的引用指向新的容器 setArray(newElements);

这样做的好处是可以对 CopyOnWriteArrayList 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWriteArrayList 容器也是一种读写分离的思想,读和写不同的容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Appends the specified element to the end of this list.
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

CopyOnWriteArrayList

“写入并复制”每次写入复制新表

  • CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList 支持读多写少的并发情况。适合迭代

注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。

  • 测试迭代区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
HelloThread ht = new HelloThread();
for (int i = 0; i < 10; i++) {
new Thread(ht).start();
}
}
}

/**
* 使用迭代器
* 因为 迭代器 和 list 操作的同一个数据源 所以添加后会报错
* Collections.synchronizedList(new ArrayList<>());
*/
class HelloThread implements Runnable {

//java.util.ConcurrentModificationException
//private static List<String> list = Collections.synchronizedList(new ArrayList<>());

private static List<String> list = new CopyOnWriteArrayList<>();

static {
list.add("AA");
list.add("BB");
list.add("CC");
}

@Override
public void run() {
Iterator<String> it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
//CopyOnWriteArrayList 添加时都会在底层复制一份新的列表
list.add("AA");
list.add("A");
}
}
}

4.2、UnSafeSet

CopyOnWriteArraySet:底层是CopyOnWriteArrayList

HashSet 故障现象👇

  • java.util.ConcurrentModificationException

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class UnSafeHashSet {
    public static void main(String[] args) {
    Set<String> set = new HashSet<>();
    for (int i = 1; i <= 10; i++) {
    new Thread(() -> {
    set.add(UUID.randomUUID().toString().substring(0, 8));
    System.out.println(set.toString());
    }, String.valueOf(i)).start();
    }
    }
    }

    HashSet 故障导致原因👇

  • 并发争抢修改导致。

HashSet 故障解决方案👇

  • Set set = Collections.synchronizedSet(new HashSet<>());
  • Set<String> set = new CopyOnWriteArraySet<>();源码如下

五、锁 Look

锁的分类

  • 公平锁/非公平锁: 公平锁是指尽量以线程的等待时间先后顺序获取锁,等待时间最久的线程优先获取锁。synchronized 隐性锁是非公平锁,它无法保证等待的线程获取锁的顺序,ReentrantLook可以自己控制是否公平锁。

  • 可重入锁: Synchronized和ReentrantLook都是可重入锁,锁的可重入性标明了锁是针对线程分配方式而不是针对方法。例如调用Synchronized方法A中可以调用Synchronized方法B,而不需要重新申请锁。

  • 读写锁: 按照数据库事务隔离特性的类比读写锁,在访问统一个资源(一个文件)的时候,使用读锁来保证多线程可以同步读取资源。ReadWriteLock是一个读写锁,通过readLock()获取读锁,通过writeLock()获取写锁。

  • 可中断锁: 可中断是指锁是可以被中断的,Synchronized内置锁是不可中断锁,ReentrantLock可以通过lockInterruptibly() 方法中断显性锁。例如线程B在等待线程A释放锁,但是线程B由于等待时间太久,可以主动中断等待锁。

5.1、公平锁/非公平锁

公平锁: 是指多个线程按照申请锁的顺序来获取锁,先来后到 FIFO。

  • new ReentrantLock(true)

非公平锁: 是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。优点在于高并发情况下吞吐量比公平锁要大,有可能会造成优先级反转或者饥饿现象。当线程尝试占有锁失败,就再才用类似公平锁那种方式。

  • synchronized、ReentrantLock(默认非公平)

ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true ,为公平锁。源码如下:👇

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

5.2、可重入锁(递归锁)

指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。

优点:避免死锁

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Sync {
public synchronized void inOne() {
System.out.println(Thread.currentThread().getName() + " Come in One!");
inTwo();// 线程在进入内层方法会自动获取锁。
}

public synchronized void inTwo() {
System.out.println(Thread.currentThread().getName() + " Come in Two!");
}
}

public class SyncTest {
public static void main(String[] args) {
Sync sync = new Sync();

new Thread(() -> {
sync.inOne();
},"Thread-1").start();

new Thread(() -> {
sync.inOne();
},"Thread-2").start();
}
}

// TODO
Thread-1 Come in One!
Thread-1 Come in Two!
Thread-2 Come in One!
Thread-2 Come in Two!

ReentrantLock

  • 加锁 lock解锁 unlock 需要匹配 配对,可以编译,但运行发生死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Reetrant implements Runnable {

Lock lock = new ReentrantLock();

@Override
public void run() {
inOne();
}

public void inOne() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " Come in One!");
inTwo();// 线程在进入内层方法会自动获取锁。
} finally {
lock.unlock();
}
}

public void inTwo() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " Come in Two!");
} finally {
lock.unlock();
}
}
}
public class ReetrantLockDemo {

public static void main(String[] args) {

Reetrant reetrant = new Reetrant();

new Thread(reetrant, "Thread-3").start();
new Thread(reetrant, "Thread-4").start();
}
}

// TODO
Thread-3 Come in One!
Thread-3 Come in Two!
Thread-4 Come in One!
Thread-4 Come in Two!

synchronized 和 Lock 区别

层次 Synchronized Java关键字,jvm 层面上 Lock 是一个接口,Java API 层
锁状态 无法判断 可以判断
锁类型 隐式锁 可重入 不可中断 非公平 显示锁 可重入 可中断 可公平(两者皆可)
性能 悲观锁 适用少量同步 乐观锁(CAS) 适用大量同步
唤醒 随机唤醒 1 个或 唤醒全部线程。 可精确唤醒

1、原始构成

  • synchronized是关键字属于JVM层面,底层是通过 monitor 对象来完成,其实wait/notify等方法也依赖monitor 对象只有在同步代码块和同步方法中才能调用wait/notify等方法)

  • Lock是具体类 ( java.util.concurrent.locks.Lock )是 API 层面的锁。

2、锁的释放:

  • Synchronized 在代码执行完后或线程发生异常时系统会自动释放锁,不会发生异常死锁。
  • Lock 异常时不会自动释放锁,需要Lock() 和unlock() 方法配合 try/finally 语句块来完成。

3、等待是否可中断

  • synchronized 不可中断,除非抛出异常或者正常运行完成。

  • ReentrantLock 可中断。

    • 1.设置超时方法tryLock(long timeout, TimeUnit unit)
    • 2.lockInterruptibly()放代码块中,调用interrupt() 方法可中断

4、加锁是否公平

  • synchronized 非公平锁。
  • Reentrantlock 两者都可以,默认非公平锁,构造方法可以传入boolean值, true 为公平锁,false为非公平锁。

5、锁绑定多个条件 condition

  • synchronized 没有。
  • ReentrantLock用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

对synchronized和ReentrantLock进行反汇编

1
2
3
4
5
6
7
public static void main(String[] args) {
synchronized (new Object()){

}

new ReentrantLock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(java.lang.String[]);
Code:
0: new #2 // class java/lang/Object
3: dup
4: invokespecial #1 // Method java/lang/Object."<init>":()V
7: dup
8: astore_1
9: monitorenter
10: aload_1
11: monitorexit
12: goto 20
15: astore_2
16: aload_1
17: monitorexit
18: aload_2
19: athrow
20: new #3 // class java/util/concurrent/locks/ReentrantLock
23: dup
24: invokespecial #4 // Method java/util/concurrent/locks/ReentrantLock."<init>":()V
27: pop
28: return

可以看到synchronized映射成字节码指令就是两个指令monitorentermonitorexit。当一条线程进行执行的遇到monitorenter指令的时候,它会去尝试获得锁,如果获得锁那么锁计数+1(为什么会加一呢,因为它是一个可重入锁,所以需要用这个锁计数判断锁的情况),如果没有获得锁,那么阻塞。当它遇到monitorexit的时候,锁计数器-1,当计数器为0,那么就释放锁。但是,我们发现有2个monitorexit,synchronized锁释放有两种机制,一种就是执行完释放;另外一种就是发送异常,虚拟机释放。第二个monitorexit就是发生异常时执行的流程,在第13行,有一个goto指令,也就是说如果正常运行结束会跳转到20行执行。


5.3、SpinLock 自旋锁 (CAS 思想)

是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环消耗CPU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockTest {

AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + " Lock!");

while (!atomicReference.compareAndSet(null, thread)) {
System.out.println(thread.getName()+" CAS In");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public void unMyLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + " unLock!");
}

public static void main(String[] args) {

SpinLockTest spinLockTest = new SpinLockTest();

new Thread(() -> {
spinLockTest.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockTest.unMyLock();

}, "Thread-1").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
spinLockTest.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockTest.unMyLock();
}, "Thread-2").start();
}
}

5.4、ReadWriteLockDemo 读写锁

  • 读锁共享、写锁独占:读读共存,读写互斥、写写互斥。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestReadWriteLock {

public static void main(String[] args) {
ReadWriteLockDemo rw = new ReadWriteLockDemo();

new Thread(() -> {
for (int i = 0; i < 2; i++) {
final int num = rw.set((int) (Math.random() * 101));
System.out.println(Thread.currentThread().getName() + "->" + num);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Write").start();

for (int i = 0; i < 1000; i++) {
new Thread(() -> {
rw.get();
}).start();
}
}

}

class ReadWriteLockDemo {
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 读
public void get() {
lock.readLock().lock(); // 上锁

try {
System.out.println(Thread.currentThread().getName() + " : " + number);
} finally {
lock.readLock().unlock(); // 释放锁
}
}
// 写
public int set(int number) {
lock.writeLock().lock();
try {
this.number = number;
return number;
} finally {
lock.writeLock().unlock();
}
}
}

5.5、CountDownLatch 闭锁

CountDownLatch :闭锁,让一些线程阻塞,直到另一些所有线程的运算全部完成,计数器值变为0时,调用await()被阻塞的线程才会被唤醒继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
LatchDemo ld = new LatchDemo(latch);
long start = System.currentTimeMillis();
//10个分线程
for (int i = 0; i < 10; i++) {
new Thread(ld, "T" + i).start();
}

//不加 latch.await(); 10个分线程和 1个主线程同时执行,没办法计算时间
try {
latch.await();
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();//主线程
System.out.println("耗费时间为:" + (end - start));
}
}
class LatchDemo implements Runnable {
private CountDownLatch latch;
public LatchDemo(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
//每个线程都各自打印50以内的偶数
synchronized (this) {
try {
for (int i = 0; i < 50; i++) {
if (i % 2 == 0) {
System.out.println(Thread.currentThread().getName() + "---->" + i);
}
}
} finally {
latch.countDown();
}
}
}
}

5.5.1、闭锁枚举的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);

//TODO 枚举的用法
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();//TODO CountDownLatch做减法,锁为 0,释放 await
}, enumDemo.foreach_enum(i).getStr()).start();
}

try {
countDownLatch.await();//TODO main线程等待 直到 释放 await
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + "->秦!");
}
}

enum enumDemo {

ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "韩"), FIVE(5, "赵"), SIX(6, "魏");

private Integer code;
private String str;

enumDemo(Integer code, String str) {
this.code = code;
this.str = str;
}

public Integer getCode() {
return code;
}

public String getStr() {
return str;
}

public static enumDemo foreach_enum(int index) {
enumDemo[] values = enumDemo.values();

for (enumDemo value : values) {
if (index == value.getCode()) {
return value;
}
}
return null;
}
}

5.6、CyclicBarrier 循环屏障

  • 与 CountDownLatch 相反,做加法。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会激活。所有被屏障拦截的线程才会继续干活。线程进入屏障通过 await()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

public static void main(String[] args) {

// public CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙!");
});

for (int i = 1; i <= 7; i++) {

final int num = i;

new Thread(() -> {
System.out.println("--> 收集到" + num + "龙珠!");
try {
cyclicBarrier.await();//TODO
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

5.7、Semaphore 信号

  • 用于共享资源的互斥使用。
  • 并发线程数的控制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {

public static void main(String[] args) {

Semaphore semaphore = new Semaphore(3);//模拟 3个位置

for (int i = 1; i <= 6; i++) {//模拟 6部车

new Thread(() -> {
try {
semaphore.acquire();//TODO
System.out.println(Thread.currentThread().getName() + " 抢到车位!");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 停车3秒后离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//TODO
}
}, String.valueOf(i)).start();
}
}
}

六、阻塞队列 BlockQueue

为什么需要 阻塞队列 BlockQueue ?这样就不需要关心,什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockQueue 都自解决了。

  • 当阻塞队列是空的时候,从队列中获取元素操作将会阻塞(挂起)。
  • 当阻塞队列是满的时候,往队列中添加元素操作将会阻塞(挂起)。

6.1、阻塞队列分类

在多线程领域:所谓阻塞,在某些情况下,会挂起线程(即阻塞)一旦条件满足,被挂起的线程又会自动被唤醒。

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列;FIFO

  • LinkedBlockingQueue:由链表结构组成的有界(默认大小 Integer.MAX _VALUE)阻塞队列;

  • SynchronousQueue:不存储元素的阻塞队列,也即単个元素的队列;

  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列;

  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列;

  • linkedTransferQueue:由链表结构组成的无界阻塞队列;

  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

6.2、阻塞队列提供了四种处理方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
  • 抛出异常:
    • 当阻塞队列满的时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。
    • 当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
  • 返回特殊值:
    • 插入方法,会返回是否成功,成功则返回 true。
    • 移除方法,则是从队列里拿出一个元素,如果没有则返回 null。
  • 一直阻塞:
    • 当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。
    • 当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:
    • 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

6.3、阻塞队列 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Yu
*/
public class BlockQueueTest {

public static void main(String[] args) throws InterruptedException {

BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

//TODO 抛出异常 add —— remove
System.out.println("抛出异常 add —— remove");
blockingQueue.add(1);
blockingQueue.add(2);
blockingQueue.add(3);
System.out.println("element()->获取头部元素:" + blockingQueue.element());//TODO 获取头部元素
System.out.println(blockingQueue.toString());
//blockingQueue.add(4);//TODO java.lang.IllegalStateException: Queue full

System.out.println(blockingQueue.remove());//TODO 1
System.out.println(blockingQueue.remove());//TODO 2
System.out.println(blockingQueue.remove());//TODO 3
//System.out.println(blockingQueue.remove());//TODO java.util.NoSuchElementException

System.out.println("-----------------------------");
System.out.println(blockingQueue.toString());

//TODO 返回特殊值 offer —— poll
System.out.println("返回特殊值 offer —— poll");
System.out.println(blockingQueue.offer(1));//TODO true
System.out.println(blockingQueue.offer(2));//TODO true
System.out.println(blockingQueue.offer(3));//TODO true
System.out.println(blockingQueue.offer(4));//TODO false

System.out.println("peek()->获取头部元素:" + blockingQueue.peek());
System.out.println(blockingQueue.toString());

System.out.println(blockingQueue.poll());//TODO 1
System.out.println(blockingQueue.poll());//TODO 2
System.out.println(blockingQueue.poll());//TODO 3
System.out.println(blockingQueue.poll());//TODO null

System.out.println("-----------------------------");
System.out.println(blockingQueue.toString());

//TODO 阻塞 put —— take
System.out.println("根据容器大小阻塞 put —— take");
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);

System.out.println("take-> " + blockingQueue.take());
blockingQueue.put(4);//TODO 根据队列大小 阻塞
System.out.println(blockingQueue.toString());

System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());//TODO 阻塞

System.out.println("-----------------------------");
System.out.println(blockingQueue.toString());

//TODO 超时退出
//TODO offer(E e, long timeout, TimeUnit unit)
//TODO poll(long timeout, TimeUnit unit)
System.out.println("boolean offer(E e, long timeout, TimeUnit unit)");
System.out.println("E poll(long timeout, TimeUnit unit)");
System.out.println(blockingQueue.offer(1, 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(2, 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(3, 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer(4, 2L, TimeUnit.SECONDS)+" 阻塞2s");

System.out.println(blockingQueue.toString());
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)+" 阻塞2s");
}

6.4、SynchronousQueue

不存储元素的阻塞队列,也即単个元素的队列。

  • 每个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Yu
*/
public class SynchronousQueueDemo {

public static void main(String[] args) {

BlockingQueue<String> synchronousQueue = new SynchronousQueue();

new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();

new Thread(() -> {
try {

TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
}
}

传统版生产者消费者案例

  • while 避免虚假唤醒
  • 虚假唤醒就是一些obj.wait()会在除了obj.notify()和obj.notifyAll()的其他情况被唤醒,而此时是不应该唤醒的。解决的办法是基于while来反复判断进入正常操作的临界条件是否满足。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author Yu
*/
class MyData {

private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws Exception {
lock.lock();
try {
//1、判断
while (num != 0) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + "->" + num);
condition.signalAll();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws Exception {
lock.lock();
try {
//1、判断
while (num == 0) {
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + "->" + num);
condition.signalAll();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public class Traditional_Productor_Consumer {

public static void main(String[] args) {

MyData myData = new MyData();

new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
myData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
myData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "B").start();
}
}

线程通信之生产者消费者阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProdConsumer_BlockQueue {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myProd();

} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Prod").start();

new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
try {
myResource.myConsumer();

} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer").start();

TimeUnit.SECONDS.sleep(5);

System.out.println(Thread.currentThread().getName() + "\t 主线程叫停FLAG=false,生产动作结束");
myResource.close();
}
}
class MyResource{
private volatile boolean FLAG = true;//默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;

public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}

public void myProd() throws InterruptedException {
String data;
boolean retValue;
while(FLAG){
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName() + "\t 插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName() + "\t 插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "/ FLAG = false,生产结束");
}

public void myConsumer() throws InterruptedException{
String result;
while(FLAG){
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(null == result || result.equalsIgnoreCase("")){
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有消费成功,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
}
}

public void close(){
FLAG =false;
}
}
// 输出
Prod 生产线程启动
Consumer 消费线程启动
Prod 插入队列1成功
Consumer 消费队列1成功
Consumer 消费队列2成功
Prod 插入队列2成功
Consumer 消费队列3成功
Prod 插入队列3成功
Prod 插入队列4成功
Consumer 消费队列4成功
Prod 插入队列5成功
Consumer 消费队列5成功
main 主线程叫停FLAG=false,生产动作结束
Prod FLAG = false,生产结束
Consumer 超过2秒没有消费成功,消费退出

感谢阅读


If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !