谈谈java的并发容器、Queue
目錄
同步類容器
并發類容器
ConcurrentMap:支持高并發下線程安全。
Copy-On-Write容器:最好在讀多寫少的情況下使用。
并發Queue
同步類容器
同步類容器是線程安全的。
//HashMap不是線程安全的,但是這樣做之后就是線程安全的。 Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());Vector類 是在 java 中可以實現自動增長的對象數組,線程安全的。 Vector<String> tickets = new Vector<String>(); HashTable、Vector,其底層的機制就是用傳統的synchronized關鍵字對每個公共方法進行同步,所以性能較差。并發類容器
ConcurrentHashMap代替了HashTable。
CopyOnWriteArrayList代替了Voctor。
以及并發的CopyonWriteArraySet。
以及并發的Queue,ConcurrentLinkedQueue(高性能隊列)和LinkedBlockingQueue(阻塞形式的隊列);具體的Queue還有很多,例如ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueue。
ConcurrentMap:支持高并發下線程安全。
ConcurrentHashMap代替HashTable:
HashTable底層機制是用傳統的synchronized關鍵字對每個公共方法進行同步,性能較差。
ConcurrentHashMap底層有一個segment(段)的概念最多分16個段,把一個HashTable分成許多段,相當于形成許多小的HashTable,通過減小鎖的粒度,減小鎖競爭的問題,底層大量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
ConcurrentSkipListMap(支持并發排序功能,彌補ConcurrentHashMap)
ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>(); chm.put("k1", "v1"); chm.put("k2", "v2"); chm.put("k3", "v3"); chm.putIfAbsent("k4", "vvvv");//重復不加,不重復就加。 //System.out.println(chm.get("k2")); //System.out.println(chm.size()); for(Map.Entry<String, Object> me : chm.entrySet()){System.out.println("key:" + me.getKey() + ",value:" + me.getValue()); }Copy-On-Write容器:最好在讀多寫少的情況下使用。
對一個容器進行寫操作的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后新的容器進行寫操作;操作完成之后,再將原容器的引用指向新的容器。這樣做的好處是可以對CopyOnWrite容器進行并發的讀,而不需要加鎖。所以CopyOnWrite容器是一種讀寫分離的思想,讀和寫不同的容器。
CopyOnWriteArrayList<String> cwal = new CopyOnWriteArrayList<String>(); CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>(); 在add、remove等方法加了鎖,重入鎖。java.util.concurrent.locks.ReentrantLock并發Queue
ConcurrentLinkedQueue為代表的高性能隊列;BlockingQueue接口為代表的阻塞隊列。
ConcurrentLinkedQueue:適用于高并發,無鎖、先進先出、不允許null元素。 ConcurrentLinkedQueue重要方法: add()和offer()都是加入元素(在ConcurrentLinkedQueue中這倆方法沒有任何區別); poll()和peek()都是取頭元素節點,前者會刪除元素,后者不會。//高性能無阻塞無界隊列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println(q.poll()); //a 從頭部取出元素,并從隊列里刪除 System.out.println(q.size()); //4 System.out.println(q.peek()); //b System.out.println(q.size()); //4 ArrayBlockingQueue:基于數組實現的阻塞隊列。內部維護了一個定長數組,以便緩存隊列中的數據對象,其內部沒實現讀寫分離,也就意味著生產和消費不能完全并行,長度是需要定義的,可以指定先進先出或者先進后出,也叫有界隊列。 LinkedBlockingQueue:基于鏈表的阻塞隊列,同ArrayBlockingQueue類似,其內部維持著一個數據緩沖隊列(該隊列由一個鏈表構成),LinkedBlockingQueue之所以能夠高效的處理并發數據,是因為其內部實現采用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全并行運行。它是一個無界隊列。 SynchronousQueue:一種沒有緩沖的隊列,生產者產生的數據直接會被消費者獲取并消費。ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); array.put("a"); array.put("b"); array.add("c"); array.add("d"); array.add("e"); array.add("f");//最多5個,超長報錯 //System.out.println(array.offer("a", 3, TimeUnit.SECONDS));//返回true或false//阻塞隊列 LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>(5);//不聲明長度的話,就是無界隊列 q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.offer("e"); q.add("f");//聲明了5,但是用offer還添加的話添加不上,用add添加會報錯。 //System.out.println(q.size()); for (Iterator iterator = q.iterator(); iterator.hasNext();) {String string = (String) iterator.next();System.out.println(string); } List<String> list = new ArrayList<String>(); System.out.println(q.drainTo(list, 3));//從隊列中取3個元素放到集合里。3 System.out.println(list.size());//3 for (String string : list) {System.out.println(string); }final SynchronousQueue<String> q = new SynchronousQueue<String>(); //q.add("asdasd");//直接這樣加會報錯 Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println(q.take());//會阻塞} catch (InterruptedException e) {e.printStackTrace();}} }); t1.start(); Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {q.add("asdasd");} }); t2.start(); PriorityBlockingQueue:基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖,他也是一個無界的隊列。public class Task implements Comparable<Task>{private int id ;private String name;public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic int compareTo(Task task) {return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); }public String toString(){return this.id + "," + this.name;} }PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>(); Task t1 = new Task(); t1.setId(3); t1.setName("id為3"); Task t2 = new Task(); t2.setId(4); t2.setName("id為4"); Task t3 = new Task(); t3.setId(1); t3.setName("id為1"); //return this.id > task.id ? 1 : 0; q.add(t1); //3 q.add(t2); //4 q.add(t3); //1//并不是添加元素之后就排序 System.out.println("容器:" + q);//容器:[1,id為1, 4,id為4, 3,id為3] System.out.println(q.take().getId());//調用take方法之后把優先級最高的元素拿出來 System.out.println("容器:" + q); DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景很多,比如對緩存超時的數據進行移除、任務超時處理、空閑連接的關閉等等。 public class WangBa implements Runnable { private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>(); public boolean yinye =true; public void shangji(String name,String id,int money){ Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機..."); this.queue.add(man); } public void xiaji(Wangmin man){ System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機..."); } @Override public void run() { while(yinye){ try { Wangmin man = queue.take(); xiaji(man); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String args[]){ try{ System.out.println("網吧開始營業"); WangBa siyu = new WangBa(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲", "123", 1); siyu.shangji("路人乙", "234", 10); siyu.shangji("路人丙", "345", 5); } catch(Exception e){ e.printStackTrace();} } } public class Wangmin implements Delayed { private String name; //身份證 private String id; //截止時間 private long endTime; //定義時間工具類private TimeUnit timeUnit = TimeUnit.SECONDS;public Wangmin(String name,String id,long endTime){ this.name=name; this.id=id; this.endTime = endTime; } public String getName(){ return this.name; } public String getId(){ return this.id; } /*** 用來判斷是否到了截止時間*/ @Override public long getDelay(TimeUnit unit) {//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);return endTime - System.currentTimeMillis();} /*** 相互批較排序用*/ @Override public int compareTo(Delayed delayed) { Wangmin w = (Wangmin)delayed; return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0; } }?
總結
以上是生活随笔為你收集整理的谈谈java的并发容器、Queue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的CountDownLatch使
- 下一篇: 谈谈java的线程池(创建、机制)