首页>>后端>>java->JAVA多线程之AQS的实现重入锁ReentrantLock

JAVA多线程之AQS的实现重入锁ReentrantLock

时间:2023-11-29 本站 点击:36

在JAVA体系当中锁是实现多线程中同步的一个重要机制, JAVA的锁有synchronized这个是JVM层面实现的锁, JAVA实现的锁同步有: Lock、ReadWriteLock、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,包括上一篇中讲到的ThreadPoolExecutor中Worker执行用到锁,也是来自于本文所讲的AQS实现,所以要理解JAVA的锁,除了JVM里实现的外,只要搞懂AQS的就可以了,话不多说,赶紧开始吧。

AQS的数据结构

AbstractQueuedSynchronizer的类以及重要成员变量

下面代码中可以看出AQS的类继承关系比较简单,就是继承AbstractOwnableSynchronizer类并实现了Serializable接口,

publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.Serializable{

AbstractOwnableSynchronizer类功能比较简单,主要是设置锁的排他线程,设置排它线程表示是排它锁,没有设置表示共享锁。

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}

下面是AQS中比较重要的是三个成员变量, head是等待队列中头节点, tail是等待队列中尾结点,state是同步的状态值。

/***Headofthewaitqueue,lazilyinitialized.Exceptfor*initialization,itismodifiedonlyviamethodsetHead.Note:*Ifheadexists,itswaitStatusisguaranteednottobe*CANCELLED.*/privatetransientvolatileNodehead;/***Tailofthewaitqueue,lazilyinitialized.Modifiedonlyvia*methodenqtoaddnewwaitnode.*/privatetransientvolatileNodetail;/***Thesynchronizationstate.*/privatevolatileintstate;

加下来看下NOde类的数据结构的定义:等待队列中waitStatus(等待状态)的的枚举:1.CANCELLED(1) 这个节点是由于超时或者中断。2.SIGNAL(-1) 这个节点的后继是被阻塞的,所以当前Node取消或者释放锁时,需要unpark去唤醒后继节点。3.CONDITION(-2) 这个节点是在条件等待队列。4.PROPAGATE(-3) 就是对当前的节点操作传播到其他节点, 比如doReleaseShared共享模式的释放锁就是传播后续节点。5.0 不是上面的任何一种状态

staticfinalclassNode{/**标志它是共享模式*/staticfinalNodeSHARED=newNode();/**标志他是排它模式*/staticfinalNodeEXCLUSIVE=null;/**waitStatusvaluetoindicatethreadhascancelled*/staticfinalintCANCELLED=1;/**waitStatusvaluetoindicatesuccessor'sthreadneedsunparking*/staticfinalintSIGNAL=-1;/**waitStatusvaluetoindicatethreadiswaitingoncondition*/staticfinalintCONDITION=-2;/***waitStatusvaluetoindicatethenextacquireSharedshould*unconditionallypropagate*/staticfinalintPROPAGATE=-3;```volatileintwaitStatus;/***连接当前节点的前一个Node,*/volatileNodeprev;/***连接当前节点的下一个一个Node,*/volatileNodenext;volatileThreadthread;/***指向条件等待队列的下一个Node或者是SHARED的Node,因为条件队列*只有排它模式下才能访问,这里只需要一个简单的单向链表去保存等待条件*变量的node,因为条件变量只能是排它锁,所以用这个nextWaiter也可以*SHARED变量去表示当前是共享锁*/NodenextWaiter;/***Returnstrueifnodeiswaitinginsharedmode.*判断是否共享模式*/finalbooleanisShared(){returnnextWaiter==SHARED;}/***返回等待队列的前一个node*/finalNodepredecessor()throwsNullPointerException{Nodep=prev;if(p==null)thrownewNullPointerException();elsereturnp;}Node(){//UsedtoestablishinitialheadorSHAREDmarker}Node(Threadthread,Nodemode){//UsedbyaddWaiterthis.nextWaiter=mode;this.thread=thread;}Node(Threadthread,intwaitStatus){//UsedbyConditionthis.waitStatus=waitStatus;this.thread=thread;}}

加锁流程

首先看下ReentrantLock的获取锁的过程,它的构造函数初始化一个NonfairSync,也就是说这个重入锁是非公平锁,

publicReentrantLock(){sync=newNonfairSync();}

从这里看到ReentrantLoc默认创建的非公平锁,调用lock会直接进行CAS尝试抢锁(这里state=0代表无所,1代表有一个线程已经抢到了锁),如果没有抢到锁,则正常调用AQS的acquire方法默认参数1去获取锁,

/***Syncobjectfornon-fairlocks*/staticfinalclassNonfairSyncextendsSync{privatestaticfinallongserialVersionUID=7316153563782823691L;/***Performslock.Tryimmediatebarge,backinguptonormal*acquireonfailure.*/finalvoidlock(){if(compareAndSetState(0,1))//设置排他线程为当前线程setExclusiveOwnerThread(Thread.currentThread());else//抢锁acquire(1);}protectedfinalbooleantryAcquire(intacquires){returnnonfairTryAcquire(acquires);}}

那么接下来重点看下AQS的acquire方法获取锁的逻辑1 tryAcquire尝试获取锁,2. 如果获取失败,则加入一个排他的Node到等待队列

publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&//获取队列acquireQueued(//加入等待队列addWaiter(Node.EXCLUSIVE),arg))//中断当前线程selfInterrupt();}

非公平锁,一开始就会尝试抢锁,而不管之前等待队列中是否有等待的抢锁的线程,

此时state值为0,则直接通过CAS将其值修改成acquires(ReentrentLock是1),并设置排他线程为它自己,这里说明ReentrantLock的加的锁是排它锁。

当排它线程就是当前线程,则表示当前线程已经获取锁,这个时候代表是偏向锁,增加acquires值,并赋值给state变量 ,表示同一线程同时同一把锁的次数,

/***Performsnon-fairtryLock.tryAcquireisimplementedin*subclasses,butbothneednonfairtryfortrylockmethod.*/finalbooleannonfairTryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();intc=getState();if(c==0){//直接尝试获取锁if(compareAndSetState(0,acquires)){//设置排它锁为当前线程setExclusiveOwnerThread(current);returntrue;}}//当前线程已经获取锁,偏向锁的处理elseif(current==getExclusiveOwnerThread()){//之前的state值,加上申请的值intnextc=c+acquires;if(nextc<0)//overflowthrownewError("Maximumlockcountexceeded");//赋值当前statesetState(nextc);returntrue;}returnfalse;}

上面的tryAcquire这一步尝试抢锁失败后,然后执行addWaiter(Node.EXCLUSIVE),新建一个排它模式的Node并加入等待队列。

privateNodeaddWaiter(Nodemode){//新建NodeNodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailure//保存队列尾部节点Nodepred=tail;//如果尾节点不为空if(pred!=null){//将新的节点prev指针指向之前的尾指针node.prev=pred;//CAS的原子操作将新建的Node加入队列尾部if(compareAndSetTail(pred,node)){//将之前的尾部的next指针新建的nodepred.next=node;returnnode;}}//如果上面的通过CAS失败了,则表示有其他线程已经加入了队列,则通过enq加入//新建的Node到等待队列enq(node);returnnode;}

下图是上面node加入到队列尾部的过程,这里注意双向队列首先是新增的node的prev指针指向原先的tail,至于为什么在下面会有分析

下面enq就是通过for的无限循环中再以CAS方式加入了新建的Node,这样保证新建的Node一定加入到等待队列中。

privateNodeenq(finalNodenode){for(;;){Nodet=tail;//等待队列为空if(t==null){//Mustinitialize//设置虚头节点,就是空的Nodeif(compareAndSetHead(newNode()))//将尾部节点指向都节点tail=head;}else{//首先让新建的node的前驱节点指向当前队列的尾部节点node.prev=t;//CAS设置新建的node为尾部接节点if(compareAndSetTail(t,node)){、//将新建的Node设置为新的尾部节点t.next=node;returnt;}}}}

将新建的尾部节点加入等待队列后,然后执行acquireQueued方法,参数是上面加入到等待队列中的Node以及args(tryAcquire的acquire变量),这个方法的逻辑是处理新的Node加入等待队列,然后尝试从队列中唤醒之前的等待队列中的线程去运行,

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}0

shouldParkAfterFailedAcquire是获取锁后失败后,需要park阻塞等待锁,

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}1

如果获取锁失败,acquireQueued方法中failed为true时,调用cancelAcquire取消获取锁,

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}2

唤醒前驱节点的逻辑是从tail节点从尾部往前,通过找到链表上一个waiteStatus小于0的节点(即不是取消状态的线程),然后执行LockSupport的unpark唤醒等待锁的线程,这个就是上面提到双向链表是两个指针,分两步首先是加的prev指针,所以得从从后往前遍历就能保证遍历到全部节点

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}3

释放锁

接来下我们看下如何释放锁,他一样是来自于AQS父类的release方法,

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}4

将state变量减去release后,如果c = 0,说明已经没有无锁了,那么设置排它线程为NULL,并设置state变量为0

publicabstractclassAbstractOwnableSynchronizerimplementsjava.io.Serializable{protectedAbstractOwnableSynchronizer(){}privatetransientThreadexclusiveOwnerThread;protectedfinalvoidsetExclusiveOwnerThread(Threadthread){exclusiveOwnerThread=thread;}protectedfinalThreadgetExclusiveOwnerThread(){returnexclusiveOwnerThread;}}5

总结

本文主要对于ReentrantLock的加排它锁和释放排它锁的过程做了一次详细的分析,主要是围绕state变量还有CLH的双端队列展开,以及线程阻塞(LockSupport.park)和唤醒(LockSupport.park),最后ConditionObject没有分析到,后面再写。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/34.html