从源码学习线程池的使用原理及核心思想解析

从源码学习线程池的使用原理及核心思想解析我们都知道线程的作用 能够异步处理任务 并且能处理多个任务 但是无限制的使用线程 线程之间的创建 销毁 切换 都会带来一定的消耗 所以 为了控制线程的数量 复用已有线程 同时减少线程切换带来的开销 线程池这种池化技术就出来了 给同学们总结了应付面试的要点 线程池核心设计思想 固定的线程数 来消费我们不定量的 task 本文是对源码层面对线程池解析 有关线程池的使用 大家可以移步这篇文章 链接 Java 并发编程 四种线程池的使用及分析大致给出几种常用线程池介绍 其实 除了 newWo

1为什么要使用线程池

给同学们总结了应付面试的要点:

  1. 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

2几种常用线程池介绍

大致给出几种常用线程池介绍:

  • newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
  • newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
  • newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收
  • newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
  • newWorkStealingPool:适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。

  其实,除了newWorkStealingPool,线程池都是对ThreadPoolExecutor的一层封装,并且,建议大家不要用这些封装的,用底层的ThreadPoolExecutor,这样你就逼着自己去把线程池的一些参数去搞明白!!并且能提供比封装的更多功能!比如监控!
  这是阿里开发手册中的建议哦

  今天我们就去看下ThreadPoolExecutor中怎么去实现固定的线程数,来消费我们不定量的task。

闲话不多说,让我们从初始化进入看源码的正题:

3从初始化开始

我们先看下初始化(构造)5个参数:

public ThreadPoolExecutor(int corePoolSize,//主线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收) TimeUnit unit, //存活时间的时间单位 BlockingQueue<Runnable>workQueue//阻塞队列,我们需要执行的task都在该队列) { 
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 

唉?里面的this有7个参数,于是点进this构造方法

 public ThreadPoolExecutor(int corePoolSize,//主线程数 int maximumPoolSize,//最大线程数 long keepAliveTime,//线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收) TimeUnit unit,//存活时间的时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列,我们需要执行的task都在该队列 ThreadFactory threadFactory,//生成thread的工厂 RejectedExecutionHandler handler//拒绝饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略) { 
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) //对数值传递不合理及最大线程数小于主线程数的情况做异常处理 throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) //这里能看出来这三个参数不能传null或不传 throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null :AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 

System.getSecurityManager:这是一个安全管理器,当运行未知的Java程序的时候,该程序可能有恶意代码(删除系统文件、重启系统等),为了防止运行恶意代码对系统产生影响,需要对运行的代码的权限进行控制,这时候就要启用Java安全管理器。这里不必深入,往下需要看很深。。。

总结:初始化(构造函数)就是赋了一些初始值

初始化完成之后,就该执行了

4执行任务execute

 public void execute(Runnable command) { 
    if (command == null)//如果要执行的任务是空的,异常 throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get();//这里是啥见下文解释 //高三位代表线程池的状态,低29位代表线程池中的线程数量  //如果当前线程数小于主线程数,添加线程 if (workerCountOf(c) < corePoolSize) { 
   //workerCountOf见下文 if (addWorker(command, true))//addWorker才是添加线程的方法 return; c = ctl.get(); } //如果超过主线程数,将任务添加至workqueue 阻塞队列 if (isRunning(c) && workQueue.offer(command)) { 
    int recheck = ctl.get(); //如果线程池关闭,移除并拒绝 if (! isRunning(recheck) && remove(command)) reject(command); //否则如果当前线程池线程空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //阻塞队列已满,添加失败,采用拒绝策略 else if (!addWorker(command, false)) reject(command); } 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 

定义于threadPoolExecutor类中的一个原子性的32位二进制int数值
Java用这个二进制数的高三位代表线程池的状态,低29位代表线程池中的线程数量

workerCountOf是个啥?

private static int workerCountOf(int c) { 
    return c & CAPACITY; //CAPACITY 与(&) 上面的ctl } 

作用是:获取ctl的低29位,获取当前线程池中的线程数

 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; //Integer类的一个常量 @Native public static final int SIZE = 32; 

总结:这个方法就是做了一些线程的相关判断

5添加线程addWorker

让我们看看添加线程的方法:

 private boolean addWorker(Runnable firstTask, boolean core) { 
    retry: //这是goto语句 下面我会写一个demo讲解这玩意 for (;;) { 
   //大自旋检查线程池的状态。阻塞队列是否为空等判断 int c = ctl.get(); int rs = runStateOf(c);//线程池运行状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;//线程池被关或者任务为null(无任务可执行) for (;;) { 
   //小自旋 int wc = workerCountOf(c); //如果现有线程数大于最大值,或者大于等于最大线程数(主线程数)返回false //意味着线程都是满的 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas添加线程 线程+1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果失败了,继续自旋外层循环判断 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { 
    //开启一个线程,Worker实现了runnable接口 w = new Worker(firstTask);//这里点进去看worker的run方法 final Thread t = w.thread; if (t != null) { 
    final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { 
    // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { 
    if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //添加至wokers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { 
    mainLock.unlock(); } //添加成功 if (workerAdded) { 
    t.start();//启动线程,会调用我们线程的run接口,也就是我们worker的run workerStarted = true; } } } finally { 
    if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 

goto语句写法:

 retry: for (int i = 0; i < 3; i++) { 
    for (int j = 3; j < 10; j++) { 
    //if (j == 4) { 
    // break retry;  //跳出外面循环 //  } if(j == 7){ 
    continue retry; //继续外面循环  } System.out.println(i+":"+j); } } 

开启这个新的线程会执行这个线程的run方法,上面有写在哪里点进去,点进去后发现:

 public void run() { 
    runWorker(this); } 

继续点进去:

6运行新的线程runWorker

 final void runWorker(Worker w) { 
    Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { 
    //只要一直能获取到task,就一直会执行,不会关闭,所以线程也不会销毁,线程销毁只有当task为null while (task != null || (task = getTask()) != null) { 
    w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { 
    //调用线程执行之前方法,这里可以重写,放一些我们自己的业务前置逻辑 beforeExecute(wt, task); Throwable thrown = null; try { 
    //调用task的run方法,这里就是去执行我们的业务逻辑了 task.run(); } catch (RuntimeException x) { 
    thrown = x; throw x; } catch (Error x) { 
    thrown = x; throw x; } catch (Throwable x) { 
    thrown = x; throw new Error(x); } finally { 
    //调用线程执行之后方法,这里可以重写,放一些我们自己的业务后置逻辑 afterExecute(task, thrown); } } finally { 
    task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { 
    processWorkerExit(w, completedAbruptly); } } 

总结:通过自旋判断任务是否为空,不为空就去执行,为空就去取任务

7线程回收复用的关键:getTask():

 private Runnable getTask() { 
    boolean timedOut = false; // Did the last poll() time out? //自旋获取 for (;;) { 
    int c = ctl.get(); int rs = runStateOf(c);//获取线程池状态 // Check if queue empty only if necessary.必要时检 查空,状态是否停止或者shutdown if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 
    decrementWorkerCount(); return null;//线程池状态异常或无任务返回null } //获取线程数量 int wc = workerCountOf(c); // Are workers subject to culling? //线程数大于主线程数时,或者allowCoreThreadTimeOut参数为 true, allowCoreThreadTimeOut默认为false boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //超过最大线程,或者timed为true && (wc大于1个,并且任务队列为空)的时候 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { 
    //线程数-1,并且返回null,该线程结束 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { 
    //线程复用的关键就在这里了↓↓↓↓↓↓↓↓↓↓↓↓ //如果timed是true,超过时间不阻塞,不然一直阻塞,不回收 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //移除并返回队列头部的元素,如果为空,超过时间返回null workQueue.take();//移除并返回队列头部的元素,如果为空,一直阻塞 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { 
    timedOut = false; } } } 

8线程的回收

 threadPoolExecutor.allowCoreThreadTimeOut(true); 

9线程的复用

10超过核心线程数小于最大线程数的那一撮所谓的临时线程

如果你只是背了八股文,各种所谓的面试秘籍会告诉你线程池有核心线程有临时线程,并发高时会创建临时线程帮忙,并发低时销毁这些临时线程,面试官问你?哪些是临时线程?你要是按八股文这样回答那就说明你没用过(起码没点进去看过代码),面试铁定是挂了。

其实没有什么临时线程,所谓的核心线程数是要保留几个线程

假如我们设置了核心数为3,最大数为10.

并发量刚经历一波高峰期,线程数量为我们的最大线程数10,然后这段时间并发很低,那超过核心线程数的线程会被回收,回收的是哪7个?

谁先执行完,谁先被回收被回收的就是所谓的临时的,最后剩下的3个就是核心的线程
核心线程只是几个一直被阻塞等待任务的线程而已
可能上一波并发高峰它还不是核心线程,但是它跑得慢,于是被留下来当核心了,下一波并发高峰,它先跑了,于是核心线程又换成了另外一批。




11拒绝策略

 //阻塞队列已满,添加失败,采用拒绝策略 else if (!addWorker(command, false)) reject(command); 
 final void reject(Runnable command) { 
    handler.rejectedExecution(command, this); } 
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
    throw new RejectedExecutionException("Task " + r.toString() + //就是这个错误 " rejected from " + e.toString()); } } 

只要实现一下RejectedExecutionHandler这个接口就可以了

public class ExecJavaTemplate implements RejectedExecutionHandler { 
    @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    //在这里根据自己的业务写一个暂存的逻辑,然后塞回去即可 //比较懒的话暂存策略也不用写,不断地塞回去就行,慢就慢点,别让它报错即可 System.out.println("进入拒绝策略"); executor.execute(r); //再次调用execute } } 

在这里插入图片描述
那有人说了阻塞队列我设置成无限大,不就不会有上面的问题了。
如果阻塞队列是无限的,会发生什么?
首先我想到的就是OOM了,你的队列无限大,内存够不够?
不够你的内存不久溢出了,项目挂掉然后紧急排查错误,发现你这个开发将队列设置为无限大
好嘛,公司严格的话给你一个线上一级错误警告处理,这找谁说理去










12线程设置多少合理

线程数 = CPU可用核心数/(1 – 阻塞系数),其中阻塞系数的取值在0和1之间。阻塞系数=阻塞时间/(阻塞时间+计算时间)。

  1. 线程的 CPU 耗时所占比例越高,就需要越少的线程
  2. 线程的 IO 耗时所占比例越高,就需要越多的线程
  3. 针对不同的程序,进行对应的实际测试就可以得到最合适的选择
  4. 线程数 >= CPU 核心数
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/222851.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午2:52
下一篇 2026年3月17日 下午2:53


相关推荐

  • 2013成都邀请赛J称号||HDU4725 The Shortest Path in Nya Graph(spfa+slf最短的优化)

    2013成都邀请赛J称号||HDU4725 The Shortest Path in Nya Graph(spfa+slf最短的优化)

    2022年1月3日
    65
  • 背英语句子,来巧记单词[通俗易懂]

    背英语句子,来巧记单词[通俗易懂]WithmyownearsIclearlyheardtheheartbeatofthenuclearbomb.我亲耳清楚地听到原子弹的心脏的跳动。Nextyearthebeardedbearwillbearadearbabyintherear.明年,长胡子的熊将在后方产一头可爱的小崽.EarlyIsearchedthroughtheearthforearthwaresoastoresearchinearthqua.

    2022年8月24日
    17
  • Maskrcnn中resnet50改为resnet34「建议收藏」

    Maskrcnn中resnet50改为resnet34「建议收藏」因需要训练的数据集并不复杂,resnet50的结构有点冗余,于是就把maskrcnn的backbone从resnet50改为resnet34结构。找到model文件,将resnet50部分代码做一定的修改,就可以得到resnet34的相关代码下面是相关代码:##con_block修改为conv_block0并添加到model文件中defconv_block0(input_tensor…

    2022年10月6日
    7
  • 数据库引擎及区别

    数据库引擎及区别数据库引擎是用于存储 处理和保护数据的核心服务 利用数据库引擎可控制访问权限并快速处理事务 从而满足企业内大多数需要处理大量数据的应用程序的要求 数据库应用项目是通过数据库引擎与数据库链接的 何为数据库引擎呢 简而言之 数据库引擎就是驱动各种数据库的程序 它负责处理数据库相关工作的整个核心部份 同样的 数据库应用项目的操作指令 均会通过数据库引擎的处理作用到数据库上 数据

    2026年3月17日
    1
  • WLAN 与WIFI的区别?

    WLAN 与WIFI的区别?一、WIFIWIFI是一种可以将个人电脑、手持设备(如PDA、手机)等终端以无线方式互相连接的技术。WIFI技术与蓝牙技术一样,同属于在办公室和家庭中使用的短距离无线技术。Wi-Fi原先是无线保真的缩写,Wi-Fi的英文全称为wirelessfidelity,在无线局域网的范畴是指“无线相容性认证”,实质上是一种商业认证,同时也是一种无线联网的技术,以前通过网线连接电脑,而现在则…

    2022年7月11日
    20
  • 5g切片隔离原理_5G切片编排器

    5g切片隔离原理_5G切片编排器5G网络切片安全隔离机制与应用*毛玉欣1,陈林2,游世林1,闫新成1,吴强1【摘要】介绍了满足多样化垂直行业应用的5G网络服务化架构和网络切片实现。针对5G网络架构重构、网络部署形态的变化,研究提出了网络切片端到端安全隔离的实现方法,包括切片在接入网络、承载网络和核心网络中的隔离实现。结合典型行业应用的要求,给出了定制化切片的隔离实现案例。【关键词】垂直行业;服务化架构;网络切片;切片隔离引用格式:毛玉欣,陈林,游世林,等.5G网络切片安全隔离机制与应用[J].移动通信,2019,4

    2026年4月17日
    8

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号