java多线程总结之线程安全队列queueWord格式文档下载.docx
《java多线程总结之线程安全队列queueWord格式文档下载.docx》由会员分享,可在线阅读,更多相关《java多线程总结之线程安全队列queueWord格式文档下载.docx(18页珍藏版)》请在冰点文库上搜索。
![java多线程总结之线程安全队列queueWord格式文档下载.docx](https://file1.bingdoc.com/fileroot1/2023-5/1/3aa1b4f5-ca29-4461-9a07-15121fc37745/3aa1b4f5-ca29-4461-9a07-15121fc377451.gif)
当队列被元素填满后,再调用add(e),则会抛出异常。
∙offer(e)poll()peek()方法即不会阻塞线程,也不会抛出异常。
当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。
∙要想要实现阻塞功能,需要调用put(e)take()方法。
当不满足约束条件时,会阻塞线程。
好,上点源码你就更明白了。
以ArrayBlockingQueue类为例:
对于第一类方法,很明显如果操作不成功就抛异常。
而且可以看到其实调用的是第二类的方法,为什么?
因为第二类方法返回boolean啊。
Java代码
1.public
boolean
add(Ee){
2.if
(offer(e))
3.return
true;
4.else
5.throw
new
IllegalStateException("
Queuefull"
);
//队列已满,抛异常
6.}
7.
8.public
Eremove(){
9.Ex=poll();
10.if
(x!
=
null)
11.return
x;
12.else
13.throw
NoSuchElementException();
//队列为空,抛异常
14.}
对于第二类方法,很标准的ReentrantLock使用方式(不熟悉的朋友看一下我上一篇帖子吧
先不看阻塞与否,这ReentrantLock的使用方式就能说明这个类是线程安全类。
offer(Ee){
(e==
null)throw
NullPointerException();
3.final
ReentrantLocklock=
this.lock;
4.lock.lock();
5.try
{
6.if
(count==items.length)//队列已满,返回false
7.return
false;
8.else
9.insert(e);
//insert方法中发出了notEmpty.signal();
10.return
11.}
12.}
finally
13.lock.unlock();
15.}
16.
17.public
Epoll(){
18.final
19.lock.lock();
20.try
21.if
(count==
0)//队列为空,返回false
22.return
null;
23.Ex=extract();
//extract方法中发出了notFull.signal();
24.return
25.}
26.lock.unlock();
27.}
28.}
对于第三类方法,这里面涉及到Condition类,简要提一下,
await方法指:
造成当前线程在接到信号或被中断之前一直处于等待状态。
signal方法指:
唤醒一个等待线程。
void
put(Ee)throws
InterruptedException{
E[]items=
this.items;
4.final
5.lock.lockInterruptibly();
6.try
7.try
8.while
(count==items.length)//如果队列已满,等待notFull这个条件,这时当前线程被阻塞
9.notFull.await();
10.}
catch
(InterruptedExceptionie){
11.notFull.signal();
//唤醒受notFull阻塞的当前线程
12.throw
ie;
13.}
14.insert(e);
15.}
16.lock.unlock();
17.}
18.}
19.
20.public
Etake()
throws
21.final
22.lock.lockInterruptibly();
23.try
24.try
25.while
0)//如果队列为空,等待notEmpty这个条件,这时当前线程被阻塞
26.notEmpty.await();
27.}
28.notEmpty.signal();
//唤醒受notEmpty阻塞的当前线程
29.throw
30.}
31.Ex=extract();
32.return
33.}
34.lock.unlock();
35.}
36.}
37.
第四类方法就是指在有必要时等待指定时间,就不详细说了。
再来看看BlockingQueue接口的具体实现类吧:
∙ArrayBlockingQueue,其构造函数必须带一个int参数来指明其大小
∙LinkedBlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定
∙PriorityBlockingQueue,其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序
上面是用ArrayBlockingQueue举得例子,下面看看LinkedBlockingQueue:
首先,既然是链表,就应该有Node节点,它是一个内部静态类:
1.static
class
Node<
E>
{
2./**Theitem,volatiletoensurebarrierseparatingwriteandread*/
3.volatile
Eitem;
4.Node<
next;
5.Node(Ex){item=x;
}
然后,对于链表来说,肯定需要两个变量来标示头和尾:
1./**头指针*/
2.private
transient
head;
//head.next是队列的头元素
3./**尾指针*/
4.private
last;
//last.next是null
那么,对于入队和出队就很自然能理解了:
1.private
enqueue(Ex){
2.last=last.next=
(x);
//入队是为last再找个下家
3.}
4.
5.private
Edequeue(){
6.Node<
first=head.next;
//出队是把head.next取出来,然后将head向后移一位
7.head=first;
8.Ex=first.item;
9.first.item=
另外,LinkedBlockingQueue相对于ArrayBlockingQueue还有不同是,有两个ReentrantLock,且队列现有元素的大小由一个AtomicInteger对象标示。
AtomicInteger类是以原子的方式操作整型变量。
final
AtomicIntegercount=new
AtomicInteger(0);
2./**用于读取的独占锁*/
3.private
ReentrantLocktakeLock=new
ReentrantLock();
4./**队列是否为空的条件*/
ConditionnotEmpty=takeLock.newCondition();
6./**用于写入的独占锁*/
7.private
ReentrantLockputLock=new
8./**队列是否已满的条件*/
9.private
ConditionnotFull=putLock.newCondition();
有两个Condition很好理解,在ArrayBlockingQueue也是这样做的。
但是为什么需要两个ReentrantLock呢?
下面会慢慢道来。
让我们来看看offer和poll方法的代码:
AtomicIntegercount=
this.count;
4.if
(count.get()==capacity)
5.return
6.int
c=-1;
7.final
ReentrantLockputLock=this.putLock;
//入队当然用putLock
8.putLock.lock();
9.try
(count.get()<
capacity){
11.enqueue(e);
//入队
12.c=count.getAndIncrement();
//队长度+1
13.if
(c+
1
<
capacity)
14.notFull.signal();
//队列没满,当然可以解锁了
16.}
17.putLock.unlock();
19.if
(c==
0)
20.signalNotEmpty();
//这个方法里发出了notEmpty.signal();
21.return
c>
0;
22.}
23.
24.public
25.final
26.if
(count.get()==
27.return
28.Ex=
29.int
30.final
ReentrantLocktakeLock=this.takeLock;
出队当然用takeLock
31.takeLock.lock();
32.try
33.if
(count.get()>
0){
34.x=dequeue();
//出队
35.c=count.getAndDecrement();
//队长度-1
36.if
(c>
1)
37.notEmpty.signal();
//队列没空,解锁
38.}
39.}
40.takeLock.unlock();
41.}
42.if
(c==capacity)
43.signalNotFull();
//这个方法里发出了notFull.signal();
44.return
45.}
看看源代码发现和上面ArrayBlockingQueue的很类似,关键的问题在于:
为什么要用两个ReentrantLockputLock和takeLock?
我们仔细想一下,入队操作其实操作的只有队尾引用last,并且没有牵涉到head。
而出队操作其实只针对head,和last没有关系。
那么就是说入队和出队的操作完全不需要公用一把锁,所以就设计了两个锁,这样就实现了多个不同任务的线程入队的同时可以进行出队的操作,另一方面由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。
另外,还有一点需要说明一下:
await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的当前线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。
所以可以看到在源码中这两个方法都出现在Lock的保护块中。
-------------------------------我是分割线--------------------------------------
下面再来说说ConcurrentLinkedQueue,它是一个无锁的并发线程安全的队列。
以下部分的内容参照了这个帖子
对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调。
简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作。
这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。
ConcurrentLinkedQueue有两个volatile的线程共享变量:
head,tail。
要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证。
下面通过offer方法的实现来看看在无锁情况下如何保证原子性:
3.Node<
n=
(e,
null);
4.for
(;
;
){
5.Node<
t=tail;
s=t.getNext();
7.if
(t==tail){
//------------------------------a
8.if
(s==
null){//---------------------------b
9.if
(t.casNext(s,n)){
//-------------------c
10.casTail(t,n);
//------------------------d
12.}
13.}
else
14.casTail(t,s);
//----------------------------e
16.}
此方法的循环内首先获得尾指针和其next指向的对象,由于tail和Node的next均是volatile的,所以保证了获得的分别都是最新的值。
代码a:
t==tail是最上层的协调,如果其他线程改变了tail的引用,则说明现在获得不是最新的尾指针需要重新循环获得最新的值。
代码b:
s==null的判断。
静止状态下tail的next一定是指向null的,但是多线程下的另一个状态就是中间态:
tail的指向没有改变,但是其next已经指向新的结点,即完成tail引用改变前的状态,这时候s!
=null。
这里就是协调的典型应用,直接进入代码e去协调参与中间态的线程去完成最后的更新,然后重新循环获得新的tail开始自己的新一次的入队尝试。
另外值得注意的是a,b之间,其他的线程可能会改变tail的指向,使得协调的操作失败。
从这个步骤可以看到无锁实现的复杂性。
代码c:
t.casNext(s,n)是入队的第一步,因为入队需要两步:
更新Node的next,改变tail的指向。
代码c之前可能发生tail引用指向的改变或者进入更新的中间态,这两种情况均会使得t指向的元素的next属性被原子的改变,不再指向null。
这时代码c操作失败,重新进入循环。
代码d:
这是完成更新的最后一步了,就是更新tail的指向,最有意思的协调在这儿又有了体现。
从代码看casTail(t,n)不管是否成功都会接着返回true标志着更新的成功。
首先如果成功则表明本线程完成了两步的更新,返回true是理所当然的;
如果casTail(t,n)不成功呢?
要清楚的是完成代码c则代表着更新进入了中间态,代码d不成功则是tail的指向被其他线程改变。
意味着对于其他的线程而言:
它们得到的是中间态的更新,s!
=null,进入代码e帮助本线程执行最后一步并且先于本线程成功。
这样本线程虽然代码d失败了,但是是由于别的线程的协助先完成了,所以返回true也就理所当然了。
通过分析这个入队的操作,可以清晰的看到无锁实现的每个步骤和状态下多线程之间的协调和工作。
上面这大段文字看起来很累,先能看懂多少看懂多少,现在看不懂先不急,下面还会提到这个算法,并且用示意图说明,就易懂很多了。
在使用ConcurrentLinkedQueue时要注意,如果直接使用它提供的函数,比如add或者poll方法,这样我们自己不需要做任何同步。
但如果是非原子操作,比如:
1.if(!
queue.isEmpty()){
2.queue.poll(obj);
我们很难保证,在调用了isEmpty()之后,poll()之前,这个queue没有被其他线程修改。
所以对于这种情况,我们还是需要自己同步:
1.synchronized(queue){
2.if(!
3.queue.poll(obj);
4.}
5.}
这种需要进行自己同步的情况要视情况而定,不是任何情况下都需要这样做。
另外还说一下,ConcurrentLinkedQueue的size()是要遍历一遍集合的,所以尽量要避免用size而改用isEmpty(),以免性能过慢。
好,最后想说点什么呢,阻塞算法其实很好理解,简单点理解就是加锁,比如在BlockingQueue中看到的那样,再往前推点,那就是synchronized。
相比而言,非阻塞算法的设计和实现都很困难,要通过低级的原子性来支持并发。
下面就简要的介绍一下非阻塞算法,以下部分的内容参照了一篇很经典的文章
我觉得可以这样理解,阻塞对应同步,非阻塞对应并发。
也可以说:
同步是阻塞模式,异步是非阻塞模式
举个例子来说明什么是非阻塞算法:
非阻塞的计数器
首先,使用同步的线程安全的计数器代码如下
finalclass
Counter{
long
value=0;
3.public
synchronizedlong
getValue(){
4.return
value;
6.public
increment(){
++value;
8.}
9.}
下面的代码显示了一种最简单的非阻塞算法:
使用AtomicInteger的compareAndSet()(CAS方法)的计数器。
compareAndSet()方法规定“将这个变量更新为新值,但是如果从我上次看到这个变量之后其他线程修改了它的值,那么更新就失败”
NonblockingCounter{
AtomicIntegervalue;
//前面提到过,AtomicInteger类是以原子的方式操作整型变量。
int
value.get();
7.int
v;
8.do
9.v=value.get();
10.while
(!
pareAndSet(v,v+1));
v+
1;
12.
13.XX文库-让每个人平等地提升自我}
非阻塞版本相对于基于锁的版本有几个性能优势。
首先,它用硬件的原生形态代替JVM的锁定代码路径,从而在更细的粒度层次上(独立的内存位置)进行同步,失败的线程也可以立即重试,而不会被挂起后重新调度。
更细的粒度降低了争用的机会,不用重新调度就能重试的能力也降低了争用的成本。
即使有少量失败的CAS操作,这种方法仍然会比由于锁争用造成的重新调度快得多。
NonblockingCounter这个示例可能简单了些,但是它演示了所有非阻塞算法的一个基本特征——有些算法步骤的执行是要冒险的,因为知道如果CAS不成功可能不得不重做。
非阻塞算法通常叫作乐观算法,因为它们继续操作的假设是不会有干扰。
如果发现干扰,就会回退并重试。
在计