java并发(七)——AbstractQueuedSynchronizer

小编 2026-06-05 阅读:1042 评论:0
Java并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类–AbstractQue...

Java并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类–AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

基本实现原理

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。

//共享变量,使用volatile修饰保证线程可见性
private volatile int state;

状态信息通过procted类型的getState,setState,compareAndSetState进行操作。

AQS支持两种同步方式:

  1. 独占式
  2. 共享式

这样方便使用者实现不同类型的同步组件,独占式如ReentrantLock,共享式如Semaphore,CountDownLatch,组合式的如ReentrantReadWriteLock。总之,AQS为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

如何使用

同步器的设计是基于模板方法模式的,一般的使用方式是这样:

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)。
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这其实是模板方法模式的一个很经典的应用。

我们来看看AQS定义的这些可重写的方法:

	   protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false

    protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;

    protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;

    protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false

    protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

关于AQS的使用,我们来简单总结一下:

首先,我们需要去继承AbstractQueuedSynchronizer这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源state的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是AQS帮我们完成了。

设计思想

对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state的姿势T_T。很经典的模板方法设计模式的应用,AQS为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。

源码分析

我们先来简单描述下AQS的基本实现,前面我们提到过,AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为\"CLH\"队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。
\"在这里插入图片描述\"
其实就是个双端双向链表

当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

Node结点

Node结点是AbstractQueuedSynchronizer中的一个静态内部类,我们捡Node的几个重要属性来说一下。

static final class Node {
        /** 代表共享模式 */
        static final Node SHARED = new Node();
        /** 代表独占模式 */
        static final Node EXCLUSIVE = null;

        /** waitStatus值,表示线程已被取消(等待超时或者被中断) */
        static final int CANCELLED =  1;
        /** waitStatus值,表示后继线程需要被唤醒(unpaking) */
        static final int SIGNAL    = -1;
        /** waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
        static final int CONDITION = -2;
        /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去 */
        static final int PROPAGATE = -3;
        /** 等待状态,初始为0 */
        volatile int waitStatus;
        /** 当前结点的前驱结点 */
        volatile Node prev;
        /** 当前结点的后继结点 */
        volatile Node next;
        /** 与当前结点关联的排队中的线程 */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * 是否是共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /** 
        * 返回当前结点的前驱结点 
        */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
		// Used to establish initial head or SHARED marker
        Node() { }
		 // Used by addWaiter
        Node(Thread thread, Node mode) {    
            this.nextWaiter = mode;
            this.thread = thread;
        }
		// Used by Condition
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
成员变量

除了node内部类之外,还需要三个变量保存同步信息。

	//双端队列的头
    private transient volatile Node head;
	//双端队列的尾
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
独占模式——获取同步状态

acquire方法,这是独占模式下获取同步状态的方法。

	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们来简单理一下代码逻辑:

  1. 首先,调用使用者重写的tryAcquire方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入步骤2;
  2. 此时,获取同步状态失败,构造独占式同步结点,通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方 式添加);
  3. 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。

addWaiter方法,为获取同步状态失败的线程,构造成一个Node结点,添加到同步队列尾部。

    private Node addWaiter(Node mode) {
    	//构造结点
        Node node = new Node(Thread.currentThread(), mode);
        // //指向尾结点tail
        Node pred = tail;
        //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,enq方法。
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

先cas快速设置,若失败,进入enq方法。将结点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作。以上代码可以看出,使用了compareAndSetTail这个cas操作保证安全添加尾结点。

enq方法,当cas快速设置失败时,将一直自旋。

	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
             //如果队列为空,创建结点,同时被head和tail引用
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //cas设置尾结点,不成功就一直重试
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq内部是个死循环,通过CAS设置尾结点,不成功就一直重试。很经典的CAS自旋的用法。这是一种乐观的并发策略

acquireQueued方法,这个方法是一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//找到当前结点的前驱结点
                final Node p = node.predecessor();
                //如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
                if (p == head && tryAcquire(arg)) {
                	//获取同步状态成功,将当前结点设置为头结点。
                    setHead(node);
                    // 方便GC
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                // 如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果线程被中断则返回true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued内部也是一个死循环,只有前驱结点是头结点的结点,也就是老二结点,才有机会去tryAcquire;若tryAcquire成功,表示获取同步状态成功,将此结点设置为头结点;若是非老二结点,或者tryAcquire失败,则进入shouldParkAfterFailedAcquire去判断判断当前线程是否应该阻塞,若可以,调用parkAndCheckInterrupt阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能休息,继续循环。

shouldParkAfterFailedAcquire方法,用来判断当前结点线程是否能休息,既是否需要阻塞等待。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	//获取前驱结点的状态 
        int ws = pred.waitStatus;
        //若前驱结点的状态是SIGNAL(已经告诉前驱拿完号后通知自己一下),意味着当前结点可以被安全地park
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
        	// ws>0,只有CANCEL状态ws才大于0。若前驱结点处于CANCEL状态,也就是此结点线程已经无效。会从前驱节点的前驱节点开始向前查找,找到一个非CANCEL状态的结点,将自己设置为它的后继结点。
        	//注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           // 如果前驱正常,将其设置为SIGNAL状态(告诉它拿完号后通知自己一下。)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

若shouldParkAfterFailedAcquire返回true,也就是当前结点的前驱结点为SIGNAL状态,则意味着当前结点可以放心休息,进入parking状态了。

parkAncCheckInterrupt方法,阻塞线程并处理中断。

	private final boolean parkAndCheckInterrupt() {
		//使用LockSupport使线程进入阻塞状态
        LockSupport.park(this);
        // 线程是否被中断过
        return Thread.interrupted();
    }

selfInterrupt方法,中断当前线程。

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

至此,关于acquire的方法源码已经分析完毕,我们来简单总结下:

  1. 首先tryAcquire获取同步状态,成功则直接返回;否则,进入下一环节;
  2. 线程获取同步状态失败,就构造一个结点,加入同步队列中,这个过程要保证线程安全;
  3. 加入队列中的结点线程进入自旋状态,若是老二结点(即前驱结点为头结点),才有机会尝试去获取同步状态;否则,当其前驱结点的状态为SIGNAL,线程便可安心休息,进入阻塞状态,直到被中断或者被前驱结点唤醒。
独占模式——释放同步状态

当前线程执行完自己的逻辑之后,需要释放同步状态,由于已经获取到了同步状态,所以不需要进行CAS的同步操作,来看看release方法的逻辑。

release方法,释放同步状态。

    public final boolean release(int arg) {
    	//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	//唤醒后继结点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor方法,唤醒后继结点。

    private void unparkSuccessor(Node node) {
        //获取wait状态
        int ws = node.waitStatus;
        if (ws < 0)
        	// 将等待状态waitStatus设置为初始值0
            compareAndSetWaitStatus(node, ws, 0);
		//后继结点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
        	//若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点进行唤醒
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	//使用LockSupprot唤醒结点对应的线程
            LockSupport.unpark(s.thread);
    }

唤醒后继节点时,可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

独占模式——获取同步状态响应中断

AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。

acquireInterruptibly方法,获取同步状态响应中断。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
        	//如果线程中断抛出异常
            throw new InterruptedException();
        if (!tryAcquire(arg))
        	//获取同步状态方法
            doAcquireInterruptibly(arg);
    }

doAcquireInterruptibly方法,它是获取同步状态主要逻辑代码。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //这里不在返回boolean,直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别:

  1. 方法声明抛出InterruptedException异常;
  2. 在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。
独占模式—— 超时获取同步状态

AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。如下:

tryAcquireNanos方法,超时获取同步状态。

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //线程中断,抛出异常    
        if (Thread.interrupted())
            throw new InterruptedException();
        //获取同步状态,如果获取失败进入doAcquireNanos方法    
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

doAcquireNanos方法,

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //nanosTimeout <= 0    
        if (nanosTimeout <= 0L)
            return false;
        //超时时间    
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增Node节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
        	//自旋
            for (;;) {
                final Node p = node.predecessor();
                //获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //获取失败,做超时、中断判断
                //重新计算超时时间
                nanosTimeout = deadline - System.nanoTime();
                //已经超时,返回false
                if (nanosTimeout <= 0L)
                    return false;
                //判断前置接点是否为SIGNAL状态,代表当前线程是否可以进行阻塞处理  
                //这里还要满足剩余的超时时间大约1000,否则就还是会进行自旋  
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //如果中断,抛出异常    
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
共享模式——获取同步状态

共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。其待重写的尝试获取同步状态的方法tryAcquireShared返回值为int。

我们需要重写的方法是:tryAcquireShared

 protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
  1. 当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
  2. 当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
  3. 当返回值小于0时,表示获取同步状态失败。

acquireShared方法,共享模式获取同步状态。

    public final void acquireShared(int arg) {
    	//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared方法,

    private void doAcquireShared(int arg) {
    	//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。
        final Node node = addWaiter(Node.SHARED);
        //是否获取成功
        boolean failed = true;
        try {
        	//线程parking过程中是否被中断过
            boolean interrupted = false;
            //死循环
            for (;;) {
            	//找到前驱结点
                final Node p = node.predecessor();
                //头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态
                if (p == head) {
                	//尝试获取同步装填
                    int r = tryAcquireShared(arg);
                    @font-face {
				font-family: "autolinktags";
				src: url("https://www.seowoai.com/zb_users/plugin/AutoLinkTags/style/fonts/iconfont.woff2") format("woff2"),
				url("https://www.seowoai.com/zb_users/plugin/AutoLinkTags/style/fonts/iconfont.woff") format("woff"),
				url("https://www.seowoai.com/zb_users/plugin/AutoLinkTags/style/fonts/iconfont.ttf") format("truetype");
				font-weight:normal;
				font-style:normal;
			}.tagslink::after { content:"\e613"; margin:2px 0 0 0px; font-size:12px; font-family:"autolinktags"; display:inline-block; vertical-align:top; }															
版权声明

本文仅代表作者观点,不代表百度立场。
本文系作者授权百度百家发表,未经许可,不得转载。

热门文章
  • 机房智能化温湿度解决方式之POE供电以太网温湿度传感器

    机房智能化温湿度解决方式之POE供电以太网温湿度传感器
    机房智能化温湿度解决方式之POE供电以太网温湿度传感器 北京盈创力和电子科技有限公司 智能型TCP网口温湿度记录仪 北京IP网络温湿度记录仪厂家,北京盈创力和 北京智能型TCP网口温湿度记录仪IP网络温湿度记录仪是一种新型的基于TCP/IP协议双绞线以太网标准温湿度采集模块,利用它可以实现现场温度值、相对湿度值的采集,同时利用其自身的RJ45通信接口可以方便地和机房监控主机或交换机集线器进行联网。 工作于-40℃~85℃工业级带...
  • Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering

    Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering
    Problem Statement 我们考虑一个具有马尔可夫性质、非线性、非高斯的状态空间模型(State Space Model):对于一个时间序列上的观测结果{yt,t∈N}\\{ y_t , t \\in N \\}{yt​,t∈N},我们认为每个观测结果yty_tyt​的生成依赖于一个无法直接观察的隐变量xt∈{xt,t∈N}x_t \\in \\{x_t , t \\in N \\}xt​∈{xt​,t∈N},即:p(...
  • HTTP状态保持的原理

    HTTP状态保持的原理
    a)在用户登录之后,浏览器返回响应的时候会在响应中添加上cookieb)浏览器接收到cookie之后会自动保存c)当用户再次请求同一服务器中的其他网页的时候,浏览器会自动带上之前保存的cookied)服务接收到请求之后可以请 request 对象中取到cookie 判断当前用户是否登录  Http是无状态的,就是连接时数据互通,关闭后...
  • Hive 系统函数及示例

    Hive 系统函数及示例
    查看所有系统函数 show functions; 函数分类 内置函数【系统函数】 数学函数: floor、round、ceil、cos、log2等 字符串函数: length、reverse、trim、lower、get_json_object、repeat等 收集函数: size 转换函数: cast 日期函数: year、month、datediff、date、date_add等 条件函数: coalesce、case…w...
  • CSRF的原理和防范措施

    CSRF的原理和防范措施
    a)攻击原理:i.用户C访问正常网站A时进行登录,浏览器保存A的cookieii.用户C再访问攻击网站B,网站B上有某个隐藏的链接或者图片标签会自动请求网站A的URL地址,例如表单提交,传指定的参数iii.而攻击网站B在访问网站A的时候,浏览器会自动带上网站A的cookieiv.所以网站A在接收到请求之后可判断当前用户是登录状态,所以...
标签列表