當前位置:首頁 » 編程語言 » java隊列處理高並發

java隊列處理高並發

發布時間: 2023-01-07 06:02:26

javase線程怎麼存儲到容器

Java 並發重要知識點
java 線程池
ThreadPoolExecutor 類分析
ThreadPoolExecutor 類中提供的四個構造方法。我們來看最長的那個,其餘三個都是在這個構造方法的基礎上產生(其他幾個構造方法說白點都是給定某些默認參數的構造方法比如默認制定拒絕策略是什麼)。

/**
* 用給定的初始參數創建一個新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數量
int maximumPoolSize,//線程池的最大線程數
long keepAliveTime,//當線程數大於核心線程數時,多餘的空閑線程存活的最長時間
TimeUnit unit,//時間單位
BlockingQueue<Runnable> workQueue,//任務隊列,用來儲存等待執行任務的隊列
ThreadFactory threadFactory,//線程工廠,用來創建線程,一般默認即可
RejectedExecutionHandler handler//拒絕策略,當提交的任務過多而不能及時處理時,我們可以定製策略來處理任務
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
下面這些對創建非常重要,在後面使用線程池的過程中你一定會用到!所以,務必拿著小本本記清楚。

ThreadPoolExecutor 3 個最重要的參數:

corePoolSize : 核心線程數線程數定義了最小可以同時運行的線程數量。
maximumPoolSize : 當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。
workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到的話,新任務就會被存放在隊列中。
ThreadPoolExecutor其他常見參數 :

keepAliveTime:當線程池中的線程數量大於 corePoolSize 的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了 keepAliveTime才會被回收銷毀;
unit : keepAliveTime 參數的時間單位。
threadFactory :executor 創建新線程的時候會用到。
handler :飽和策略。關於飽和策略下面單獨介紹一下。
下面這張圖可以加深你對線程池中各個參數的相互關系的理解(圖片來源:《Java 性能調優實戰》):

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-nzbqGRz9-1654600571133)(https://javaguide.cn/assets/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%90%84%E4%B8%AA%E5%8F%82%E6%95%B0%E4%B9%8B%E9%97%B4%E7%9A%84%E5%85%B3%E7%B3%BB.d65f3309.png)]

ThreadPoolExecutor 飽和策略定義:

如果當前同時運行的線程數量達到最大線程數量並且隊列也已經被放滿了任務時,ThreadPoolTaskExecutor 定義一些策略:

ThreadPoolExecutor.AbortPolicy :拋出 RejectedExecutionException來拒絕新任務的處理。
ThreadPoolExecutor.CallerRunsPolicy :調用執行自己的線程運行任務,也就是直接在調用execute方法的線程中運行(run)被拒絕的任務,如果執行程序已關閉,則會丟棄該任務。因此這種策略會降低對於新任務提交速度,影響程序的整體性能。如果您的應用程序可以承受此延遲並且你要求任何一個任務請求都要被執行的話,你可以選擇這個策略。
ThreadPoolExecutor.DiscardPolicy :不處理新任務,直接丟棄掉。
ThreadPoolExecutor.DiscardOldestPolicy : 此策略將丟棄最早的未處理的任務請求。
舉個例子:

Spring 通過 ThreadPoolTaskExecutor 或者我們直接通過 ThreadPoolExecutor 的構造函數創建線程池的時候,當我們不指定 RejectedExecutionHandler 飽和策略的話來配置線程池的時候默認使用的是 ThreadPoolExecutor.AbortPolicy。在默認情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來拒絕新來的任務 ,這代表你將丟失對這個任務的處理。 對於可伸縮的應用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當最大池被填滿時,此策略為我們提供可伸縮隊列。(這個直接查看 ThreadPoolExecutor 的構造函數源碼就可以看出,比較簡單的原因,這里就不貼代碼了。)

推薦使用 ThreadPoolExecutor 構造函數創建線程池
在《阿里巴巴 Java 開發手冊》「並發處理」這一章節,明確指出線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。

為什麼呢?

使用線程池的好處是減少在創建和銷毀線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。如果不使用線程池,有可能會造成系統創建大量同類線程而導致消耗完內存或者「過度切換」的問題。

另外,《阿里巴巴 Java 開發手冊》中強制線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 構造函數的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險

Executors 返回線程池對象的弊端如下(後文會詳細介紹到):

FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導致 OOM。
CachedThreadPool 和 ScheledThreadPool : 允許創建的線程數量為 Integer.MAX_VALUE ,可能會創建大量線程,從而導致 OOM。
方式一:通過ThreadPoolExecutor構造函數實現(推薦)通過構造方法實現

方式二:通過 Executor 框架的工具類 Executors 來實現 我們可以創建三種類型的 ThreadPoolExecutor:

FixedThreadPool
SingleThreadExecutor
CachedThreadPool
對應 Executors 工具類中的方法如圖所示:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-YGd4ygZu-1654600571136)(https://javaguide.cn/assets/Executors%E5%B7%A5%E5%85%B7%E7%B1%BB.4b0cbd16.png)]

正確配置線程池參數
說到如何給線程池配置參數,美團的騷操作至今讓我難忘(後面會提到)!

我們先來看一下各種書籍和博客上一般推薦的配置線程池參數的方式,可以作為參考!

常規操作
很多人甚至可能都會覺得把線程池配置過大一點比較好!我覺得這明顯是有問題的。就拿我們生活中非常常見的一例子來說:並不是人多就能把事情做好,增加了溝通交流成本。你本來一件事情只需要 3 個人做,你硬是拉來了 6 個人,會提升做事效率嘛?我想並不會。 線程數量過多的影響也是和我們分配多少人做事情一樣,對於多線程這個場景來說主要是增加了上下文切換成本。不清楚什麼是上下文切換的話,可以看我下面的介紹。

上下文切換:

多線程編程中一般線程的個數都大於 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用,為了讓這些線程都能得到有效執行,CPU 採取的策略是為每個線程分配時間片並輪轉的形式。當一個線程的時間片用完的時候就會重新處於就緒狀態讓給其他線程使用,這個過程就屬於一次上下文切換。概括來說就是:當前任務在執行完 CPU 時間片切換到另一個任務之前會先保存自己的狀態,以便下次再切換回這個任務時,可以再載入這個任務的狀態。任務從保存到再載入的過程就是一次上下文切換。

上下文切換通常是計算密集型的。也就是說,它需要相當可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都需要納秒量級的時間。所以,上下文切換對系統來說意味著消耗大量的 CPU 時間,事實上,可能是操作系統中時間消耗最大的操作。

Linux 相比與其他操作系統(包括其他類 Unix 系統)有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。

類比於實現世界中的人類通過合作做某件事情,我們可以肯定的一點是線程池大小設置過大或者過小都會有問題,合適的才是最好。

如果我們設置的線程池數量太小的話,如果同一時間有大量任務/請求需要處理,可能會導致大量的請求/任務在任務隊列中排隊等待執行,甚至會出現任務隊列滿了之後任務/請求無法處理的情況,或者大量任務堆積在任務隊列導致 OOM。這樣很明顯是有問題的! CPU 根本沒有得到充分利用。

但是,如果我們設置線程數量太大,大量線程可能會同時在爭取 CPU 資源,這樣會導致大量的上下文切換,從而增加線程的執行時間,影響了整體執行效率。

有一個簡單並且適用面比較廣的公式:

CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是為了防止線程偶發的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閑狀態,而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
I/O 密集型任務(2N): 這種任務應用起來,系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會佔用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
如何判斷是 CPU 密集任務還是 IO 密集任務?

CPU 密集型簡單理解就是利用 CPU 計算能力的任務比如你在內存中對大量數據進行排序。但凡涉及到網路讀取,文件讀取這類都是 IO 密集型,這類任務的特點是 CPU 計算耗費時間相比於等待 IO 操作完成的時間來說很少,大部分時間都花在了等待 IO 操作完成上。

美團的騷操作
美團技術團隊在《Java線程池實現原理及其在美團業務中的實踐》open in new window這篇文章中介紹到對線程池參數實現可自定義配置的思路和方法。

美團技術團隊的思路是主要對線程池的核心參數實現自定義可配置。這三個核心參數是:

corePoolSize : 核心線程數線程數定義了最小可以同時運行的線程數量。
maximumPoolSize : 當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。
workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到的話,新任務就會被存放在隊列中。
為什麼是這三個參數?

我在這篇《新手也能看懂的線程池學習總結》open in new window 中就說過這三個參數是 ThreadPoolExecutor 最重要的參數,它們基本決定了線程池對於任務的處理策略。

如何支持參數動態配置? 且看 ThreadPoolExecutor 提供的下面這些方法。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Sm89qdJZ-1654600571137)(https://javaguide.cn/assets/b6fd95a7-4c9d-4fc6-ad26-890adb3f6c4c.5ff332dc.png)]

格外需要注意的是corePoolSize, 程序運行期間的時候,我們調用 setCorePoolSize() 這個方法的話,線程池會首先判斷當前工作線程數是否大於corePoolSize,如果大於的話就會回收工作線程。

另外,你也看到了上面並沒有動態指定隊列長度的方法,美團的方式是自定義了一個叫做 的隊列(主要就是把LinkedBlockingQueue的capacity 欄位的final關鍵字修飾給去掉了,讓它變為可變的)。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-cmNN5yAL-1654600571138)(https://javaguide.cn/assets/19a0255a-6ef3-4835-98d1-a839d1983332.b334d1e9.png)]

還沒看夠?推薦 why神的[《如何設置線程池參數?美團給出了一個讓面試官虎軀一震的回答。》open in new window](如何設置線程池參數?美團給出了一個讓面試官虎軀一震的回答。 (qq.com))這篇文章,深度剖析,很不錯哦!

Java 常見並發容器
JDK 提供的這些容器大部分在 java.util.concurrent 包中。

ConcurrentHashMap : 線程安全的 HashMap
CopyOnWriteArrayList : 線程安全的 List,在讀多寫少的場合性能非常好,遠遠好於 Vector。
ConcurrentLinkedQueue : 高效的並發隊列,使用鏈表實現。可以看做一個線程安全的 LinkedList,這是一個非阻塞隊列。
BlockingQueue : 這是一個介面,JDK 內部通過鏈表、數組等方式實現了這個介面。表示阻塞隊列,非常適合用於作為數據共享的通道。
ConcurrentSkipListMap : 跳錶的實現。這是一個 Map,使用跳錶的數據結構進行快速查找。
ConcurrentHashMap
我們知道 HashMap 不是線程安全的,在並發場景下如果要保證一種可行的方式是使用 Collections.synchronizedMap() 方法來包裝我們的 HashMap。但這是通過使用一個全局的鎖來同步不同線程間的並發訪問,因此會帶來不可忽視的性能問題。

所以就有了 HashMap 的線程安全版本—— ConcurrentHashMap 的誕生。

在 ConcurrentHashMap 中,無論是讀操作還是寫操作都能保證很高的性能:在進行讀操作時(幾乎)不需要加鎖,而在寫操作時通過鎖分段技術只對所操作的段加鎖而不影響客戶端對其它段的訪問。

CopyOnWriteArrayList
CopyOnWriteArrayList 簡介
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
在很多應用場景中,讀操作可能會遠遠大於寫操作。由於讀操作根本不會修改原有的數據,因此對於每次讀取都進行加鎖其實是一種資源浪費。我們應該允許多個線程同時訪問 List 的內部數據,畢竟讀取操作是安全的。

這和我們之前在多線程章節講過 ReentrantReadWriteLock 讀寫鎖的思想非常類似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK 中提供了 CopyOnWriteArrayList 類比相比於在讀寫鎖的思想又更進一步。為了將讀取的性能發揮到極致,CopyOnWriteArrayList 讀取是完全不用加鎖的,並且更厲害的是:寫入也不會阻塞讀取操作。只有寫入和寫入之間需要進行同步等待。這樣一來,讀操作的性能就會大幅度提升。那它是怎麼做的呢?

CopyOnWriteArrayList 是如何做到的?
CopyOnWriteArrayList 類的所有可變操作(add,set 等等)都是通過創建底層數組的新副本來實現的。當 List 需要被修改的時候,我並不修改原有內容,而是對原有數據進行一次復制,將修改的內容寫入副本。寫完之後,再將修改完的副本替換原來的數據,這樣就可以保證寫操作不會影響讀操作了。

從 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是滿足 CopyOnWrite 的。所謂 CopyOnWrite 也就是說:在計算機,如果你想要對一塊內存進行修改時,我們不在原有內存塊中進行寫操作,而是將內存拷貝一份,在新的內存中進行寫操作,寫完之後呢,就將指向原來內存指針指向新的內存,原來的內存就可以被回收掉了。

CopyOnWriteArrayList 讀取和寫入源碼簡單分析
CopyOnWriteArrayList 讀取操作的實現
讀取操作沒有任何同步控制和鎖操作,理由就是內部數組 array 不會發生修改,只會被另外一個 array 替換,因此可以保證數據安全。

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}

CopyOnWriteArrayList 寫入操作的實現
CopyOnWriteArrayList 寫入操作 add()方法在添加集合的時候加了鎖,保證了同步,避免了多線程寫的時候會 出多個副本出來。

/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.Of(elements, len + 1);//拷貝新數組
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//釋放鎖
}
}

ConcurrentLinkedQueue
Java 提供的線程安全的 Queue 可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是 BlockingQueue,非阻塞隊列的典型例子是 ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞隊列或者非阻塞隊列。 阻塞隊列可以通過加鎖來實現,非阻塞隊列可以通過 CAS 操作實現。

從名字可以看出,ConcurrentLinkedQueue這個隊列使用鏈表作為其數據結構.ConcurrentLinkedQueue 應該算是在高並發環境中性能最好的隊列了。它之所有能有很好的性能,是因為其內部復雜的實現。

ConcurrentLinkedQueue 內部代碼我們就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞演算法來實現線程安全就好了。

ConcurrentLinkedQueue 適合在對性能要求相對較高,同時對隊列的讀寫存在多個線程同時進行的場景,即如果對隊列加鎖的成本較高則適合使用無鎖的 ConcurrentLinkedQueue 來替代。

BlockingQueue
BlockingQueue 簡介
上面我們己經提到了 ConcurrentLinkedQueue 作為高性能的非阻塞隊列。下面我們要講到的是阻塞隊列——BlockingQueue。阻塞隊列(BlockingQueue)被廣泛使用在「生產者-消費者」問題中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。當隊列容器已滿,生產者線程會被阻塞,直到隊列未滿;當隊列容器為空時,消費者線程會被阻塞,直至隊列非空時為止。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。下面是 BlockingQueue 的相關實現類:

BlockingQueue 的實現類

下面主要介紹一下 3 個常見的 BlockingQueue 的實現類:ArrayBlockingQueue、LinkedBlockingQueue 、PriorityBlockingQueue 。

ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 介面的有界隊列實現類,底層採用數組來實現。

public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue 一旦創建,容量不能改變。其並發控制採用可重入鎖 ReentrantLock ,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。當隊列容量滿時,嘗試將元素放入隊列將導致操作阻塞;嘗試從一個空隊列中取一個元素也會同樣阻塞。

ArrayBlockingQueue 默認情況下不能保證線程訪問隊列的公平性,所謂公平性是指嚴格按照線程等待的絕對時間順序,即最先等待的線程能夠最先訪問到 ArrayBlockingQueue。而非公平性則是指訪問 ArrayBlockingQueue 的順序不是遵守嚴格的時間順序,有可能存在,當 ArrayBlockingQueue 可以被訪問時,長時間阻塞的線程依然無法訪問到 ArrayBlockingQueue。如果保證公平性,通常會降低吞吐量。如果需要獲得公平性的 ArrayBlockingQueue,可採用如下代碼:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
1
1
LinkedBlockingQueue
LinkedBlockingQueue 底層基於單向鏈表實現的阻塞隊列,可以當做無界隊列也可以當做有界隊列來使用,同樣滿足 FIFO 的特性,與 ArrayBlockingQueue 相比起來具有更高的吞吐量,為了防止 LinkedBlockingQueue 容量迅速增,損耗大量內存。通常在創建 LinkedBlockingQueue 對象時,會指定其大小,如果未指定,容量等於 Integer.MAX_VALUE 。

相關構造方法:

/**
*某種意義上的無界隊列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
*有界隊列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

PriorityBlockingQueue
PriorityBlockingQueue 是一個支持優先順序的無界阻塞隊列。默認情況下元素採用自然順序進行排序,也可以通過自定義類實現 compareTo() 方法來指定元素排序規則,或者初始化時通過構造器參數 Comparator 來指定排序規則。

PriorityBlockingQueue 並發控制採用的是可重入鎖 ReentrantLock,隊列為無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也可以通過在構造函數中傳入 capacity 指定隊列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時,插入隊列的對象必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界隊列(take 方法在隊列為空的時候會阻塞)。

推薦文章: 《解讀 Java 並發隊列 BlockingQueue》open in new window

ConcurrentSkipListMap
下面這部分內容參考了極客時間專欄《數據結構與演算法之美》open in new window以及《實戰 Java 高並發程序設計》。

為了引出 ConcurrentSkipListMap,先帶著大家簡單理解一下跳錶。

對於一個單鏈表,即使鏈表是有序的,如果我們想要在其中查找某個數據,也只能從頭到尾遍歷鏈表,這樣效率自然就會很低,跳錶就不一樣了。跳錶是一種可以用來快速查找的數據結構,有點類似於平衡樹。它們都可以對元素進行快速的查找。但一個重要的區別是:對平衡樹的插入和刪除往往很可能導致平衡樹進行一次全局的調整。而對跳錶的插入和刪除只需要對整個數據結構的局部進行操作即可。這樣帶來的好處是:在高並發的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。而對於跳錶,你只需要部分鎖即可。這樣,在高並發環境下,你就可以擁有更好的性能。而就查詢的性能而言,跳錶的時間復雜度也是 O(logn) 所以在並發數據結構中,JDK 使用跳錶來實現一個 Map。

跳錶的本質是同時維護了多個鏈表,並且鏈表是分層的,

2級索引跳錶

最低層的鏈表維護了跳錶內所有的元素,每上面一層鏈表都是下面一層的子集。

跳錶內的所有鏈表的元素都是排序的。查找時,可以從頂級鏈表開始找。一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續找。這也就是說在查找過程中,搜索是跳躍式的。如上圖所示,在跳錶中查找元素 18。

在跳錶中查找元素18

查找 18 的時候原來需要遍歷 18 次,現在只需要 7 次即可。針對鏈表長度比較大的時候,構建索引查找效率的提升就會非常明顯。

從上面很容易看出,跳錶是一種利用空間換時間的演算法。

使用跳錶實現 Map 和使用哈希演算法實現 Map 的另外一個不同之處是:哈希並不會保存元素的順序,而跳錶內所有的元素都是排序的。因此在對跳錶進行遍歷時,你會得到一個有序的結果。所以,如果你的應用需要有序性,那麼跳錶就是你不二的選擇。JDK 中實現這一數據結構的類是 ConcurrentSkipListMap。

② JAVA高並發問題,大數據,頻繁I/O操作。

建議採用緩存處理,按照你說的這種數據量,基於redis的緩存完全可以滿足,存取速度可以10W+的,另外,擬採用的hashMap 是ConcurrentHashMap還是其他,頁面展示是增量查詢還是直接所有的再查詢一次,socket數據接收你是用的netty還是mina,這都需要經過仔細的斟酌考慮設計的。有這么大的並發的需求,完全可以考慮做分布式集群的,估計這只是領導想要的目標吧

③ java的monitor機制中,為什麼阻塞隊列用list等待隊列用set

java阻塞隊列應用於生產者消費者模式、消息傳遞、並行任務執行和相關並發設計的大多數常見使用上下文。

BlockingQueue在Queue介面基礎上提供了額外的兩種類型的操作,分別是獲取元素時等待隊列變為非空和添加元素時等待空間變為可用。

BlockingQueue新增操作的四種形式:

3.2.1.3 HashMap類

對Map類的另外一個實現是HashMap。HashMap使用Hash表數據結構。HashMap假定哈希函數能夠將元素適當的分布在各桶之間,提供一種接近O(1)的查詢和更新操作。但是如果需要對集合進行迭代,則與HashMap的容量和桶的大小有關,因此HashMap的迭代效率不會很高(尤其是你為HashMap設置了較大的容量時)。

與HashMap性能有影響的兩個參數是,初始容量和載入因子。容量是哈希表中桶的數量,初始容量是哈希表在創建時的容量。載入因子是哈希表在容器容量被自動擴充之前,HashMap能夠達到多滿的一種程度。當hash表中的條目數超出了載入因子與當前容量的乘積時,Hash表需要進行rehash操作,此時Hash表將會擴充為以前兩倍的桶數,這個擴充過程需要進行完全的拷貝工作,效率並不高,因此應當盡量避免。合理的設置Hash表的初始容量和載入因子會提高Hash表的性能。HashMap自身不是線程安全的,可以通過Collections的synchronizedMap方法對HashMap進行包裝。

3.2.1.4 ConcurrentHashMap類

ConcurrentHashMap類實現了ConcurrentMap介面,並提供了與HashMap相同的規范和功能。實際上Hash表具有很好的局部可操作性,因為對Hash表的更新操作僅會影響到具體的某個桶(假設更新操作沒有引發rehash),對全局並沒有顯著影響。因此ConcurrentHashMap可以提供很好的並發處理能力。可以通過concurrencyLevel的設置,來控制並發工作線程的數目(默認為16),合理的設置這個值,有時很重要,如果這個值設置的過高,那麼很有可能浪費空間和時間,使用的值過低,又會導致線程的爭用,對數量估計的過高或過低往往會帶來明顯的性能影響。最好在創建ConcurrentHashMap時提供一個合理的初始容量,畢竟rehash操作具有較高的代價。

3.2.2 ConcurrentSkipListSet類

實際上Set和Map從結構來說是很像的,從底層的演算法原理分析,Set和Map應當屬於同源的結構。所以Java也提供了TreeSet和ConcurrentSkipListSet兩種SortedSet,分別適合於非多線程(或低並發多線程)和多線程程序使用。具體的演算法請參考前述的Map相關介紹,這里不在累述。

3.2.3 CopyOnWriteArrayList類

CopyOnWriteArrayList是ArrayList的一個線程安全的變體,其中對於所有的可變操作都是通過對底層數組進行一次新的復制來實現的。

由於可變操作需要對底層的數據進行一次完全拷貝,因此開銷一般較大,但是當遍歷操作遠遠多於可變操作時,此方法將會更有效,這是一種被稱為「快照」的模式,數組在迭代器生存期內不會發生更改,因此不會產生沖突。創建迭代器後,迭代器不會反映列表的添加、移除或者更改。不支持在迭代器上進行remove、set和add操作。CopyOnWriteArraySet與CopyOnWriteArrayList相似,只不過是Set類的一個變體。

3.2.3 Collections提供的線程安全的封裝

Collections中提供了synchronizedCollection、synchronizedList、synchronizedMap、synchronizedSet、synchronizedSortedMap、synchronizedSortedMap等方法可以完成多種集合的線程安全的包裝,如果在並發度不高的情況下,可以考慮使用這些包裝方法,不過由於Concurrent相關的類的出現,已經不這么提倡使用這些封裝了,這些方法有些人稱他們為過時的線程安全機制。

3.2.4簡單總結

提供線程安全的集合簡單概括分為三類,首先,對於並發性要求很高的需求可以選擇以Concurrent開頭的相應的集合類,這些類主要包括:ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListMap、ConcurrentSkipSet。其次對於可變操作次數遠遠小於遍歷的情況,可以使用CopyOnWriteArrayList和CopyOnWriteArraySet類。最後,對於並發規模比較小的並行需求可以選擇Collections類中的相應方法對已有集合進行封裝。

此外,本章還對一些集合類的底層實現進行簡單探討,對底層實現的了解有利於對何時使用何種方式作出正確判斷。希望大家能夠將涉及到原理(主要有循環隊列、堆、HashMap、紅黑樹、SkipList)進行仔細研究,這樣才能更深入了解Java為什麼這樣設計類庫,在什麼情況使用,應當如何使用。

④ java高並發,如何解決,什麼方式解決,高並發

首先,為防止高並發帶來的系統壓力,或者高並發帶來的系統處理異常,數據紊亂,可以以下幾方面考慮:1、加鎖,這里的加鎖不是指加java的多線程的鎖,是指加應用所和資料庫鎖,應用鎖這邊通常是使用redis的setnx來做,其次加資料庫鎖,因為代碼中加了應用所,所以資料庫不建議加悲觀鎖(排他鎖),一般加樂觀鎖(通過設置一個seq_no來解決),這兩個鎖一般能解決了,最後做合理的流控,丟棄一部分請求也是必不可少的

⑤ 4種線程池和7種並發隊列

Java並發包中的阻塞隊列一共7個,當然他們都是線程安全的。 

ArrayBlockingQueue:一個由數組結構組成的 有界 阻塞隊列。 

LinkedBlockingQueue:一個由鏈表結構組成的無界阻塞隊列。 

PriorityBlockingQueue:一個支持優先順序排序的無界阻塞隊列。 

DealyQueue:一個使用優先順序(啟動時間)隊列實現的無界阻塞隊列。 

SynchronousQueue:一個不存儲元素的阻塞隊列。 

LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。 

LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

常用的只有三個,重點是前兩個

LinkedBlockingQueue和ArrayBlockingQueue區別:

1、LinkedBlockingQueue內部由兩個ReentrantLock來實現出入隊列的線程安全,由各自的Condition對象的await和signal來實現等待和喚醒功能。而ArrayBlockingQueue的只使用一個ReentrantLock管理進出隊列。

而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加採用的是putLock,移除採用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味著在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

2、隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當添加速度大於移除速度時,在無界的情況下,可能會造成內存溢出等問題。

3、數據存儲容器不同,ArrayBlockingQueue採用的是數組作為數據存儲容器,而LinkedBlockingQueue採用的則是以Node節點作為連接對象的鏈表。

由於ArrayBlockingQueue採用的是數組的存儲容器,因此在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內需要高效並發地處理大批量數據的時,對於GC可能存在較大影響。

SynchronousQueue

沒有容量,是無緩沖等待隊列,是一個不存儲元素的阻塞隊列,會直接將任務交給消費者,必須等隊列中的添加元素被消費後才能繼續添加新的元素

使用SynchronousQueue阻塞隊列一般要求maximumPoolSizes為無界,避免線程拒絕執行操作。

1、newfixed,線程默認大小確定、最大大小確定(實際沒什麼用),默認使用linkedblockqueue,無盡隊列

危害在於這個等待隊列,隊列如果消費不及時不斷膨脹可以使機器資源耗盡

ArrayBlockingQueue是一個有界緩存等待隊列,可以指定緩存隊列的大小,當正在執行的線程數等於corePoolSize時,多餘的元素緩存在ArrayBlockingQueue隊列中等待有空閑的線程時繼續執行,當ArrayBlockingQueue已滿時,加入ArrayBlockingQueue失敗,會開啟新的線程去執行,當線程數已經達到最大的maximumPoolSizes時,再有新的元素嘗試加入ArrayBlockingQueue時會報錯。

2、cached,線程數不限大小

危害 本身就是沒有限制,有多少請求創建多少線程,直到資源耗盡

CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味著,如果主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。

3、single

可順序執行任務,同時只有一個線程處理(單線程)

執行過程如下:

1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。

2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務後會從隊列中去任務。

注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。

4、ScheledThreadPool

public ScheledThreadPoolExecutor(int corePoolSize) {

        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

              new DelayedWorkQueue());

    }

使用了延遲隊列,無界,和cached類似

如果運行的線程達到了corePoolSize,就把任務添加到任務隊列DelayedWorkQueue中;DelayedWorkQueue會將任務排序,先執行的任務放在隊列的前面。

任務執行完後,ScheledFutureTask中的變數time改為下次要執行的時間,並放回到DelayedWorkQueue中

DelayQueue是一個沒有邊界BlockingQueue實現,加入其中的元素必需實現Delayed介面。 當生產者線程調用put之類的方法加入元素時,會觸發Delayed介面中的compareTo方法進行排序,也就是說隊列中元素的順序是按到期時間排序的,而非它們進入隊列的順序。排在隊列頭部的元素是最早到期的,越往後到期時間赿晚。

消費者線程查看隊列頭部的元素,注意是查看不是取出。然後調用元素的getDelay方法,如果此方法返回的值小0或者等於0,則消費者線程會從隊列中取出此元素,並進行處理。如果getDelay方法返回的值大於0,則消費者線程wait返回的時間值後,再從隊列頭部取出元素,此時元素應該已經到期。

DelayQueue是Leader-Followr模式的變種,消費者線程處於等待狀態時,總是等待最先到期的元素,而不是長時間的等待。消費者線程盡量把時間花在處理任務上,最小化空等的時間,以提高線程的利用效率。

無論創建那種線程池 必須要調用ThreadPoolExecutor,以上四種都是調用這個實現的

線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue,

RejectedExecutionHandler handler)

corePoolSize: 線程池維護線程的最少數量 

maximumPoolSize:線程池維護線程的最大數量 

keepAliveTime: 線程池維護線程所允許的空閑時間 , unit: 線程池維護線程所允許的空閑時間的單位 

workQueue: 線程池所使用的緩沖隊列 

handler: 線程池對拒絕任務的處理策略 --飽和策略

-------------------------------------------------------------------

通用流程:

一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。

當一個任務通過execute(Runnable)方法欲添加到線程池時:

如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。

如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那麼任務被放入緩沖隊列。

如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。

如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。

也就是:處理任務的優先順序為:

核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 

當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

---------------------------------------------------------------------------------------------

unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:

NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent. ArrayBlockingQueue

handler有四個選擇:

1、【拋異常】ThreadPoolExecutor.AbortPolicy() 拋出java.util.concurrent.RejectedExecutionException異常

2、【重試】ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重復調用execute()方法

ThreadPoolExecutor.DiscardOldestPolicy()

3、【找個舊的停了】拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy()

4、【不拋異常】拋棄當前的任務

5、當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義 飽和策略 ,如記錄日誌或持久化存儲不能處理的任務。

https://www.cnblogs.com/zhanshi/p/5469948.html

⑥ java 為什麼要使用隊列

你指的是JMS么???
選用他的一個好處就是:它可以把不影響用戶執行結果又比較耗時的任務(比如發郵件通知管理員)非同步的扔給JMS 服務端去做,而盡快的把屏幕返還給用戶。 服務端能夠多線程排隊響應高並發的請求,並保證請求不丟失。
還有一種是被動型的:比如兩個平台之間交互的時候對方決定使用JMS隊列,那你也只能跟著他用這個列隊來處理信息流。

它允許多線程將信息放入隊列,也能多線程消費。以統一的格式發送請求:例如JSON

⑦ 談論java中怎樣處理高並發的問題

對於我們開發的網站,如果網站的訪問量非常大的話,那麼我們就需要考慮相關的並發訪問問題了。而並發問題是絕大部分的程序員頭疼的問題,

但話又說回來了,既然逃避不掉,那我們就坦然面對吧~今天就讓我們一起來研究一下常見的並發和同步吧。

為了更好的理解並發和同步,我們需要先明白兩個重要的概念:同步和非同步

1、同步和非同步的區別和聯系

所謂同步,可以理解為在執行完一個函數或方法之後,一直等待系統返回值或消息,這時程序是出於阻塞的,只有接收到

返回的值或消息後才往下執行其它的命令。

非同步,執行完函數或方法後,不必阻塞性地等待返回值或消息,只需要向系統委託一個非同步過程,那麼當系統接收到返回

值或消息時,系統會自動觸發委託的非同步過程,從而完成一個完整的流程。

同步在一定程度上可以看做

⑧ BlockingQueue詳解

在Java中,BlockingQueue是一個介面,它的實現類有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它們的區別主要體現在存儲結構上或對元素 操作上的不同,但是對於take與put操作的原理,卻是類似的。

阻塞與非阻塞

LinkedBlockingQueue內部是使用鏈表實現一個隊列的,但是有別於一般的隊列,在於該隊列至少是有一個節點的,頭節點不含有元素。如果隊列為空時,頭節點的next參數為null。尾節點的next參數也為null。

1、LinkedBlockingQueue不允許插入的元素為null;

2、同一時刻只有一個線程可以進行入隊操作,putLock在將元素插入隊列尾部時加鎖了;

3、如果隊列滿了,則會調用notFull.await(),將該線程加入到Condition等待隊列中。await方法會釋放線程佔有的鎖,這將導致之前由於被阻塞的入隊線程將會獲取到鎖,執行到while循環處,不過可能因為隊列仍舊是滿的,也被進入到條件隊列中;

4、一旦有出隊線程取走元素,就會通知到入隊等待隊列釋放線程。那麼第一個加入到Condition隊列中的將會被釋放,那麼該線程將會重新獲得put鎖,繼而執行enqueue()方法,將節點插入到隊列的尾部;

5、然後得到插入隊列前元素的個數,如果插入後隊列中還可以繼續插入元素,那麼就通知notFull條件的等待隊列中的線程;

6、如果插入隊列前個數為0,那現在插入後,就為1了,那就可以通知因為隊列為空而導致阻塞的出隊線程去取元素了。

1、同一時刻只有一個線程可以進行出隊操作,takeLock在出隊之前加鎖了;

2、如果隊列中元素為空,那就進入notEmpty隊列中進行等待。直到隊列不為空時,得到隊列中的第一個元素。當發現取完發現還有元素可取時,再通知一下notEmpty隊列中等待的其他線程。最後判斷自己取元素前的是不是滿的,如果是滿的,那自己取完,就不滿了,就可以通知在notFull隊列中等待插入的線程進行put了。

LinkedBlockingQueue允許兩個線程同時在兩端進行入隊和出隊操作,但一端同時只能有一個線程進行操作,是通過兩個鎖進行區分的。

為了維護底部數據的統一,引入了AtomicInteger的一個count變數,表示隊列中元素的個數。count只能在兩個地方變化,一個是入隊的方法(進行+1操作),另一個是出隊的方法(進行-1操作),而AtomicInteger是原子安全的,所以也就確保了底層隊列的數據同步。

ArrayBlockingQueue內部是使用數組實現一個隊列的,並且在構造方法中就需要指定容量,也就意味著底層數組一旦創建了,容量就不能改變了,因此ArrayBlockingQueue是一個容量限制的阻塞隊列。因此在隊列滿的時候執行入隊會阻塞,在隊列為空時出隊也會阻塞。

1、ArrayBlockingQueue不允許添加null元素;

2、ArrayBlockingQueue在隊列已滿的時候,會調用notFull.await(),釋放鎖並處於阻塞狀態;

3、一旦ArrayBlockingQueue在隊列不滿的時候,就立即入隊。

1、取元素時,一旦獲得鎖,隊列為空, 則會阻塞,直至不為空,調用dequeue()出隊。

ArrayBlockingQueue是一個底層結構是數組的阻塞隊列,是通過 ReentrantLock Condition 來實現的。不可插入為null的元素,入隊和出隊使用的是同一個鎖。意味著同一時刻只能有一個線程能進行入隊或者出隊操作。入隊時,隊列已滿則會調用notFull.await(),進入阻塞狀態。直到隊列不滿時,再進行入隊操作。當出隊時,隊列為空,則調用notEmpty.await(),進入阻塞狀態,直到隊列不為空時,則出隊。

LinkedBlockingQueue底層實現是鏈表,ArrayBlockingQueue底層實現是數組

LinkedBlockingQueue默認的隊列長度是Integer.Max,但是可以指定容量。在入隊與出隊都高並發的情況下,性能比ArrayBlockingQueue高很多;

ArrayBlockingQueue必須在構造方法中指定隊列長度,不可變。在只有入隊高並發或出隊高並發的情況下,因為操作數組,且不需要擴容,性能很高。

LinkedBlockingQueue有兩把鎖,可以有兩個線程同時進行入隊和出隊操作,但同時只能有一個線程進行入隊或出隊操作。

ArrayBlockingQueue只有一把鎖,同時只能有一個線程進行入隊和出隊操作。

⑨ js怎麼處理高並發

JS不同於Java, C#等語言.
使用Java編寫的應用, 可以編程開啟多線程處理高並發業務場景.
而JS處理高並發場景使用的是 : 隊列機制, 事件機制
因為JS在網頁中運行時單線程模式, 在服務端nodejs中運行是單進程模式, 都無法像JAVA那樣開啟多個線程或者協程來處理高並發任務.
但是這不意味著JS無法處理高並發任務, 單進程的程序在使用隊列機制(就是待處理任務一個個排隊)處理高並發場景也仍然是非常高效的, 而且避免了開啟多個線程的內存消耗.但是其缺點也是很明顯的 : 不適合處理單個任務計算非常復雜消耗時間的場景.
舉個栗子 :
想像一下生活中排隊的場景, 如果前面有一個人磨磨唧唧, 半天賴在窗口各種問問題, 後面的人都要排隊等著, 很著急.
而如果開啟多個窗口(多線程/進程), 那些難纏的人分到一個窗口, 速度快的人分到一個窗口, 效率就大大提升了.

⑩ 如何控制高並發,比如現在商品表10個商品,有10000個人同時下單購買,這個時候我們如何處理這種情況

這種時候可以通過隊列,比如每次有人下單,就丟到隊列裡面,然後隊列裡面校驗庫存,這樣有並發的時候也只是多插入了隊列,但是隊列裡面是依次執行的。隊列的話可以參考下redis如何做隊列處理。有php相關擴展的,java的話我就不清楚了。

熱點內容
隨機啟動腳本 發布:2025-07-05 16:10:30 瀏覽:535
微博資料庫設計 發布:2025-07-05 15:30:55 瀏覽:32
linux485 發布:2025-07-05 14:38:28 瀏覽:310
php用的軟體 發布:2025-07-05 14:06:22 瀏覽:760
沒有許可權訪問計算機 發布:2025-07-05 13:29:11 瀏覽:437
javaweb開發教程視頻教程 發布:2025-07-05 13:24:41 瀏覽:734
康師傅控流腳本破解 發布:2025-07-05 13:17:27 瀏覽:249
java的開發流程 發布:2025-07-05 12:45:11 瀏覽:696
怎麼看內存卡配置 發布:2025-07-05 12:29:19 瀏覽:288
訪問學者英文個人簡歷 發布:2025-07-05 12:29:17 瀏覽:838