首页>>后端>>java->可重入锁ReentrantLock源码分析

可重入锁ReentrantLock源码分析

时间:2023-12-06 本站 点击:0

写在前面

本文主要是针对 ReentrantLock 实现 AQS 的基础上的分析以及对 Condition 的相关分析。

因此建议先了解AQS的实现原理,对 ReentrantLock 原理的理解将更加容易,AQS 详情点击。

一、起源:

1. 什么是可重入锁?

可重入锁是指当某个线程已经持有了这把锁,但是某个时刻,这个线程还要尝试再次拿到这把锁,支持这种可重入的实现就是可重入锁;

以 ReentrantLock 可重入锁来看,其 state 表示重入次数,当想要再次拿到这把锁的时候,state + 1

当想要释放这把锁的时候state - 1,因此可以根据 state 是否等于 0 来判断这把锁是否被某个线程锁持有。

2. ReentrantLock的基本用法

先创建 ReentrantLock 对象,然后 lock.lock(),最后 lock.unlock(),即:

     class X {       private final ReentrantLock lock = new ReentrantLock();       // ...        public void m() {         lock.lock();  // block until condition holds         try {           // ... method body         } finally {           lock.unlock()         }       }     }}

使用上比较简单,推荐的做法是在 finally 块中释放锁,因为这样在出现异常的时候可以及时释放锁资源

3. ReentrantLock如何实现等待/通知模式?

关键字 synchronized 与 wait() 和 notify() / notifyAll() 方法相结合可以实现等待/通知模式,ReentrantLock 也同样可以借助于 Condition 实现等待/通知模式

Condition 是 JDK1.5 中出现的技术,使用它有更好的灵活性,比如可以实现多路通知功能,也就是在一个 lock 对象里可以创建多个 Condition (即对象监视器)实例,线程对象可以注册在指定的 Condition 中,从而可以有选择性的进行线程通知,在调度线程上更加灵活

在使用 notify()/notifyAll() 方法进行通知时,被通知的线程却是由 JVM 随机选择的;但使用 ReentrantLock 结合 Condition 是可以实现"选择性通知"

在了解 ReentrantLock 实现 AQS 之前,先来看看 AQS 的实现类一般如何去操作...

4. AQS用法

一般要通过 AQS 实现自己的同步器类有以下三步:

新建自己的同步器类,在内部写一个 Sync类,该类继承AbstractQueuedSynchronizer,即 AQS

设计同步器类的逻辑,在Sync类里,根据是否独占来重新对应的方法。如果是独占,则重写 tryAcquire 和 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法

在自己的同步器类中实现获取/释放相关方法,并在里面调用 AQS 对应的方法,如果是独占则调用 acquire 或 release 等方法,非独占则调用 acquireShared 或 releaseShared 或 acquireSharedInterruptibly 等方法。

二、实现原理

需要注意的的是,state 在 ReentrantLock 的含义表示的是重入次数

state = 0,表示此锁未被任何线程持有

state > 0, 表示被当前线程重入持有多次,以后每次释放锁都会在 state 上减 1,直到 state = 0

ReentrantLock 是一种独占模式,相应的会实现 tryAcquire 和 tryRelease 方法。

在 Condition 中有两个名词需要做区分

条件等待队列,这个队列是由每个 condition 实例自己维护的,也就是说,如果有两个 condition 实例,也就有两个条件等待队列

同步队列:指的是 AQS 中的 FIFO

等待与唤醒:

condition.await:等待操作是将节点放入条件等待队列

condition.signal:唤醒操作是将节点从条件等待队列中移到同步队列中,等待获取资源

另外,ReentrantLock 大部分实现都是由 AQS 完成,在上篇博文中已经对 AQS 做了详细分析,因此这里不在过多重复分析...

1. 内部对象Sync实现AQS

    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        abstract void lock();        // 非公平锁,tryAcquire方法由子类实现        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 拿到当前锁对象的重入次数            int c = getState();            // 如果等于0说明该锁对象没有被任何对象持有            // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了            if (c == 0) {                // 这里尝试去抢这把锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 如果此锁被当前对象持有,也就是重入操作,累加state            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }        // 尝试释放锁资源        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            // 检查是不是当前线程获得了锁            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        protected final boolean isHeldExclusively() {            // 判是否独占模式            return getExclusiveOwnerThread() == Thread.currentThread();        }        // 创建Condition对象        final ConditionObject newCondition() {            return new ConditionObject();        }        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        // 重入次数        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        // state=0表示此锁未被占用        final boolean isLocked() {            return getState() != 0;        }    }     //

具体实现逻辑已经在源码中注释,简单来说,子类(这里的 Sync)需要实现 AQS 提供的抽象方法,来达到自己个性化的需求。

2. NonfairSync 非公平锁

    static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        final void lock() {            // 尝试获取锁            // 这个时候等待队列可能是还有等待节点的,这里取尝试抢一下;            // 如果锁资源这个时候刚好被释放了,这里是有可能抢成功的            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                // 既然没有抢成功,那就老老实实取获取锁资源                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }    //

公平锁非公平锁的最大区别在于,非公平锁会尽可能的去抢占资源(尽管等待队列存在很多等待节点)。

而公平锁,如果等待队列里存在等待节点,那它是不会去抢占资源的,放进队列,然后按先进先出的顺序去获取资源。

3. FairSync 公平锁实现

    static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        // 获取锁,如果拿不到会进入阻塞队列中等待        final void lock() {            acquire(1);        }        // 尝试拿取锁,成功则返回true,失败返回false        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 拿到重入次数            int c = getState();            // 说明这把锁未被任何线程持有,可以尝试获取锁            if (c == 0) {                // 和非公平锁的唯一区别是,这里多了hasQueuedPredecessors判断条件                // 意思是:首先判断在等待队列里面没有任何等待节点,它才会尝试取获取资源,                // 否则的话,就不去争抢锁资源了,毕竟是先来先服务嘛(保证公平性)                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) { // CAS设置状态值                    // 说明获取锁资源成功了,在锁对象中设置exclusiveOwnerThread=当前线程,表明此锁被当前线程锁住了                    setExclusiveOwnerThread(current);                     return true;                }            }            // 判断是否当前线程持有,是的话,就是重入持有,改变state的值            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }        //

4. Condition

在特定条件上等待锁资源

来看个例子:

    /**     * 使用多个Condition实现通知部分线程     */    public class ReentrantLockExample {        private Lock lock = new ReentrantLock();        private Condition conditionA = lock.newCondition();        private Condition conditionB = lock.newCondition();        public void awaitA() {            lock.lock();            try {                System.out.println("start awaitA at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());                conditionA.await();                System.out.println("end awaitA at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }        public void awaitB() {            lock.lock();            try {                System.out.println("start awaitB at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());                conditionB.await();                System.out.println("end awaitB at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }        public void signalAll_A() {            lock.lock();            try {                System.out.println("signalAll_A at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());                conditionA.signalAll();            } finally {                lock.unlock();            }        }        public void signalAll_B() {            lock.lock();            try {                System.out.println("signalAll_B at " + System.currentTimeMillis()                        + " ThreadName:" + Thread.currentThread().getName());                conditionB.signalAll();            } finally {                lock.unlock();            }        }        public static void main(String[] args) {            ReentrantLockExample example = new ReentrantLockExample();            Thread a = new Thread(() -> {                example.awaitA();            });            a.setName("A");            a.start();            Thread b = new Thread(() -> {                example.awaitB();            });            b.setName("B");            b.start();            example.signalAll_A();            /**             * print: 从输出结果可以看出只有线程A被唤醒了             *             * start awaitA at 1596380331589 ThreadName:A             * start awaitB at 1596380331590 ThreadName:B             * signalAll_A at 1596380331590 ThreadName:main             * end awaitA at 1596380331590 ThreadName:A             */        }    }    //

在以上例子中,想要实现的效果是使用多个 Condition 实现通知部分线程,也就是将唤醒粒度变小。

比如说,我现在有 10 个线程,定义了 5 个 condition 对象,每个 condition 上都注册两个线程, 假设某种情况下,10 线程都通过 await 阻塞了,这个时候假如 conditionA 的两个线程可以被唤醒处理其业务了(有可用资源),这个时候我可以做到只唤醒 conditionA 上两个线程,其他线程仍然在阻塞状态;这样就做到了精准唤醒了。

来看看 Condition 的实现原理:

4.1 newCondition

创建 Condition 对象,这里的 ConditionObject 是 AQS 的内部类

    final ConditionObject newCondition() {        return new ConditionObject();    }

4.2 await

AbstractQueuedSynchronizer#ConditionObject.await()

也就是我们例子中通过 conditionA.await() 进入阻塞状态,等待其他线程调用 signalAll 或者 signal唤醒

        public final void await() throws InterruptedException {            // 如果当前线程已经被中断了,就响应中断(也就是抛出异常)            if (Thread.interrupted())                throw new InterruptedException();            // 将当前线程包装成Node放入条件等待队列的队尾               Node node = addConditionWaiter();            // 同时还有释放当前线程已获取的资源            int savedState = fullyRelease(node);            int interruptMode = 0;            // 如果当前节点不在同步队列里,那就将线程挂起            while (!isOnSyncQueue(node)) {                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }            // 说明当前节点已经从条件队列移到了同步队列中(也就是从await状态被signal唤醒之后,可以尝试获取锁资源进行后续操作了)            // 从上面被挂起的地方被唤醒之后,尝试去获取锁资源,如果获取失败,那就会进入等待队列中            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            // 记录中断信息                if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }        //

主要操作

将当前线程包装成 node 节点之后放入条件队列

释放当前线程占用的资源

挂起当前线程

当线程被 signal 或者 signalAll 唤醒之后,从条件队列移到同步队列,并尝试获取锁资源

4.2.1 addConditionWaiter

AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter

将当前线程包装成 Node 放入条件等待队列队尾

        /**         * Adds a new waiter to wait queue.         * @return its new wait node         */        private Node addConditionWaiter() {            Node t = lastWaiter;            // If lastWaiter is cancelled, clean out.            if (t != null && t.waitStatus != Node.CONDITION) {               // 从条件等待队列中移除取消的节点                unlinkCancelledWaiters();                t = lastWaiter;            }            Node node = new Node(Thread.currentThread(), Node.CONDITION);            if (t == null)                firstWaiter = node;            else                t.nextWaiter = node;            lastWaiter = node;            return node;        }  //

4.2.2 unlinkCancelledWaiters

AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters

从条件等待队列中移除被取消的节点

        private void unlinkCancelledWaiters() {            Node t = firstWaiter;            Node trail = null;            while (t != null) {                Node next = t.nextWaiter;                if (t.waitStatus != Node.CONDITION) {                    t.nextWaiter = null;                    if (trail == null)                        firstWaiter = next;                    else                        trail.nextWaiter = next;                    if (next == null)                        lastWaiter = trail;                }                else                    trail = t;                t = next;            }        }         //

4.2.3 fullyRelease 释放资源

AbstractQueuedSynchronizer#fullyRelease

    final int fullyRelease(Node node) {        boolean failed = true;        try {            // 拿到当前线程锁占用的资源,然后释放            int savedState = getState();            if (release(savedState)) {                failed = false;                return savedState;            } else {                throw new IllegalMonitorStateException();            }        } finally {            if (failed)                node.waitStatus = Node.CANCELLED;        }    }//

4.2.4 isOnSyncQueue

AbstractQueuedSynchronizer#isOnSyncQueue

判断当前节点是否在同步器队列中,如果是的话说明当前节点正在等待获取资源

    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        abstract void lock();        // 非公平锁,tryAcquire方法由子类实现        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 拿到当前锁对象的重入次数            int c = getState();            // 如果等于0说明该锁对象没有被任何对象持有            // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了            if (c == 0) {                // 这里尝试去抢这把锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 如果此锁被当前对象持有,也就是重入操作,累加state            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }        // 尝试释放锁资源        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            // 检查是不是当前线程获得了锁            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        protected final boolean isHeldExclusively() {            // 判是否独占模式            return getExclusiveOwnerThread() == Thread.currentThread();        }        // 创建Condition对象        final ConditionObject newCondition() {            return new ConditionObject();        }        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        // 重入次数        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        // state=0表示此锁未被占用        final boolean isLocked() {            return getState() != 0;        }    }     //0

4.3 signal 唤醒在条件等待队列中的节点

AbstractQueuedSynchronizer.ConditionObject#signal

    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        abstract void lock();        // 非公平锁,tryAcquire方法由子类实现        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 拿到当前锁对象的重入次数            int c = getState();            // 如果等于0说明该锁对象没有被任何对象持有            // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了            if (c == 0) {                // 这里尝试去抢这把锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 如果此锁被当前对象持有,也就是重入操作,累加state            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }        // 尝试释放锁资源        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            // 检查是不是当前线程获得了锁            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        protected final boolean isHeldExclusively() {            // 判是否独占模式            return getExclusiveOwnerThread() == Thread.currentThread();        }        // 创建Condition对象        final ConditionObject newCondition() {            return new ConditionObject();        }        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        // 重入次数        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        // state=0表示此锁未被占用        final boolean isLocked() {            return getState() != 0;        }    }     //1

4.3.1 doSignal

AbstractQueuedSynchronizer.ConditionObject#doSignal

    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        abstract void lock();        // 非公平锁,tryAcquire方法由子类实现        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 拿到当前锁对象的重入次数            int c = getState();            // 如果等于0说明该锁对象没有被任何对象持有            // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了            if (c == 0) {                // 这里尝试去抢这把锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 如果此锁被当前对象持有,也就是重入操作,累加state            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }        // 尝试释放锁资源        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            // 检查是不是当前线程获得了锁            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        protected final boolean isHeldExclusively() {            // 判是否独占模式            return getExclusiveOwnerThread() == Thread.currentThread();        }        // 创建Condition对象        final ConditionObject newCondition() {            return new ConditionObject();        }        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        // 重入次数        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        // state=0表示此锁未被占用        final boolean isLocked() {            return getState() != 0;        }    }     //2

signalAll 和 signal 原理类似,只不过一个是唤醒所有在当前 condition 上等待的节点,另一个是只唤醒一个,这里不在赘述。

相关文档:

JAVA并发基石之AQS --- 心法篇

原文:https://juejin.cn/post/7100485385595125773


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15884.html