Java多线程编程-(4)-线程间通信机制的介绍与使用
上一篇:
Java多線程編程-(1)-線程安全和鎖Synchronized概念
Java多線程編程-(2)-可重入鎖以及Synchronized的其他基本特性
Java多線程編程-(3)-線程本地ThreadLocal的介紹與使用
線程間通信簡介
我們知道線程是操作系統中獨立的個體,但是這個單獨的個體之間沒有一種特殊的處理方式使之成為一個整體,線程之間沒有任何交流和溝通的話,他就是一個個單獨的個體,不足以形成一個強大的交互性較強的整體。
為了提高CPU的利用率和各線程之間相互協作,Java的一種實現線程間通信的機制是:wait/notify線程間通信,下邊就一起學習一下這種線程間的通信機制。
不使用等待/通知機制實現線程間通信
假如,我們不使用下邊需要介紹的機制,那我們如何實現兩個線程之間的通信哪,下邊看一段代碼,實現的是兩個線程向一個List里填充數據:
MyList代碼:
public class MyList {
? ? private List list = new ArrayList();
? ? public void add() {
? ? ? ? list.add("我是元素");
? ? }
? ? public int size() {
? ? ? ? return list.size();
? ? }
}
線程A:
public class ThreadA extends Thread {
? ? private MyList list;
? ? public ThreadA(MyList list) {
? ? ? ? super();
? ? ? ? this.list = list;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? try {
? ? ? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? ? ? list.add();
? ? ? ? ? ? ? ? System.out.println("添加了" + (i + 1) + "個元素");
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
線程B:
public class ThreadB extends Thread {
? ? private MyList list;
? ? public ThreadB(MyList list) {
? ? ? ? super();
? ? ? ? this.list = list;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? try {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? if (list.size() == 5) {
? ? ? ? ? ? ? ? ? ? System.out.println("==5了,線程b要退出了!");
? ? ? ? ? ? ? ? ? ? throw new InterruptedException();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
測試類Test:
public class Test {
? ? public static void main(String[] args) {
? ? ? ? MyList myList = new MyList();
? ? ? ? ThreadA a = new ThreadA(myList);
? ? ? ? a.setName("A");
? ? ? ? a.start();
? ? ? ? ThreadB b = new ThreadB(myList);
? ? ? ? b.setName("B");
? ? ? ? b.start();
? ? }
}
執行結果:
添加了1個元素
添加了2個元素
添加了3個元素
添加了4個元素
添加了5個元素
==5了,線程b要退出了!
java.lang.InterruptedException
? ? at text.ThreadB.run(ThreadB.java:20)
添加了6個元素
添加了7個元素
添加了8個元素
添加了9個元素
添加了10個元素
可以看出,當List集合中的數據為5個的時候線程B退出,雖然兩個線程之間實現了通信,但是代碼中我們的線程B是一直執行著while(true) 循環的,直到長度為5才終止執行,顯然這種方式是很消耗資源的。所以,就需要一種機制能避免上述的操作又能實現多個線程之間的通信,這就是接下來需要學習的“wait/notify線程間通信”。
什么是等待/通知機制
道理很簡單,就像我們去銀行辦業務,進門之后取票號,等到達的時候會廣播通知我們辦業務一樣,這就是很實際的一個場景,我們取了票號就需要等待,等業務員輪到票號的時候就會廣播通知。
Java中等待/通知機制的實現
Java中對應等待/通知的方法是wait()/notify(),這兩個方法都是超類Object中的方法,如下圖所示:
之所以會是超類Object中的方法,我們可以簡單的理解:上幾篇文章中我們知道任何對象都可以作為鎖,而wait()/notify()是由鎖調用的,想到這里自然可以體會到這里設計的巧妙之處。
一、wait方法
(1)方法wait()的作用是使當前執行代碼的線程進行等待,該方法會將該線程放入”預執行隊列“中,并且在wait()所在的代碼處停止執行,直到接到通知或被中斷為止。
(2)在調用wait()之前,線程必須獲得該對象級別鎖,這是一個很重要的地方,很多時候我們可能會忘記這一點,即只能在同步方法或同步塊中調用wait()方法。
(3)還需要注意的是wait()是釋放鎖的,即在執行到wait()方法之后,當前線程會釋放鎖,當從wait()方法返回前,線程與其他線程競爭重新獲得鎖。
二、notify方法
(1)和wait()方法一樣,notify()方法也要在同步塊或同步方法中調用,即在調用前,線程也必須獲得該對象的對象級別鎖。
(2)該方法是用來通知那些可能等待該對象的對象鎖的其他線程,如果有多個線程等待,則由線程規劃器隨機挑選出其中一個呈wait狀態的線程,對其發出通知notify,并使它等待獲取該對象的對象鎖。
(3)這里需要注意的是,執行notify方法之后,當前線程不會立即釋放其擁有的該對象鎖,而是執行完之后才會釋放該對象鎖,被通知的線程也不會立即獲得對象鎖,而是等待notify方法執行完之后,釋放了該對象鎖,才可以獲得該對象鎖。
(3)notifyAll()通知所有等待同一共享資源的全部線程從等待狀態退出,進入可運行狀態,重新競爭獲得對象鎖。
三、wait()/notify()方法總結
(1)wait()/notify()要集合synchronized關鍵字一起使用,因為他們都需要首先獲取該對象的對象鎖;
(2)wait方法是釋放鎖,notify方法是不釋放鎖的;
(3)線程的四種狀態如下圖:
wait/notify線程間通信示例代碼
根據上述不使用wait/notify的代碼改造如下:
MyList代碼:
public class MyList {
? ? private static List list = new ArrayList();
? ? public static void add() {
? ? ? ? list.add("我是元素");
? ? }
? ? public static int size() {
? ? ? ? return list.size();
? ? }
}
線程A:
public class ThreadA extends Thread {
? ? private Object lock;
? ? public ThreadA(Object lock) {
? ? ? ? super();
? ? ? ? this.lock = lock;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? try {
? ? ? ? ? ? synchronized (lock) {
? ? ? ? ? ? ? ? if (MyList.size() != 5) {
? ? ? ? ? ? ? ? ? ? System.out.println("wait begin " + System.currentTimeMillis());
? ? ? ? ? ? ? ? ? ? lock.wait();
? ? ? ? ? ? ? ? ? ? System.out.println("wait end ?" + System.currentTimeMillis());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
線程B:
public class ThreadB extends Thread {
? ? private Object lock;
? ? public ThreadB(Object lock) {
? ? ? ? super();
? ? ? ? this.lock = lock;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? try {
? ? ? ? ? ? synchronized (lock) {
? ? ? ? ? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? ? ? ? ? MyList.add();
? ? ? ? ? ? ? ? ? ? if (MyList.size() == 5) {
? ? ? ? ? ? ? ? ? ? ? ? lock.notify();
? ? ? ? ? ? ? ? ? ? ? ? System.out.println("已發出通知!");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? System.out.println("添加了" + (i + 1) + "個元素!");
? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
測試代碼:
public class Run {
? ? public static void main(String[] args) {
? ? ? ? try {
? ? ? ? ? ? Object lock = new Object();
? ? ? ? ? ? ThreadA a = new ThreadA(lock);
? ? ? ? ? ? a.start();
? ? ? ? ? ? Thread.sleep(50);
? ? ? ? ? ? ThreadB b = new ThreadB(lock);
? ? ? ? ? ? b.start();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
運行結果:
wait begin 1507634541467
添加了1個元素!
添加了2個元素!
添加了3個元素!
添加了4個元素!
已發出通知!
添加了5個元素!
添加了6個元素!
添加了7個元素!
添加了8個元素!
添加了9個元素!
添加了10個元素!
wait end ?1507634551563
上述實例已經實現了簡單的等待通知機制,并且我們也可以看到,雖然線程B在第五個元素的時候發出通知,而線程A實現線程B執行完之后才獲得對象鎖,這也可以說明,wait方法是釋放鎖的而notify方法是不釋放鎖的。
另一個案例:使用wait/notify模擬BlockingQueue阻塞隊列
BlockingQueue是阻塞隊列,我們需要實現的是阻塞的放入和得到數據,設計思路如下:
(1)初始化隊列最大長度為5;?
(2)需要新加入的時候,判斷是否長度為5,如果是5則等待插入;?
(3)需要消費元素的時候,判斷是否為0,如果是0則等待消費;
實現代碼如下:
public class MyQueue {
? ? //1、需要一個承裝元素的集合
? ? private final LinkedList<Object> list = new LinkedList<>();
? ? //2、需要一個計數器
? ? private final AtomicInteger count = new AtomicInteger(0);
? ? //3、需要指定上限和下限
? ? private final int maxSize = 5;
? ? private final int minSize = 0;
? ? //5、初始化鎖對象
? ? private final Object lock = new Object();
? ? /**
? ? ?* put方法
? ? ?*/
? ? public void put(Object obj) {
? ? ? ? synchronized (lock) {
? ? ? ? ? ? //達到最大無法添加,進入等到
? ? ? ? ? ? while (count.get() == maxSize) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? lock.wait();
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? list.add(obj); //加入元素
? ? ? ? ? ? count.getAndIncrement(); //計數器增加
? ? ? ? ? ? System.out.println(" 元素 " + obj + " 被添加 ");
? ? ? ? ? ? lock.notify(); //通知另外一個阻塞的線程方法
? ? ? ? }
? ? }
? ? /**
? ? ?* get方法
? ? ?*/
? ? public Object get() {
? ? ? ? Object temp;
? ? ? ? synchronized (lock) {
? ? ? ? ? ? //達到最小,沒有元素無法消費,進入等到
? ? ? ? ? ? while (count.get() == minSize) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? lock.wait();
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? count.getAndDecrement();
? ? ? ? ? ? temp = list.removeFirst();
? ? ? ? ? ? System.out.println(" 元素 " + temp + " 被消費 ");
? ? ? ? ? ? lock.notify();
? ? ? ? }
? ? ? ? return temp;
? ? }
? ? private int size() {
? ? ? ? return count.get();
? ? }
? ? public static void main(String[] args) throws Exception {
? ? ? ? final MyQueue myQueue = new MyQueue();
? ? ? ? initMyQueue(myQueue);
? ? ? ? Thread t1 = new Thread(() -> {
? ? ? ? ? ? myQueue.put("h");
? ? ? ? ? ? myQueue.put("i");
? ? ? ? }, "t1");
? ? ? ? Thread t2 = new Thread(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(2000);
? ? ? ? ? ? ? ? myQueue.get();
? ? ? ? ? ? ? ? Thread.sleep(2000);
? ? ? ? ? ? ? ? myQueue.get();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }, "t2");
? ? ? ? t1.start();
? ? ? ? Thread.sleep(1000);
? ? ? ? t2.start();
? ? }
? ? private static void initMyQueue(MyQueue myQueue) {
? ? ? ? myQueue.put("a");
? ? ? ? myQueue.put("b");
? ? ? ? myQueue.put("c");
? ? ? ? myQueue.put("d");
? ? ? ? myQueue.put("e");
? ? ? ? System.out.println("當前元素個數:" + myQueue.size());
? ? }
}
執行結果:
?元素 a 被添加?
?元素 b 被添加?
?元素 c 被添加?
?元素 d 被添加?
?元素 e 被添加?
當前元素個數:5
?元素 a 被消費?
?元素 h 被添加?
?元素 b 被消費?
?元素 i 被添加?
其他注意事項
(1)wait()和notify()方法要在同步塊或同步方法中調用,即在調用前,線程也必須獲得該對象的對象級別鎖。
(2)wait方法是釋放鎖,notify方法是不釋放鎖的;
(3)notify每次喚醒wait等待狀態的線程都是隨機的,且每次只喚醒一個;
(4)notifAll每次喚醒wait等待狀態的線程使之重新競爭獲取對象鎖,優先級最高的那個線程會最先執行;
(5)當線程處于wait()狀態時,調用線程對象的interrupt()方法會出現InterruptedException異常;
其他知識點
(1)進程間的通信方式:
管道(pipe)、有名管道(named pipe)、信號量(semophore)、消息隊列(message queue)、信號(signal)、共享內存(shared memory)、套接字(socket);
(2)線程程間的通信方式:
1、鎖機制?
1.1 互斥鎖:提供了以排它方式阻止數據結構被并發修改的方法。?
1.2 讀寫鎖:允許多個線程同時讀共享數據,而對寫操作互斥。?
1.3 條件變量:可以以原子的方式阻塞進程,直到某個特定條件為真為止。
對條件測試是在互斥鎖的保護下進行的。條件變量始終與互斥鎖一起使用。
2、信號量機制:包括無名線程信號量與有名線程信號量?
3、信號機制:類似于進程間的信號處理。
線程間通信的主要目的是用于線程同步,所以線程沒有象進程通信中用于數據交換的通信機制。
---------------------?
作者:徐劉根?
來源:CSDN?
原文:https://blog.csdn.net/xlgen157387/article/details/78195817?
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
總結
以上是生活随笔為你收集整理的Java多线程编程-(4)-线程间通信机制的介绍与使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 眼皮有时单有时双怎么办
- 下一篇: Java多线程编程-(5)-使用Lock