并发编程之深入理解Condition

并发编程之深入理解Condition

在并发编程中的开发中,我们难免会使用到等待通知模式,比如我们生产者消费者模式中,当生产者生产的东西填满了容器,则需要停止生产,当消费者把容器内的东西消费完了,也需要停止消费,同样的当容器内有新的东西生产出来,会通知消费者继续生产。可能我们平时使用synchronized比较多,一般我们使用使用object.wait()和object.notify()、notifyAll()。然而今天我们一起学习的是当我们使用jdk提供的并发编程的Lock实现等待通知模式,此时我们就需要使用Condition来实现—条件等待通知。

一、condition的简单使用

如果我们之前没用过condition,那么先来了解一下如何使用,其实和synchronized结合object.wait()和object.notify()思想是一致的。这里举了一个实例,面包生产者生产面包扔到容器中和面包消费者从容器中取出面包进行消费,当容器满了则生产者阻塞,停止生产,直到容器不满是被唤醒;当容器空了,停止消费,直到容器不空被唤醒。

简单实例代码如下:

BreadContainer.java

package com.taolong.concurrent.condition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** * @Author taolong.hong * @Date 2020/5/10 16:12 * @Version 1.0 * @Description 装面包的容器 */
public class BreadContainer {
   

    //锁
    private final ReentrantLock lock;

    private final Condition condition;

    private final List<Bread> breadList;

    private final int containerSize;

    private static final int MAX_SIZE = 20;


    public BreadContainer(int containerSize) {
   
        this.lock = new ReentrantLock();
        condition = lock.newCondition();
        if (containerSize <=0 || containerSize > MAX_SIZE){
   
            this.containerSize = MAX_SIZE;
        }else{
   
            this.containerSize = containerSize;
        }
        breadList = new ArrayList<>(containerSize);
    }

    /*** * 往容器里添加生产的面包 */
    public void produceBread(Bread bread){
   
       lock.lock();
       try {
   
           //已经装满了,需要等待,并且唤醒阻塞的线程
           while (breadList.size() == containerSize){
   
               System.out.println("容器已经满了,生产者停止生产...");
               condition.await();
           }
           System.out.println("正在往容器里添加 id="+ bread.getId()+"的面包");
           breadList.add(bread);
           condition.signal();
       } catch (InterruptedException e) {
   
           e.printStackTrace();
       } finally {
   
           lock.unlock();
       }
    }

    /*** * 从容器里拿出面包 */
    public void consumeBread(){
   
        lock.lock();
        try {
   
            //当容器里没有面包则停止消费...
            while(breadList.size() == 0){
   
                System.out.println("当前容器没有面包,停止消费...");
                condition.await();
            }
            Bread bread = breadList.get(0);
            breadList.remove(0);
            System.out.println("正在消费id="+bread.getId()+"的面包");
            condition.signal();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        } finally {
   
            lock.unlock();
        }
    }


}

Bread.java

package com.taolong.concurrent.condition;

/** * @Author taolong.hong * @Date 2020/5/8 16:15 * @Version 1.0 * @Description 面包 */
public class Bread {
   

    private String name;

    private final int id;

    public Bread(int id) {
   
        this.id = id;
    }

    public String getName() {
   
        return name;
    }

    public void setName(String name) {
   
        this.name = name;
    }

    public int getId() {
   
        return id;
    }


}

ConditionTest.java

package com.taolong.concurrent.condition;

/** * @Author taolong.hong * @Date 2020/5/8 16:39 * @Version 1.0 * @Description */
public class ConditionTest {
   

    public static void main(String[] args) {
   
        BreadContainer breadContainer = new BreadContainer(20);
        Thread producer = new Thread(new BreadProducer(breadContainer));
        Thread consumer = new Thread(new BreadConsumer(breadContainer));
        producer.start();
        consumer.start();
    }


    /** * 面包生产者 */
    static class BreadProducer implements Runnable{
   
        final BreadContainer breadContainer;

        BreadProducer(BreadContainer breadContainer){
   
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
   
            //生产100000个面包
            for (int i = 0; i < 100000; i++) {
   
                Bread bread = new Bread(i);
                breadContainer.produceBread(bread);
            }
        }
    }


    /*** * 面包消费者 */
    static class BreadConsumer implements Runnable{
   

        final BreadContainer breadContainer;

        BreadConsumer(BreadContainer breadContainer){
   
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
   
            int i = 0;
            //消费100000个面包
            while(i<100000){
   
                breadContainer.consumeBread();
                i++;
            }
        }
    }
}

上面只是一个简单的应用实例,介绍如何使用

二、Condition原理

前面简单介绍了Condition的使用,现在开始介绍Condition的原理,本节主要结合图和文字描述condition的原理,下一节将从源码的角度分析。在分析condition的原理之前,最好先了解AQS的原理,我之前有写过一篇文章,如果不了解的可以参考(深入理解ReentrantLock和AQS)。先看下面一副condition的原理图

在这里插入图片描述

这里我用不同的颜色线条和文字标注了对应的关系,这里为什么有两个condition等待队列,是因为一个lock可以有多个不同的阻塞条件,比如大家可以参考ArrayBlockingQueue的原理,里面就定义了一个lock,两个condition(notEmpty,notFull)用于提升性能

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

下面我用下面的问题来解释上面的逻辑图

1、当调用condition.await()方法时,AQS同步队列中获取锁的线程(一般是Header指向的Node)会释放锁(state同步状态置为0),同时创建一个Node节点封装thread信息,加入到condition等待队列的队尾(如果有多个condition,哪一个condition调用了就加入到对应condition的队尾)

2、AQS同步队列中释放锁之后,会唤醒同步队列下面一个Node节点,让其去竞争锁资源(修改同步状态state)

3、当AQS同步队列中持有锁的线程调用condition.signal()时,则会将condition等待队列中的第一个节点Node加入到同步队列的队尾(当然也要用cas咯),如果调用的是condition.signalAll()则会将该condition队列的所有的节点唤醒加入到AQS的队尾,后面的逻辑就和之前分析AQS的逻辑一致。

这里一定要注意,一个lock可以有多个condition,也就意味着有多个condition等待队列,调用不同的condition则处理不同的conditin的等待队列

到这里相信结合图和文字大家对condition的原理已经有了一个比较深刻的了解了,下面开始分析condition的源码

三、深入源码分析

condition是在AbstractQueuedSynchronizer内部类ConditionObject,它实现了Condition接口。我们今天主要分析它的await()和signal()方法,其他的方法大家可自行研究

1、condition.await()

public final void await() throws InterruptedException {
   
    if (Thread.interrupted())
        throw new InterruptedException();
    //1.构建Node节点,加入到condition等待队列
    Node node = addConditionWaiter();
    //2.释放全部锁(重入的话,会全部释放)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //3.判断node是否在AQS同步队列中,如果不在则阻塞
    while (!isOnSyncQueue(node)) {
   
        //阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //4.下面的逻辑就是在AQS同步队列中了,和之前分析的AQS一样的
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我们详细来分析下上面的注释部分,其实不难。

注释1:构建Node节点,加入到condition等待队列

private Node addConditionWaiter() {
   
    //1.判断下当前线程事否是获取到锁的线程,只有获取锁的线程才能调用await方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //2.清除已经取消的节点,里面有个while循环,判断节点状态,如果已取消,则剔除队列,大家自行阅读
    if (t != null && t.waitStatus != Node.CONDITION) {
   
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    //3.创建节点,注意下这里的状态是CONDITION
    Node node = new Node(Node.CONDITION);
	//4.如果队列中还没有,则该节点就是第一个
    if (t == null)
        firstWaiter = node;
    //否则加入到队列尾部
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

注释2:释放全部锁(重入的话,会全部释放)

这里就不细看了,简单的描述一下:就是将当前线程持有的这个lock全部释放,如果有重入(假如state=2)说明对该锁获取了两次,那么都要释放,说白了就是放弃对这个锁的持有;另外会记录这个state的次数,当这个线程再次获取锁时这个线程要恢复这个锁状态(依然state=2),因为这个线程要在它park的地方继续执行。

注释3:判断node是否在AQS同步队列中,如果不在则阻塞

final boolean isOnSyncQueue(Node node) {
   
    //第一次刚创建的肯定返回false 
    if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
        return findNodeFromTail(node);
    }

这里判断的是该node是否在同步队列AQS中(注意不是condition队列哦),新创建出来的肯定是返回false,那么while(!isOnSyncQueue(node))就返回true,进循环体阻塞,当被唤醒时,它就会加入到AQS同步队列了,细节代码大家自行阅读。

注释4:下面的逻辑就是在AQS同步队列中了,和之前分析的AQS一样的

这里就不再继续啰嗦了,就是跟获取锁的流程一样,无非这个state的值可能不是1,而是之前释放是记录的(有可能之前有重入),不熟悉的可以参考我之前的一片文章
深入理解ReentrantLock和AQS

上面代码总逻辑分析如下

(1)创建节点,并且加入到condition的等待队列的队尾

(2)当前线程需要释放同步状态(将state置为0),并且唤醒AQS后继节点,让其竞争锁

(3)判断node是否在AQS中,此时node一般是下面几种情况

在condition队列中:处于等待队列中,状态为CONDITION,需要等待调用signal,加入到AQS同步队列

在AQS同步队列中:在AQS队列中,如果被前一个节点唤醒,则有竞争锁的资格,状态为SIGNAL,一般是调用condition.signal()后从condition的等待队列转移到AQS队列中

取消状态:过时或者状态为取消状态CANCELED,则会剔除出队列

上面awiat()方法已经基本上分析完了,接下来我们来看看signal方法

此时上面condition.await()中的while循环会阻塞,直到调用condition.signal()或者condition.signalAll(),将该node从condition等待队列中移到AQS同步队列的尾部。接下来我们就看condition.signal()方法

4、condition.signal()

/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
public final void signal() {
   
    //1.只有持有锁的线程,才能调用signal,首先判断
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //2.直接看这个方法
        doSignal(first);
}

这里很简单,首先判断当前线程是否持有锁,如果没有持有则会抛异常,也就是说持有锁(同步状态)的线程才有资格调用signal()方法,下面看doSignal()

5、doSignal()

/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */
private void doSignal(Node first) {
   
    do {
   
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

这里也很简单,使用do-while,其实就是从condition的第一个node节点开始,把一个node节点转移到AQS同步队列中,如果失败,则再找下一个…依次循环(注意这里是转移一个,如果转移成功后就退出循环),接着看transferForSignal()

6、transferForSignal()

final boolean transferForSignal(Node node) {
   
    /* * If cannot change waitStatus, the node has been cancelled. */
    //1.首先要将node节点状态从CONDITION改变成0,失败则认为该节点被取消,则剔除
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    //2.使用自旋的cas将node节点加入到AQS队列中,返回的是node的前一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //前面一个节点不管是CANCELLED状态还是设置signal失败,都唤醒node节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒节点
        LockSupport.unpark(node.thread);
    return true;
}

这段逻辑也不难,首先使用cas将node从CONDITION状态变成0,如果失败则表示该节点已取消(CANCELLED),不用转移到AQS同步队列,否则使用enq(node)自旋cas加入到AQS同步队列,这个enq(node)方法在之前获取锁失败,加入AQS同步队列时也调用了,之前的文章分析过,这里不再继续分析。接着看最后一个if条件,前一个节点的状态>0(CANNELLED)或者使用cas尝试将前一个节点的状态修改成SIGNAL,失败后则唤醒当前节点,为什么要这么做呢?

(1)如果前一个节点时SIGNAL状态,则不需要唤醒当前节点,因为AQS队列自动会通知后继节点(如果同之前没有取消的话),能保证当前节点一定会被唤醒

(2)如果前一个节点时取消,或者无法将前一个节点修改成SIGNAL状态,则当前节点有可能唤醒不了,因为前一个节点不会通知它,此时就需要手动唤醒,当手动唤醒后,该节点会执行下面的方法,将它前面取消状态(CANCELED)移除,这样就保证自己有机会被唤醒竞争锁。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    if (ws > 0) {
   
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
   
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
   
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个方法在之前的文章分析过,这里不再继续分析。到这里基本上所有的逻辑都执行完了,此时该节点已经正常进入了同步AQS队列,和之前竞争锁失败进入同步AQS队列效果是一样的。后面竞争锁的逻辑也是一样的,这里就不再继续分析。

前面描述的在之前的文章分析过,该文章就是
深入理解ReentrantLock和AQS),本文就分享到这里,希望可以给大家带来一点帮助,同时如果有错误,欢迎大家指正!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/111209.html原文链接:https://javaforall.net

(0)
上一篇 2021年8月3日 下午9:00
下一篇 2021年8月3日 下午10:00


相关推荐

  • ubuntu 自动挂起_ubuntu 的挂起与休眠

    ubuntu 自动挂起_ubuntu 的挂起与休眠待机计算机将目前的运行状态等数据存放在内存 关闭硬盘 外设等设备 进入等待状态 此时内存仍然需要电力维持其数据 但整机耗电很少 恢复时计算机从内存读出数据 回到挂起前的状态 恢复速度较快 一般笔记本在电池无故障且充满的情况下可以支持这种挂起数小时甚至数天 依具体机型有差别 其他名称 Suspend STR SuspendToRAM 挂起 挂起到内存休眠计算机将目前的运行状态等数据存

    2025年10月11日
    7
  • 传奇——用回忆感受幸福,用记忆寻找快乐

    传奇——用回忆感受幸福,用记忆寻找快乐

    2021年7月27日
    62
  • Oracle NUMBER类型细讲

    Oracle NUMBER类型细讲1>.NUMBER类型细讲:Oraclenumberdatatype语法:NUMBER[(precision[,scale])]简称:precision–>p     scale    –>sNUMBER(p,s)范围:1保存数据范围:-1.0e-130保存在机器内部的范围:1~22bytes有效为:从左边第一个不为0的数算起的位数。s的情况:s>0  

    2022年7月24日
    12
  • navicat 15 for mysql.ink激活码【在线破解激活】

    navicat 15 for mysql.ink激活码【在线破解激活】,https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月16日
    74
  • 一起学JAVA 反射学习(超详细)

    一起学JAVA 反射学习(超详细)1什么是反射?Reflection(反射)是Java程序开发语言的特征之一,它允许运行中的Java程序对自身进行检查,或者说“自审”,也有称作“自省”。反射非常强大,它甚至能直接操作程序的私有属性。我们前面学习都有一个概念,被private封装的资源只能类内部访问,外部是不行的,但这个规定被反射赤裸裸的打破了。反射就像一面镜子,它可以在运行时获取一个类的所有信息,可以获取到任何定义的信息(包括成员变量,成员方法,构造器等),并且可以操纵类的字段、方法、构造器等部分。2为什么需要反射?

    2022年6月11日
    36
  • 什么是 Sidecar[通俗易懂]

    什么是 Sidecar[通俗易懂]Sidecar是什么将本将属于应用程序的功能拆分成单独的进程,这个进程可以被理解为Sidecar。在微服务体系内,将集成在应用内的微服务功能剥离到了sidecar内,sidecar提供了微服务发现、注册,服务调用,应用认证,限速等功能。特点:Sidecar为独立部署的进程。sidecar降低应用程序代码和底层代码的耦合度,帮助异构服务通过sidecar快速接入微服务体系。Sidecar如何工作接下来以异构服务为基础介绍sidecar如何工作。Sidecar代理服务注册发现下图为异构

    2022年4月19日
    59

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号