VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • Java并发之AQS原理解读(二)

前言#

本文从源码角度分析AQS独占锁工作原理,并介绍ReentranLock如何应用。

独占锁工作原理#

独占锁即每次只有一个线程可以获得同一个锁资源。

获取锁#

  1. 尝试获取资源(修改state),成功则返回
  2. 资源不足的情况下,线程会被封装成Node写入阻塞队列,然后以CAS自旋地方式循环重试获取锁(当插入的结点是head的直接后继时尝试获取锁,否则进入阻塞,只有当其他线程释放锁或者调用当前节点线程的中断方法时,才会重试获取锁)
  3. 自旋获取锁成功后,会将当前节点设为队列头结点
  4. 如果自旋阶段发生了线程中断,在获取锁成功之后,会补偿主动调用一次 interrupt 方法。因为自旋时调用的是interrupted方法返回中断标识,调用完后会清除状态

源码分析:

/* 获取独占锁
 * 1. tryAcquire 先尝试获取锁,如果成功直接返回;
 * 2. 否则 addWaiter 初始化辅助头结点,并将新节点添加到阻塞队列;
 * acquireQueued 如果新节点是 head 的直接后继则尝试获取锁,否则 LockSupport 阻塞当前线程,
 * 直到被释放锁的线程唤醒或者发生线程中断,才会重新尝试获取锁(CAS自旋阶段);
 * 3. 获取锁成功后,如果之前循环重试阶段发生线程中断,则会通过 selfInterrupt 将线程中断标志设为 true
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 将新节点插入到队尾
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果tail节点不为null时,直接尝试插入
    if (pred != null) {
        node.prev = pred;
        // 修改tail变量的值为插入节点的地址,即让tail指向新插入的节点
        // pred的值不变,还是原tail的地址
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // tail节点为null时,先初始化辅助头节点,再插入新节点
    enq(node);
    return node;
}

// 初始化辅助头节点,循环地将新节点插入到队尾,直至成功
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 初始化辅助头节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 插入新节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// CAS自旋阶段
// 循环重试获取锁,不成功就阻塞,直到被其他释放锁线程唤醒或发生线程中断,方法返回自旋阶段是否发生线程中断
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 如果新插入结点是 head 的直接后继,则尝试获取锁
             * 获取成功,则将当前节点设为head,并改成 dummy node(假结点)
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;  // help GC,丢弃原先的 dummy node
                failed = false;
                return interrupted;
            }
            /*
             * 获取锁失败后阻塞当前节点,直到其他线程释放锁或调用当前线程的线程中断
             * 发生线程中断的情况时,会将 interrupted 设为 true,表示自旋阶段发生了线程中断
             * shouldParkAfterFailedAcquire方法在前驱节点状态不为SIGNAL的情况下都会循环重试获取锁
             */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 根据前驱节点等待状态判断是否要阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驱节点等待状态为SIGNAL时,在释放锁的时候会唤醒后继节点,
         * 所以当前节点的线程可以阻塞自己
         */
        return true;
    if (ws > 0) {
        /*
         * 前驱节点等待状态为CANCELLED时,向前遍历
         * 断开对 CANCELLED 状态结点引用,help gc
         * 之后会回到循环重试获取锁
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         /**
          * 等待状态为0或者PROPAGATE(-3)时,设置前驱节点等待状态为SIGNAL,
          * 之后会回到循环重试获取锁
          */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 阻塞当前线程并返回线程的中断标识
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程,可以通过 LockSupport.unpark 或 currentThread.interrupt 唤醒
    LockSupport.park(this);
    return Thread.interrupted();
}

释放锁#

  1. 尝试释放资源(修改state),如果失败直接返回
  2. 成功的话,再唤醒阻塞队列中的下一个结点的线程。当前节点后继不符合时,会从队尾往前找

源码分析:

// 释放锁
public final Boolean release(int arg) {
    // 释放资源
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 获取并唤醒头结点后的一个待唤醒结点(waitStatus<=0)
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 结点直接后继不符合,则从队尾向前找
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒阻塞结点的线程
        LockSupport.unpark(s.thread);
}

ReentranLock实现#

非公平锁含义在于拥有锁的线程释放锁的时候,当前尝试获取锁资源的线程可以和队列中的第一个等待线程竞争;而已经进入队列的线程只能按照先进先出的顺序获取锁,也就是公平锁的逻辑。

第一次获取锁时,只要锁还未被占用,非公平锁会先直接通过CAS抢占,而公平锁则会判断是否有其他结点先进入队列,没有的话才会尝试获取锁。

ReentranLock是可重入锁,可重入即获得锁的线程可以重复进入临界区。通过实现AbstractOwnableSynchronizer接口记录获取锁的线程,当线程重复进入临界区时state+1,退出临界区时state-1

非公平锁#

// 先尝试 CAS 抢占锁,失败后再通过 acquire 尝试
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 当锁未被占用时,尝试获取锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 已获得锁的线程支持重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁#

// 不抢占,通过 acquire 尝试
final void lock() {
    acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /**
          * 当锁未被占用时,且阻塞队列中没有结点比当前节点更早开始等待时,才尝试获取锁
          */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 已获得锁的线程支持重入
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
出处:https://www.cnblogs.com/flowers-bloom/p/java-concurrent-aqs-2.html

相关教程