VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • ReentrantLock源码

  • JUC 指java.util.concurrent包下,一系列关于并发的类,JUC就是包名的首字母

  • CAS 比较并交换,可以看另一篇文章

  • AQS 指主要利用CAS来实现的轻量级多线程同步机制,并且不会在CPU上出现上下文切换和调度的情况

自定义锁

如何在自己实现一个锁?

可以定义一个属性来判断当前是否有其线程在运行,如果正在运行那么其他线程需要等待

如何实现? 例如有两个线程T1和T2,都执行同一段代码

自定义两个方法


public void lock(); public void unlock();
 
 

public void addI(){ i++; } 将上面的addI方法更改为下面的 public void addI(){ lock(); i++; unlock(); }
 
 

这里忽略程序出错导致死锁的情况,正常解锁需要放在finally代码块中

当T1进入代码,将锁的改为被持有的状态


/** * 0为未持有 * 1为被持有 */ private volatile int i=0; public void lock(){ //CAS修改成功返回true while(CAS(i,1)){ return } } public void unlock(){ i=0; }
 
 

上面的伪代码当T1进入lock方法后,因为是第一个进入的,锁的状态还是0,通过cas可以改为1,修改成功返回true,进入循环return到addI方法,执行i++操作,然后进入unLock方法,将状态改为0,方法结束

假设当T1进入方法将状态改为1,那么T2进入会一直循环CAS修改,线程一直在自旋不会走下面的代码,直到锁的状态改为0,才会继续业务代码

那么我们就实现了一个简单的锁,但是这个锁有什么缺点呢? 没有获取到锁的线程会一直自旋,消耗系统资源,这个是我们不想看到的

在java中还有一个类LockSupport,其中有一个park方法


public static void park() { UNSAFE.park(false, 0L); } public native void park(boolean var1, long var2);
 
 

里面继续调用UNSAFE类,这个类里的方法是使用C/C++实现,park方法的作用是将当前线程立即休眠,让出CPU,直到被唤醒,还有一个唤醒的方法


public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } public native void unpark(Object var1);
 
 

这个同样也是其他语言实现,传入需要被唤醒的线程,那么我们上面的代码可以改造为


/** * 0为未持有 * 1为被持有 */ private volatile int i=0; //存放等待获取锁的线程 private Thread t; public void lock(){ //CAS修改成功返回true if(CAS(i,1)){ return } //将没有获取到锁的线程存放 t=Thread.currentThread() //如果没有获取到锁则进行休眠 LockSupport.park(); } public void unlock(){ i=0; if(t!=null){ LockSupport.unpark(t); } }
 
 

我们修改完后即使没有获取到锁的线程也不会占用CPU的资源,但是如果出现2个以上的线程同时进行操作,那么会出现丢失线程的情况,可以再进行优化,将等待的线程存放到队列中,就不再演示了,而ReentrantLock就是主要使用CAS,park,自旋来实现的,接下来看ReentrantLock的源码

ReentrantLock

当初始化一个ReentrantLock使用默认构造时创建的是一个非公平锁


public ReentrantLock() { sync = new NonfairSync(); }
 
 

如果想创建一个公平锁则使用有参构造


public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
 
 

这篇文章先来看公平锁的实现


public void service() { //创建一个公平锁 ReentrantLock reentrantLock = new ReentrantLock(true); reentrantLock.lock(); try { System.out.println("==这里有一堆的业务==="); } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } }
 
 

加锁

没有竞争情况


public void lock() { sync.lock(); } 调用的sync是一个ReentrantLock的内部抽象类 abstract static class Sync extends AbstractQueuedSynchronizer{ ...... }
 
 

它的公平锁的实现方法,是FairSync类中的,也是一个内部类,在ReentrantLock中,继承了Sync类,实现lock方法


static final class FairSync extends Sync { final void lock() { acquire(1); } }
 
 

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
 
 

点进tryAcquire方法


protected final boolean tryAcquire(int acquires) { //获取当前执行的线程 final Thread current = Thread.currentThread(); //得到锁的状态 int c = getState(); //如果锁状态为0说明当前锁没有被其他线程持有 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
 
 

继续点进hasQueuedPredecessors方法,该方法定义在AbstractQueuedSynchronizer抽象类中的


public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 其中tail和head这两个变量是在AbstractQueuedSynchronizer抽象类中定义的,用来存放等待线程头和尾部
 
 

因为当前线程执行前锁的状态是未被持有的,所以还没有初始化过队列,那么等待队列的头和尾部都为null,return的第一个判断h!=t为false,后面的&&运算符,所以直接返回

那么回到tryAcquire方法,hasQueuedPredecessors返回false,而前面有一个取反!符号,则继续执行compareAndSetState(0, acquires)方法,通过cas改变当前锁的状态为1,然后执行setExclusiveOwnerThread方法,该方法就是简单的赋值


protected final void setExclusiveOwnerThread(Thread thread) { //当前持有锁的线程 exclusiveOwnerThread = thread; }
 
 

继续返回到acquire方法,为true,取反false,使用了&&阻断符,则不会执行后面的acquireQueued方法,直接结束lock()方法,执行自定义的业务代码

tryAcquire方法什么时候走到 else if (current == getExclusiveOwnerThread()) 判断呢

ReentrantLock的特性之一就是体现在这里-重入锁

啥叫重入锁?简单讲就是在加锁后又加锁


public void addI(){ ReentrantLock rLock =new ReentrantLock(true); rLock.lock(); //执行业务== rLock.lock(); //执行业务== //解锁最后加锁的 rLock.unlock(); //解锁最先加锁的 rLock.unlock(); }
 
 

当线程和该锁已经持有的线程相同时则会进入这个判断,将锁的状态加1,赋值给state,下面的判断state小于0可能是判断溢出的问题,即数值超出int类型最大容量则为负数,一般这种情况很少见吧

存在竞争情况

那么上面是没有其他线程竞争的情况,如果在T1加锁后,T2,T3..来尝试获取锁改怎么办呢?->进等待队列

这个还是tryAcquire方法的代码,拿下来方便查看


protected final boolean tryAcquire(int acquires) { //获取当前执行的线程 final Thread current = Thread.currentThread(); //得到锁的状态 int c = getState(); //如果锁状态为0说明当前锁没有被其他线程持有 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
 
 

如果在T1进行完加锁后T2来尝试获取锁,因为state状态不为0,而当前线程和锁持有的线程又不同,则直接返回false

那么返回acquire方法中


public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } Node.EXCLUSIVE 返回一个Node节点
 
 

取反为true,则执行acquireQueued方法,而acquireQueued方法中有执行了addWaiter方法,先来看addWaiter方法


private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
 
 

使用链表的形式来存储阻塞排队的线程,来看node的内部结构

主要的三个属性


//存放上一个节点 volatile Node prev; //存放下一个节点 volatile Node next; //存放当前等待的线程 volatile Thread thread;
 
 

Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
 
 

当进入这个方法后,首先将AbstractQueuedSynchronizer类中的尾部节点赋值给一个临时变量,判断尾部是否为空,假设现在线程为T2,队列还没有被初始化,尾部为空,则进入enq方法,继续点进


private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //CAS设置头节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //CAS设置尾巴节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
 
 

还是将AbstractQueuedSynchronizer类中尾部节点赋值给临时变量t 然后判断t是否为空,因为队列还没有初始化,所以尾巴节点为空,则使用cas来设置 AbstractQueuedSynchronizer类中的头节点,之后将设置的头节点赋值给尾部

当执行完节点的关系如下

这时候有个疑问,怎么没有设置传入的Node节点呢?而是设置新new出来的Node,和参数传入的Node节点没有一点关系?

注意看上面的代码for(;;) 死循环,当下次循环的时候t已经不为空了,因为上次循环给加了一个空节点,然后将传入的Node节点的上一个赋值为t,然后通过CAS获取AbstractQueuedSynchronizer类中的尾部节点,如果尾部节点还是为t,则更改为传入的node对象,如果CAS失败,即在CAS设置前被其他线程对AbstractQueuedSynchronizer类中的尾部节点进行了修改,则进行下一次for循环,直至设置成功,当操作完成后,节点结构如下图

之后代码返回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法


final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
 
 

还是一个for死循环,首先获取上一个节点和AbstractQueuedSynchronizer类中的头节点进行判断,如果相同则调用tryAcquire()方法尝试获取锁,因为在初始化队列过程中可能获取锁执行的线程已经执行完了,并且释放了锁,所以这里尝试一下获取锁,假设没有获取到锁,则不会进入if (p == head && tryAcquire(arg)) {}代码块,继续下面的判断,进入shouldParkAfterFailedAcquire()方法,从名称可以看到[在获取锁失败后应该睡眠]


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
 
 

判断上一个node节点的状态,将上一个节点的Node.SIGNAL状态的值为-1,而上面的代码中并没有对waitStatus的值进行更改,默认初始化为0,则进入最后的else代码块,通过CAS将waitStatus的值改为-1,方法返回false结束,回到acquireQueued方法中,继续进行for循环,假设还是没有获取到锁,则再次进入shouldParkAfterFailedAcquire方法中,因为上次for循环将waitStatus的值改为了-1,则这次进入了if (ws == Node.SIGNAL)的代码块,返回true,返回到 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())判断中,因为shouldParkAfterFailedAcquire方法返回了true,则继续执行parkAndCheckInterrupt方法


private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
 
 

当执行完parkAndCheckInterrupt方法后,T2线程就在这里进行休眠

为什么不开始就把waitStatus设置为-1呢?还要多自旋一次,有一个原因是尽量不使用park,能尝试获取到锁最好

那么假设现在又来一个线程T3


public final void acquire(int arg) { //尝试获取锁肯定不会成功,则进入acquireQueued,addWaiter方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
 
 

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //这时tail已经是t2节点了 Node pred = tail; //不为空进入 if (pred != null) { //将当前节点上一个节点设置为t2 node.prev = pred; //通过CAS来设置AQS类中的尾节点 if (compareAndSetTail(pred, node)) { //然后设置T2的下一个节点 pred.next = node; return node; } } enq(node); return node; }
 
 

完成操作后节点关系如下

之后继续执行acquireQueued方法


final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取上一个节点:T2 final Node p = node.predecessor(); //T2不是头节点,则不进入下面的代码块 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //之后调用shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
 
 

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //同样的代码,第一次获取T3的前一个节点T2,判断T2的ws值为0, //CAS修改后返回,外层循环再次进入这时T2的ws值为-1,返回true,方法结束 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
 
 

解锁

假设现在T1执行unlock方法,T2,T3在队列中


public void unlock() { sync.release(1); }
 
 

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
 
 

进入tryRelease方法


protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; //把当前持有锁的线程清空 setExclusiveOwnerThread(null); } //设置锁的状态 setState(c); return free; }
 
 

首先将状态数值-1,判断如果当前线程和持有锁的线程不是同一个则抛出异常,即解锁的线程和加锁的不是同一个线程

判断如果c==0,也就是没有重入锁的情况,将free改为true,然后进入setExclusiveOwnerThread方法


protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final void setState(int newState) { state = newState; }
 
 

方法返回,没有重入锁的情况,则free为true,获取AQS类中的头节点,假设不为空,ws=-1,则进入unparkSuccessor(h)方法


private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
 
 

首先获取头结点的状态,小于0进入代码块,将头结点的锁状态改为0,获取下一个节点,那么s就是t2,而t2的ws也是-1,所以直接进入最下面的代码块,if(s!=null),unpark(t2)线程

那么回到t2线程休眠的地方


private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //在这里醒来 return Thread.interrupted(); }
 
 

下面的是判断线程是否被中断过,native方法,无法看到实现了,那么假设没有被中断过则返回false,那么返回上一个方法


final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //返回代码后继续执行这里 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
 
 

因为parkAndCheckInterrupt方法返回false,所以进不去代码块,那么继续执行for,当执行if (p == head && tryAcquire(arg))时p==head成立,而调用tryAcquire方法尝试获取锁成功,因为t1已经释放了,那么进入下面的代码块


if (p == head && tryAcquire(arg)) { setHead(node); //设置t2上一个节点,也就是空节点的下一个节点设置为null p.next = null; // help GC p节点没有任何引用指向了,帮助垃圾回收 failed = false; return interrupted; } private void setHead(Node node) { //将t2节点设置为头部 head = node; //然后将t2节点的thread设置为null node.thread = null; //节点的上一个节点设置为null node.prev = null; }
 
 

经过上面的操作后节点关系如下

如果这个节点在头说明它正在执行代码,而不是排队,即使初始化时T1没有进队列,但是给它添加了一个空node,来代替它正在执行

例如有T2,T3在排队,T1线程unpark后T2线程执行,上面的代码也能说明T2会先把当前节点的线程,上下节点都设置为null,而T2线程去执行代码去了,已经在运行过程中了

看别的博客有一段解释:比如你去买车票,你如果是第一个这个时候售票员已经在给你服务了,你不算排队,你后面的才算排队

注意一点:队列头始终为空Node

如何保证公平

情况1

T1执行完unpark后,释放完锁,还没来的及唤醒队列中的T2,这时T3线程来尝试获取到锁


public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
 
 

这种情况队列中肯定有节点排队,如果没有节点直接获取到锁也是公平的,那么有节点排队h就不等于t,true,&&运算符继续判断,h的next节点也不为null,返回false

s.thread != Thread.currentThread() 如果当前来尝试获取锁的对象不是在排队的第一个(也就是头结点的下一个节点,头结点正在运行,不算在排队的队列中)也就是其他线程插队的情况,则返回true,结果就是(true&&(false||true)) 整体返回true,外层代码取反为false,不会尝试CAS获取锁,则T3去排队

情况2

T2尝试获取锁时发现T1持有锁,于是去初始化队列,在初始化过程中T1执行完释放锁,T2执行初始化队列代码时间片用完,这时T3来尝试获取锁


private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node()))<------假设T2初始化队列执行到这里CPU时间片用完 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
 
 

此时节点关系如下

那么回到hasQueuedPredecessors方法,看最后的return


return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
 
 

h头节点为一个空node,而t为节点为null,不等于true继续判断,h头结点下一个为null,整体返回true,外层代码取反为false,则去排队


if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; }
 
 

遗留问题

1 初始化队列以及后面的入队为什么要设置空的头节点

2 在parkAndCheckInterrupt()方法中最后调用的Thread.interrupted();一系列方法最后不改变任何东西,不明白它这个的作用,也有说是为了复用lockInterruptibly()方法,但是感觉有点牵强

 

__EOF__

 
本文作者Jame 本文链接:https://www.cnblogs.com/sunankang/p/15075729.html


相关教程