并发编程 - 读写锁 ReentrantReadWriteLock
创始人
2024-05-14 15:37:59
0

前言

ReentrantReadWriteLock 在 ReentrantLock 的基础上增加了读写锁的概念。适用于读多写少的场景。

一个线程成功获取了读锁,并且在没有释放的情况下,其它线程(包括它自己)无法再获取写锁。

一个线程成功获取了写锁,并且在没有释放的情况下,其它线程(包括它自己)无法再获取读锁、写锁。

对同步状态的高16位表示写锁的数量、低16位表示读锁的数量。

源码分析

构造器

首先看下 ReentrantReadWriteLock 的构造器方法。

public ReentrantReadWriteLock() {this(false);
}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}

可以看出 ReentrantReadWriteLock 也有公平模式、非公平模式,可以通过方法参数来指定。默认是 非公平模式。

读锁

接下来就是 ReentrantReadWriteLock 的重头戏,读写锁部分,我们先来看读锁。

public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

可以通过 readLock 方法获取读锁。

lock 方法

获取读锁。

作为读锁的 ReadLock 本身实现了 Lock 接口,首先看它的 lock 方法的实现。

public void lock() {sync.acquireShared(1);
}

acquireShared 方法是 AbstractQueuedSynchronizer 提供的。

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);
}

首先我们来看 tryAcquireShared 方法。

protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// 如果有线程持有写锁,并且该线程不是当前线程if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)// 返回-1,表示获取读锁失败return -1;// 读锁的数量int r = sharedCount(c);// 判断线程是否需要阻塞// 如果不需要,再判断读锁的数量是否小于阈值(65535)// 如果小于阈值,再判断CAS方式获取读锁是否成功if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 如果当前没有线程持有读锁if (r == 0) {// 记录第一个持有读锁的线程为当前线程firstReader = current;// 记录第一个持有读锁的线程的重入次数为1firstReaderHoldCount = 1;// 如果第一个持有读锁线程是当前线程} else if (firstReader == current) {// 对第一个持有读锁的线程的可重入次数加1firstReaderHoldCount++;// 如果第一个持有读锁的线程不是当前线程,即已经有其它线程先一步持有读锁  } else {// 获取计数器缓存(记录最近一次成功获取读锁的线程的相关信息)HoldCounter rh = cachedHoldCounter;// 如果计数器缓存为空或者最近一次成功获取读锁的线程不是当前线程if (rh == null || rh.tid != getThreadId(current))// 初始化HoldCounter实例,作为计数器缓存cachedHoldCounter = rh = readHolds.get();// 如果计数器缓存记录的最近一次成功获取读锁的线程是当前线程,并且计数为0else if (rh.count == 0)// 将HoldCounter实例放到readHolds中(每个线程都会有自己的readHolds)readHolds.set(rh);// 计数加1rh.count++;}// 返回1,表示获取读锁成功return 1;}// 如果无法同时满足上述三个条件,就会执行该方法// 尝试获取读锁return fullTryAcquireShared(current);
}

sharedCount 方法

获取读锁的数量。

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

SHARED_SHIFT 定义如下:

static final int SHARED_SHIFT   = 16;

结合上下文就是,取同步状态的低16位,即为读锁的数量。

readerShouldBlock 方法

判断线程是否需要阻塞。

先来看非公平模式的实现

final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}

apparentlyFirstQueuedIsExclusive 方法是 AbstractQueuedSynchronizer 提供的。

final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;// (h = head) != null:同步队列的头节点存在// (s = h.next) != null:头节点的后继节点存在// !s.isShared():不是共享模式的// s.thread != null:它的线程存在return (h = head) != null &&(s = h.next)  != null &&!s.isShared()         &&s.thread != null;
}

简言之,同步队列有独占模式的节点在前面排队。

再来看公平模式的实现

final boolean readerShouldBlock() {return hasQueuedPredecessors();
}

hasQueuedPredecessors 方法是 AbstractQueuedSynchronizer 提供的。

public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t:保证同步队列中有节点,不是空队列// (s = h.next) == null:头节点的后继节点已经在队列中,但是与头节点的连接还没有完全建立// s.thread != Thread.currentThread():(头节点的后继节点与头节点的连接已经完全建立)通过比较线程得知,当前节点// 不是那个头节点的后继节点return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

简言之,同步队列中已经有节点在前面排队。

fullTryAcquireShared 方法

尝试获取读锁。

final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {int c = getState();// 如果有线程持有写锁if (exclusiveCount(c) != 0) {// 如果持有写锁的线程不是当前线程if (getExclusiveOwnerThread() != current)// 返回-1,表示尝试获取读锁失败return -1;// 如果当前线程需要阻塞} else if (readerShouldBlock()) {// 如果第一次持有读锁的线程是当前线程if (firstReader == current) {} else {// 如果第一次持有读锁的线程不是当前线程if (rh == null) {// 从计数器缓存中获取计数器rh = cachedHoldCounter;// 如果该计数器为空或者计算器记录的不是当前线程if (rh == null || rh.tid != getThreadId(current)) {// 从readHolds实例获取计数器rh = readHolds.get();// 如果该计数器计数为0if (rh.count == 0)// 删除该readHolds实例readHolds.remove();}}// 如果计数器计数为0if (rh.count == 0)// 返回-1,表示尝试获取读锁失败return -1;}}// 如果读锁的数量达到了阈值(65535),则报错if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 如果获取读锁成功if (compareAndSetState(c, c + SHARED_UNIT)) {// 如果读锁数量为0if (sharedCount(c) == 0) {// 记录第一次成功获取读锁的线程为当前线程firstReader = current;// 记录第一次成功获取读锁的重入次数为1firstReaderHoldCount = 1;// 如果第一次成功获取读锁的线程是当前线程  } else if (firstReader == current) {// 对第一次成功获取读锁的重入次数加一firstReaderHoldCount++;// 如果第一次成功获取读锁的线程不是当前线程    } else {// 如果计数器为空if (rh == null)// 从计数器缓存中(记录最近一次成功获取读锁的线程的相关信息)获取计数器rh = cachedHoldCounter;// 如果计数器依然为空或者计数器记录的不是当前线程if (rh == null || rh.tid != getThreadId(current))// 从readHolds实例中获取计数器rh = readHolds.get();// 如果计数器计数为0else if (rh.count == 0)// 将计数器放到readHolds实例中readHolds.set(rh);// 计数加一rh.count++;// 将计数器放到计数器缓存中cachedHoldCounter = rh; }// 返回1,表示获取读锁成功return 1;}}
}

doAcquireShared 方法

表达的是获取锁失败之后对同步队列的处理。具体的分析看下 AbstractQueuedSynchronizer 的文章。

unlock 方法

释放读锁。

public void unlock() {sync.releaseShared(1);
}

releaseShared 方法是 AbstractQueuedSynchronizer 提供的。

public final boolean releaseShared(int arg) {// 如果释放读锁成功if (tryReleaseShared(arg)) {// 修改头节点的等待状态为"PROPAGATE",然后唤醒同步队列中头节点的后继节点(或者从同步队列队尾节点开始向前// 寻找可用的节点)对应的线程。doReleaseShared();// 返回true,表示释放读锁成功return true;}// 返回false,表示释放读锁失败return false;
}

首先看下 tryReleaseShared 方法。

tryReleaseShared 方法

表达的是释放读锁。

protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 如果记录的第一个持有读锁的线程是当前线程if (firstReader == current) {// 如果第一个持有读锁的线程的重入次数为1if (firstReaderHoldCount == 1)// 重置firstReader为nullfirstReader = null;else// 对重入次数减一firstReaderHoldCount--;// 如果记录的第一个持有读锁的线程不是当前线程  } else {// 从计数器缓存中(记录最近一次成功获取读锁的线程的相关信息)获取计数器HoldCounter rh = cachedHoldCounter;// 如果计数器为空或者计数器记录的线程不是当前线程if (rh == null || rh.tid != getThreadId(current))// 从readHolds实例获取计数器rh = readHolds.get();int count = rh.count;// 如果计数器的计数<=1if (count <= 1) {// 删除readHolds实例readHolds.remove();// 如果计数器的计数<=0if (count <= 0)// 抛出异常throw unmatchedUnlockException();}// 对计数器的计数减一--rh.count;}for (;;) {// 获取同步状态int c = getState();// 对同步状态减去一个单位int nextc = c - SHARED_UNIT;// 使用CAS方式更新同步状态if (compareAndSetState(c, nextc))// 返回同步状态是否等于0,即表示是否成功释放读锁return nextc == 0;}
}

doReleaseShared 方法

表达的是成功释放锁之后对同步队列的处理。

private void doReleaseShared() {for (;;) {Node h = head;// 同步队列中有节点(除头节点、尾节点以外)if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点的等待状态为 SIGNALif (ws == Node.SIGNAL) {// 使用 CAS 方式更新头节点的等待状态为 0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;   // 主要用于唤醒同步队列中头节点的后继节点(或者从同步队列队尾节点开始向前寻找可用的节点)对应的线程unparkSuccessor(h);}// 如果头节点的等待状态为 0,使用 CAS 方式更新头节点的等待状态为 PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                }// 判断头节点是否发生改变,如果没有改变则跳出循环if (h == head)                   break;}
}

主要修改头节点的等待状态为"PROPAGATE",然后唤醒同步队列中头节点的后继节点(或者从同步队列队尾节点开始向前寻找可用的节点)对应的线程。

写锁

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

可以通过 writeLock 方法获取写锁。

lock 方法

获取写锁。

public void lock() {sync.acquire(1);
}

acquire 方法是 AbstractQueuedSynchronizer 提供的。

public final void acquire(int arg) {// 尝试获取写锁,如果获取写锁失败,则加入到同步队列中进行等待if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 线程中断selfInterrupt();
}

首先看下 tryAcquire 方法。

protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();// 获取写锁的数量int w = exclusiveCount(c);// 如果同步状态不为0,即有线程持有读锁或者写锁if (c != 0) {// 如果有其它线程持有读锁或者持有写锁的线程不是当前线程if (w == 0 || current != getExclusiveOwnerThread())// 返回false,表示获取写锁失败return false;// 如果写锁的数量大于阈值(65535),则抛出异常if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 更新同步状态setState(c + acquires);// 返回true,表示获取写锁成功return true;}// 走到这儿说明同步状态为0,即没有线程持有读锁或者写锁// 如果线程需要阻塞或者使用 CAS 方式修改同步状态失败if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))// 返回false,表示获取写锁失败return false;// 使用 exclusiveOwnerThread 记录当前线程(表示持有写锁的线程)setExclusiveOwnerThread(current);// 返回true,表示获取写锁成功return true;
}

简单看下 exclusiveCount 方法如何表达写锁的数量。

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

EXCLUSIVE_MASK 定义如下:

static final int SHARED_SHIFT   = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

结合上下文就是,取同步状态的高16位,再减1,即为写锁的数量。

接下来看下 writerShouldBlock 方法。

writerShouldBlock 方法

判断线程是否需要阻塞。

非公平模式下该方法始终返回 false,即不需要阻塞。

接下来看下公平模式的处理:

final boolean writerShouldBlock() {return hasQueuedPredecessors();
}

hasQueuedPredecessors 方法是 AbstractQueuedSynchronizer 提供的。

public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t:保证同步队列中有节点,不是空队列// (s = h.next) == null:头节点的后继节点已经在队列中,但是与头节点的连接还没有完全建立// s.thread != Thread.currentThread():(头节点的后继节点与头节点的连接已经完全建立)通过比较线程得知,当前节点// 不是那个头节点的后继节点return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

简言之,同步队列中前面已经有节点在排队。

addWaiter 方法

初始化同步队列以及将当前线程封装成节点,加入到同步队列中。具体的代码分析看上一篇的 ReentrantLock 文章。

acquireQueued 方法

表达的是在同步队列中的节点,如何尝试获取同步状态。具体的代码分析看上一篇的 ReentrantLock 文章。

unlock 方法

释放写锁。

public void unlock() {sync.release(1);
}

release 方法是 AbstractQueuedSynchronizer 提供的。

public final boolean release(int arg) {// 如果释放写锁成功if (tryRelease(arg)) {Node h = head;// 如果头节点存在,并且头节点的等待状态不为0if (h != null && h.waitStatus != 0)// 主要用于唤醒同步队列中头节点的后继节点(或者从同步队列队尾节点开始向前寻找可用的节点)对应的线程unparkSuccessor(h);// 返回 true,表示释放写锁成功return true;}// 返回 false,表示释放写锁失败return false;
}

首先看下 tryRelease 方法

protected final boolean tryRelease(int releases) {// 如果 exclusiveOwnerThread 记录的不是当前线程if (!isHeldExclusively())// 抛出异常throw new IllegalMonitorStateException();// 同步状态减去给定值int nextc = getState() - releases;// 判断写锁的数量是否等于0boolean free = exclusiveCount(nextc) == 0;// 如果写锁的数量等于0if (free)// 重置 exclusiveOwnerThread 为 nullsetExclusiveOwnerThread(null);// 更新同步状态setState(nextc);// 返回是否成功释放写锁return free;
}

只有持有写锁的线程才能释放写锁。

接下来看下 unparkSuccessor 方法

private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 如果头节点的等待状态小于0if (ws < 0)// 使用CAS方式更新等待状态为0compareAndSetWaitStatus(node, ws, 0);Node s = node.next;// 如果头节点的后继节点为空或者等待状态大于0if (s == null || s.waitStatus > 0) {s = null;// 从同步队列队尾节点开始向前寻找可用的(等待状态小于等于0的)节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒该节点对应的线程LockSupport.unpark(s.thread);
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...