并发编程总结之线程间的通信

× 文章目录
  1. 1. 线程间通信概念
  2. 2. wait/notify实现线程通信
    1. 2.1. 最原始线程间通信代码
    2. 2.2. wait和notify实现线程间通信代码
    3. 2.3. 使用java.util.concurrent下的CountDownLatch实现实时通信
    4. 2.4. 使用wait/notify模拟Queue

摘要:使用wait/notify方法实现线程间的通信。wait和notify必须配合synchronized关键字使用,wait方法释放锁notify方法不释放锁

线程间通信概念

线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一。当线程存在通信指挥,系统间的交互会更加强大,在提高CPU利用率的同时还会使开发人员对线程任务在处理的过程中进行有效的把控与监督。

wait/notify实现线程通信

使用wait/notify方法实现线程间的通信。注意这两个方法都是Object的类方法,换句话说Java为所有的对象提供了这个两个方法。
1.wait和notify必须配合synchronized关键字使用
2.wait方法释放锁notify方法不释放锁

最原始线程间通信代码

如下面代码所示,t1,t2两个线程,t1线程一直循环add,t2线程一直循环,当t1线程把list的size变为5的时候,t2线程抛出异常停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.ArrayList;
import java.util.List;
/**
*
* @author xujin
*
*/
public class ListAdd1 {
private volatile static List<String> list = new ArrayList();
public void add() {
list.add("test....");
}
public int size() {
return list.size();
}
public static void main(String[] args) {
final ListAdd1 list1 = new ListAdd1();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
list1.add();
System.out
.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
if (list1.size() == 5) {
System.out.println("当前线程收到通知:" + Thread.currentThread().getName()
+ " list size = 5 线程停止..");
throw new RuntimeException();
}
}
}
}, "t2");
t1.start();
t2.start();
}
}

wait和notify实现线程间通信代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.util.ArrayList;
import java.util.List;
/**
* @author xujin
*
*/
public class ListAdd2 {
private volatile static List list = new ArrayList();
public void add() {
list.add("test.......");
}
public int size() {
return list.size();
}
public static void main(String[] args) {
final ListAdd2 list2 = new ListAdd2();
final Object lock = new Object();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
synchronized (lock) {
System.out.println("t1启动..");
for (int i = 0; i < 10; i++) {
list2.add();
System.out.println(
"当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if (list2.size() == 5) {
System.out.println("已经发出通知..");
lock.notify();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("t2启动..");
if (list2.size() != 5) {
try {
//t2线程,拿到了锁,但是size不等于5,所以lock.wait(),释放了锁,然后t1得到t2释放的锁。
System.out.println("size() != 5,t2 wait释放锁!..");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}
}
}, "t2");
t1.start();
t2.start();
}
}

运行结果分析如下:

通过上面的运行结果分析,可以看出线程间的通信,因为持有锁的问题,使用wait、notify线程间通信,没法做到实时通信。

使用java.util.concurrent下的CountDownLatch实现实时通信

示例代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author xujin
*
*/
public class ListAdd3 {
private volatile static List list = new ArrayList();
public void add() {
list.add("test.......");
}
public int size() {
return list.size();
}
public static void main(String[] args) {
final ListAdd3 list2 = new ListAdd3();
// final Object lock = new Object();
final CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
// synchronized (lock) {
System.out.println("t1启动..");
for (int i = 0; i < 10; i++) {
list2.add();
System.out.println(
"当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if (list2.size() == 5) {
System.out.println("已经发出通知..");
// lock.notify();
countDownLatch.countDown();
}
}
// }
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// synchronized (lock) {
System.out.println("t2启动..");
if (list2.size() != 5) {
try {
// lock.wait();
countDownLatch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(
"当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}
// }
}, "t2");
t2.start();
t1.start();
}
}

countDownLatch.countDown();相当于lock.notify();countDownLatch.await()相当于lock.wait();

如果final CountDownLatch countDownLatch = new CountDownLatch(2);需要countDownLatch.countDown()两次如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final ListAdd3 list2 = new ListAdd3();
// final Object lock = new Object();
final CountDownLatch countDownLatch = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
// synchronized (lock) {
System.out.println("t1启动..");
for (int i = 0; i < 10; i++) {
list2.add();
System.out.println(
"当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if (list2.size() == 5) {
System.out.println("已经发出通知..");
// lock.notify();
countDownLatch.countDown();
countDownLatch.countDown();
}
}
// }
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");

使用wait/notify模拟Queue

BlockingQueue:顾名思义阻塞队列,首先它是一个队列,并且支持阻塞的机制,阻塞的放入和得到数据。我们要实现LinkedBlockingQueue下面两个简单的方法put和get和take。
put(aObject):把一个对象aobject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态,直到BlockingQueue有新的数据被加入。
示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 使用wait/notify模拟Queue
* @author xujin
*
*/
public class ImitateQueue {
// 承载元素的集合
private final LinkedList<Object> list = new LinkedList<Object>();
// 计数器进行计数
private final AtomicInteger count = new AtomicInteger(0);
// 制定元素的上限和下限
private final int maxSize;
private final int minSize = 0;
// 初始化一个对象用于加锁
private final Object lock = new Object();
public ImitateQueue(int maxSize) {
this.maxSize = maxSize;
}
// 把一个对象aobject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续增加
public void put(Object obj) {
synchronized (lock) {
while (count.get() == maxSize) {
try {
// 当队列中数据塞满,线程等待
lock.wait();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(obj);
// 计数器自增
count.getAndIncrement();
System.out.println(" 元素 " + obj + " 被添加 ");
// 唤醒之前等待阻塞的take方法线程取数据
lock.notify();
}
}
// 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态,直到BlockingQueue有新的数据被加入
public Object take() {
Object temp = null;
synchronized (lock) {
while (count.get() == minSize) {
try {
// 当队列中的元素,取完,该线程等待
lock.wait();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
// 计数器递减
count.getAndDecrement();
// 取出元素
temp = list.removeFirst();
System.out.println(" 元素 " + temp + " 被消费 ");
// 唤醒之前阻塞的put方法把元素放进去
lock.notify();
}
return temp;
}
public int size() {
return count.get();
}
public static void main(String[] args) throws Exception {
final ImitateQueue m = new ImitateQueue(5);
m.put("a");
m.put("b");
m.put("c");
m.put("d");
m.put("e");
System.out.println("当前元素个数:" + m.size());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
m.put("h");
m.put("i");
}
}, "t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Object t1 = m.take();
System.out.println("被取走的元素为:" + t1);
Thread.sleep(1000);
Object t2 = m.take();
System.out.println("被取走的元素为:" + t2);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2");
// 休眠2秒钟
TimeUnit.SECONDS.sleep(2);
t2.start();
}
}

程序运行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
元素 a 被添加
元素 b 被添加
元素 c 被添加
元素 d 被添加
元素 e 被添加
当前元素个数:5
元素 a 被消费
被取走的元素为:a
元素 h 被添加
元素 b 被消费
被取走的元素为:b
元素 i 被添加

如果您觉得文章不错,可以打赏我喝一杯咖啡!