java队列处理高并发
Java 并发重要知识点
java 线程池
ThreadPoolExecutor 类分析
ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
下面这些对创建非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。
ThreadPoolExecutor 3 个最重要的参数:
corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数 :
keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
unit : keepAliveTime 参数的时间单位。
threadFactory :executor 创建新线程的时候会用到。
handler :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nzbqGRz9-1654600571133)(https://javaguide.cn/assets/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%90%84%E4%B8%AA%E5%8F%82%E6%95%B0%E4%B9%8B%E9%97%B4%E7%9A%84%E5%85%B3%E7%B3%BB.d65f3309.png)]
ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:
ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
推荐使用 ThreadPoolExecutor 构造函数创建线程池
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下(后文会详细介绍到):
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool 和 ScheledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
方式一:通过ThreadPoolExecutor构造函数实现(推荐)通过构造方法实现
方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
对应 Executors 工具类中的方法如图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YGd4ygZu-1654600571136)(https://javaguide.cn/assets/Executors%E5%B7%A5%E5%85%B7%E7%B1%BB.4b0cbd16.png)]
正确配置线程池参数
说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)!
我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考!
常规操作
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
美团的骚操作
美团技术团队在《Java线程池实现原理及其在美团业务中的实践》open in new window这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。
美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:
corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
为什么是这三个参数?
我在这篇《新手也能看懂的线程池学习总结》open in new window 中就说过这三个参数是 ThreadPoolExecutor 最重要的参数,它们基本决定了线程池对于任务的处理策略。
如何支持参数动态配置? 且看 ThreadPoolExecutor 提供的下面这些方法。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Sm89qdJZ-1654600571137)(https://javaguide.cn/assets/b6fd95a7-4c9d-4fc6-ad26-890adb3f6c4c.5ff332dc.png)]
格外需要注意的是corePoolSize, 程序运行期间的时候,我们调用 setCorePoolSize() 这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize,如果大于的话就会回收工作线程。
另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 的队列(主要就是把LinkedBlockingQueue的capacity 字段的final关键字修饰给去掉了,让它变为可变的)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cmNN5yAL-1654600571138)(https://javaguide.cn/assets/19a0255a-6ef3-4835-98d1-a839d1983332.b334d1e9.png)]
还没看够?推荐 why神的[《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》open in new window](如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。 (qq.com))这篇文章,深度剖析,很不错哦!
Java 常见并发容器
JDK 提供的这些容器大部分在 java.util.concurrent 包中。
ConcurrentHashMap : 线程安全的 HashMap
CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector。
ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。
BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
ConcurrentHashMap
我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装我们的 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。
所以就有了 HashMap 的线程安全版本—— ConcurrentHashMap 的诞生。
在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。
CopyOnWriteArrayList
CopyOnWriteArrayList 简介
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。
这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?
CopyOnWriteArrayList 是如何做到的?
CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。
CopyOnWriteArrayList 读取和写入源码简单分析
CopyOnWriteArrayList 读取操作的实现
读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}
CopyOnWriteArrayList 写入操作的实现
CopyOnWriteArrayList 写入操作 add()方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 出多个副本出来。
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.Of(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}
ConcurrentLinkedQueue
Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。
从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。
ConcurrentLinkedQueue 内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。
ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。
BlockingQueue
BlockingQueue 简介
上面我们己经提到了 ConcurrentLinkedQueue 作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的相关实现类:
BlockingQueue 的实现类
下面主要介绍一下 3 个常见的 BlockingQueue 的实现类:ArrayBlockingQueue、LinkedBlockingQueue 、PriorityBlockingQueue 。
ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
1
1
LinkedBlockingQueue
LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE 。
相关构造方法:
/**
*某种意义上的无界队列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
*有界队列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。
PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
推荐文章: 《解读 Java 并发队列 BlockingQueue》open in new window
ConcurrentSkipListMap
下面这部分内容参考了极客时间专栏《数据结构与算法之美》open in new window以及《实战 Java 高并发程序设计》。
为了引出 ConcurrentSkipListMap,先带着大家简单理解一下跳表。
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。
跳表的本质是同时维护了多个链表,并且链表是分层的,
2级索引跳表
最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。
跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。
在跳表中查找元素18
查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。
从上面很容易看出,跳表是一种利用空间换时间的算法。
使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap。
② JAVA高并发问题,大数据,频繁I/O操作。
建议采用缓存处理,按照你说的这种数据量,基于redis的缓存完全可以满足,存取速度可以10W+的,另外,拟采用的hashMap 是ConcurrentHashMap还是其他,页面展示是增量查询还是直接所有的再查询一次,socket数据接收你是用的netty还是mina,这都需要经过仔细的斟酌考虑设计的。有这么大的并发的需求,完全可以考虑做分布式集群的,估计这只是领导想要的目标吧
③ java的monitor机制中,为什么阻塞队列用list等待队列用set
java阻塞队列应用于生产者消费者模式、消息传递、并行任务执行和相关并发设计的大多数常见使用上下文。
BlockingQueue在Queue接口基础上提供了额外的两种类型的操作,分别是获取元素时等待队列变为非空和添加元素时等待空间变为可用。
BlockingQueue新增操作的四种形式:
3.2.1.3 HashMap类
对Map类的另外一个实现是HashMap。HashMap使用Hash表数据结构。HashMap假定哈希函数能够将元素适当的分布在各桶之间,提供一种接近O(1)的查询和更新操作。但是如果需要对集合进行迭代,则与HashMap的容量和桶的大小有关,因此HashMap的迭代效率不会很高(尤其是你为HashMap设置了较大的容量时)。
与HashMap性能有影响的两个参数是,初始容量和加载因子。容量是哈希表中桶的数量,初始容量是哈希表在创建时的容量。加载因子是哈希表在容器容量被自动扩充之前,HashMap能够达到多满的一种程度。当hash表中的条目数超出了加载因子与当前容量的乘积时,Hash表需要进行rehash操作,此时Hash表将会扩充为以前两倍的桶数,这个扩充过程需要进行完全的拷贝工作,效率并不高,因此应当尽量避免。合理的设置Hash表的初始容量和加载因子会提高Hash表的性能。HashMap自身不是线程安全的,可以通过Collections的synchronizedMap方法对HashMap进行包装。
3.2.1.4 ConcurrentHashMap类
ConcurrentHashMap类实现了ConcurrentMap接口,并提供了与HashMap相同的规范和功能。实际上Hash表具有很好的局部可操作性,因为对Hash表的更新操作仅会影响到具体的某个桶(假设更新操作没有引发rehash),对全局并没有显着影响。因此ConcurrentHashMap可以提供很好的并发处理能力。可以通过concurrencyLevel的设置,来控制并发工作线程的数目(默认为16),合理的设置这个值,有时很重要,如果这个值设置的过高,那么很有可能浪费空间和时间,使用的值过低,又会导致线程的争用,对数量估计的过高或过低往往会带来明显的性能影响。最好在创建ConcurrentHashMap时提供一个合理的初始容量,毕竟rehash操作具有较高的代价。
3.2.2 ConcurrentSkipListSet类
实际上Set和Map从结构来说是很像的,从底层的算法原理分析,Set和Map应当属于同源的结构。所以Java也提供了TreeSet和ConcurrentSkipListSet两种SortedSet,分别适合于非多线程(或低并发多线程)和多线程程序使用。具体的算法请参考前述的Map相关介绍,这里不在累述。
3.2.3 CopyOnWriteArrayList类
CopyOnWriteArrayList是ArrayList的一个线程安全的变体,其中对于所有的可变操作都是通过对底层数组进行一次新的复制来实现的。
由于可变操作需要对底层的数据进行一次完全拷贝,因此开销一般较大,但是当遍历操作远远多于可变操作时,此方法将会更有效,这是一种被称为“快照”的模式,数组在迭代器生存期内不会发生更改,因此不会产生冲突。创建迭代器后,迭代器不会反映列表的添加、移除或者更改。不支持在迭代器上进行remove、set和add操作。CopyOnWriteArraySet与CopyOnWriteArrayList相似,只不过是Set类的一个变体。
3.2.3 Collections提供的线程安全的封装
Collections中提供了synchronizedCollection、synchronizedList、synchronizedMap、synchronizedSet、synchronizedSortedMap、synchronizedSortedMap等方法可以完成多种集合的线程安全的包装,如果在并发度不高的情况下,可以考虑使用这些包装方法,不过由于Concurrent相关的类的出现,已经不这么提倡使用这些封装了,这些方法有些人称他们为过时的线程安全机制。
3.2.4简单总结
提供线程安全的集合简单概括分为三类,首先,对于并发性要求很高的需求可以选择以Concurrent开头的相应的集合类,这些类主要包括:ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListMap、ConcurrentSkipSet。其次对于可变操作次数远远小于遍历的情况,可以使用CopyOnWriteArrayList和CopyOnWriteArraySet类。最后,对于并发规模比较小的并行需求可以选择Collections类中的相应方法对已有集合进行封装。
此外,本章还对一些集合类的底层实现进行简单探讨,对底层实现的了解有利于对何时使用何种方式作出正确判断。希望大家能够将涉及到原理(主要有循环队列、堆、HashMap、红黑树、SkipList)进行仔细研究,这样才能更深入了解Java为什么这样设计类库,在什么情况使用,应当如何使用。
④ java高并发,如何解决,什么方式解决,高并发
首先,为防止高并发带来的系统压力,或者高并发带来的系统处理异常,数据紊乱,可以以下几方面考虑:1、加锁,这里的加锁不是指加java的多线程的锁,是指加应用所和数据库锁,应用锁这边通常是使用redis的setnx来做,其次加数据库锁,因为代码中加了应用所,所以数据库不建议加悲观锁(排他锁),一般加乐观锁(通过设置一个seq_no来解决),这两个锁一般能解决了,最后做合理的流控,丢弃一部分请求也是必不可少的
⑤ 4种线程池和7种并发队列
Java并发包中的阻塞队列一共7个,当然他们都是线程安全的。
ArrayBlockingQueue:一个由数组结构组成的 有界 阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的无界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DealyQueue:一个使用优先级(启动时间)队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
常用的只有三个,重点是前两个
LinkedBlockingQueue和ArrayBlockingQueue区别:
1、LinkedBlockingQueue内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。而ArrayBlockingQueue的只使用一个ReentrantLock管理进出队列。
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
2、队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
3、数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
SynchronousQueue
没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。
1、newfixed,线程默认大小确定、最大大小确定(实际没什么用),默认使用linkedblockqueue,无尽队列
危害在于这个等待队列,队列如果消费不及时不断膨胀可以使机器资源耗尽
ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。
2、cached,线程数不限大小
危害 本身就是没有限制,有多少请求创建多少线程,直到资源耗尽
CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。
3、single
可顺序执行任务,同时只有一个线程处理(单线程)
执行过程如下:
1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务后会从队列中去任务。
注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。
4、ScheledThreadPool
public ScheledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
使用了延迟队列,无界,和cached类似
如果运行的线程达到了corePoolSize,就把任务添加到任务队列DelayedWorkQueue中;DelayedWorkQueue会将任务排序,先执行的任务放在队列的前面。
任务执行完后,ScheledFutureTask中的变量time改为下次要执行的时间,并放回到DelayedWorkQueue中
DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。 当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
消费者线程查看队列头部的元素,注意是查看不是取出。然后调用元素的getDelay方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。
DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长时间的等待。消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。
无论创建那种线程池 必须要调用ThreadPoolExecutor,以上四种都是调用这个实现的
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间 , unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略 --饱和策略
-------------------------------------------------------------------
通用流程:
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
---------------------------------------------------------------------------------------------
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent. ArrayBlockingQueue
handler有四个选择:
1、【抛异常】ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常
2、【重试】ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
3、【找个旧的停了】抛弃旧的任务 ThreadPoolExecutor.DiscardPolicy()
4、【不抛异常】抛弃当前的任务
5、当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义 饱和策略 ,如记录日志或持久化存储不能处理的任务。
https://www.cnblogs.com/zhanshi/p/5469948.html
⑥ java 为什么要使用队列
你指的是JMS么???
选用他的一个好处就是:它可以把不影响用户执行结果又比较耗时的任务(比如发邮件通知管理员)异步的扔给JMS 服务端去做,而尽快的把屏幕返还给用户。 服务端能够多线程排队响应高并发的请求,并保证请求不丢失。
还有一种是被动型的:比如两个平台之间交互的时候对方决定使用JMS队列,那你也只能跟着他用这个列队来处理信息流。
它允许多线程将信息放入队列,也能多线程消费。以统一的格式发送请求:例如JSON
⑦ 谈论java中怎样处理高并发的问题
对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了。而并发问题是绝大部分的程序员头疼的问题,
但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一起来研究一下常见的并发和同步吧。
为了更好的理解并发和同步,我们需要先明白两个重要的概念:同步和异步
1、同步和异步的区别和联系
所谓同步,可以理解为在执行完一个函数或方法之后,一直等待系统返回值或消息,这时程序是出于阻塞的,只有接收到
返回的值或消息后才往下执行其它的命令。
异步,执行完函数或方法后,不必阻塞性地等待返回值或消息,只需要向系统委托一个异步过程,那么当系统接收到返回
值或消息时,系统会自动触发委托的异步过程,从而完成一个完整的流程。
同步在一定程度上可以看做
⑧ BlockingQueue详解
在Java中,BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于take与put操作的原理,却是类似的。
阻塞与非阻塞
LinkedBlockingQueue内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。如果队列为空时,头节点的next参数为null。尾节点的next参数也为null。
1、LinkedBlockingQueue不允许插入的元素为null;
2、同一时刻只有一个线程可以进行入队操作,putLock在将元素插入队列尾部时加锁了;
3、如果队列满了,则会调用notFull.await(),将该线程加入到Condition等待队列中。await方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;
4、一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部;
5、然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知notFull条件的等待队列中的线程;
6、如果插入队列前个数为0,那现在插入后,就为1了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。
1、同一时刻只有一个线程可以进行出队操作,takeLock在出队之前加锁了;
2、如果队列中元素为空,那就进入notEmpty队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下notEmpty队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在notFull队列中等待插入的线程进行put了。
LinkedBlockingQueue允许两个线程同时在两端进行入队和出队操作,但一端同时只能有一个线程进行操作,是通过两个锁进行区分的。
为了维护底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
ArrayBlockingQueue内部是使用数组实现一个队列的,并且在构造方法中就需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。
1、ArrayBlockingQueue不允许添加null元素;
2、ArrayBlockingQueue在队列已满的时候,会调用notFull.await(),释放锁并处于阻塞状态;
3、一旦ArrayBlockingQueue在队列不满的时候,就立即入队。
1、取元素时,一旦获得锁,队列为空, 则会阻塞,直至不为空,调用dequeue()出队。
ArrayBlockingQueue是一个底层结构是数组的阻塞队列,是通过 ReentrantLock 和 Condition 来实现的。不可插入为null的元素,入队和出队使用的是同一个锁。意味着同一时刻只能有一个线程能进行入队或者出队操作。入队时,队列已满则会调用notFull.await(),进入阻塞状态。直到队列不满时,再进行入队操作。当出队时,队列为空,则调用notEmpty.await(),进入阻塞状态,直到队列不为空时,则出队。
LinkedBlockingQueue底层实现是链表,ArrayBlockingQueue底层实现是数组
LinkedBlockingQueue默认的队列长度是Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多;
ArrayBlockingQueue必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。
LinkedBlockingQueue有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。
ArrayBlockingQueue只有一把锁,同时只能有一个线程进行入队和出队操作。
⑨ js怎么处理高并发
JS不同于Java, C#等语言.
使用Java编写的应用, 可以编程开启多线程处理高并发业务场景.
而JS处理高并发场景使用的是 : 队列机制, 事件机制
因为JS在网页中运行时单线程模式, 在服务端nodejs中运行是单进程模式, 都无法像JAVA那样开启多个线程或者协程来处理高并发任务.
但是这不意味着JS无法处理高并发任务, 单进程的程序在使用队列机制(就是待处理任务一个个排队)处理高并发场景也仍然是非常高效的, 而且避免了开启多个线程的内存消耗.但是其缺点也是很明显的 : 不适合处理单个任务计算非常复杂消耗时间的场景.
举个栗子 :
想象一下生活中排队的场景, 如果前面有一个人磨磨唧唧, 半天赖在窗口各种问问题, 后面的人都要排队等着, 很着急.
而如果开启多个窗口(多线程/进程), 那些难缠的人分到一个窗口, 速度快的人分到一个窗口, 效率就大大提升了.
⑩ 如何控制高并发,比如现在商品表10个商品,有10000个人同时下单购买,这个时候我们如何处理这种情况
这种时候可以通过队列,比如每次有人下单,就丢到队列里面,然后队列里面校验库存,这样有并发的时候也只是多插入了队列,但是队列里面是依次执行的。队列的话可以参考下redis如何做队列处理。有php相关扩展的,java的话我就不清楚了。