文章目录
1为什么要使用线程池
给同学们总结了应付面试的要点:
- 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。
2几种常用线程池介绍
大致给出几种常用线程池介绍:
newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。newWorkStealingPool:适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
其实,除了newWorkStealingPool,线程池都是对ThreadPoolExecutor的一层封装,并且,建议大家不要用这些封装的,用底层的ThreadPoolExecutor,这样你就逼着自己去把线程池的一些参数去搞明白!!并且能提供比封装的更多功能!比如监控!
这是阿里开发手册中的建议哦
今天我们就去看下ThreadPoolExecutor中怎么去实现固定的线程数,来消费我们不定量的task。
闲话不多说,让我们从初始化进入看源码的正题:
3从初始化开始
我们先看下初始化(构造)5个参数:
public ThreadPoolExecutor(int corePoolSize,//主线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收) TimeUnit unit, //存活时间的时间单位 BlockingQueue<Runnable>workQueue//阻塞队列,我们需要执行的task都在该队列) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
唉?里面的this有7个参数,于是点进this构造方法
public ThreadPoolExecutor(int corePoolSize,//主线程数 int maximumPoolSize,//最大线程数 long keepAliveTime,//线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收) TimeUnit unit,//存活时间的时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列,我们需要执行的task都在该队列 ThreadFactory threadFactory,//生成thread的工厂 RejectedExecutionHandler handler//拒绝饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) //对数值传递不合理及最大线程数小于主线程数的情况做异常处理 throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) //这里能看出来这三个参数不能传null或不传 throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null :AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
System.getSecurityManager:这是一个安全管理器,当运行未知的Java程序的时候,该程序可能有恶意代码(删除系统文件、重启系统等),为了防止运行恶意代码对系统产生影响,需要对运行的代码的权限进行控制,这时候就要启用Java安全管理器。这里不必深入,往下需要看很深。。。
总结:初始化(构造函数)就是赋了一些初始值
初始化完成之后,就该执行了
4执行任务execute
public void execute(Runnable command) {
if (command == null)//如果要执行的任务是空的,异常 throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get();//这里是啥见下文解释 //高三位代表线程池的状态,低29位代表线程池中的线程数量 //如果当前线程数小于主线程数,添加线程 if (workerCountOf(c) < corePoolSize) {
//workerCountOf见下文 if (addWorker(command, true))//addWorker才是添加线程的方法 return; c = ctl.get(); } //如果超过主线程数,将任务添加至workqueue 阻塞队列 if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //如果线程池关闭,移除并拒绝 if (! isRunning(recheck) && remove(command)) reject(command); //否则如果当前线程池线程空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //阻塞队列已满,添加失败,采用拒绝策略 else if (!addWorker(command, false)) reject(command); }
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
定义于threadPoolExecutor类中的一个原子性的32位二进制int数值
Java用这个二进制数的高三位代表线程池的状态,低29位代表线程池中的线程数量
workerCountOf是个啥?
private static int workerCountOf(int c) {
return c & CAPACITY; //CAPACITY 与(&) 上面的ctl }
作用是:获取ctl的低29位,获取当前线程池中的线程数
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; //Integer类的一个常量 @Native public static final int SIZE = 32;
总结:这个方法就是做了一些线程的相关判断
5添加线程addWorker
让我们看看添加线程的方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //这是goto语句 下面我会写一个demo讲解这玩意 for (;;) {
//大自旋检查线程池的状态。阻塞队列是否为空等判断 int c = ctl.get(); int rs = runStateOf(c);//线程池运行状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;//线程池被关或者任务为null(无任务可执行) for (;;) {
//小自旋 int wc = workerCountOf(c); //如果现有线程数大于最大值,或者大于等于最大线程数(主线程数)返回false //意味着线程都是满的 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas添加线程 线程+1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果失败了,继续自旋外层循环判断 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {
//开启一个线程,Worker实现了runnable接口 w = new Worker(firstTask);//这里点进去看worker的run方法 final Thread t = w.thread; if (t != null) {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
// Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //添加至wokers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally {
mainLock.unlock(); } //添加成功 if (workerAdded) {
t.start();//启动线程,会调用我们线程的run接口,也就是我们worker的run workerStarted = true; } } } finally {
if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
goto语句写法:
retry: for (int i = 0; i < 3; i++) {
for (int j = 3; j < 10; j++) {
//if (j == 4) {
// break retry; //跳出外面循环 // } if(j == 7){
continue retry; //继续外面循环 } System.out.println(i+":"+j); } }
开启这个新的线程会执行这个线程的run方法,上面有写在哪里点进去,点进去后发现:
public void run() {
runWorker(this); }
继续点进去:
6运行新的线程runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {
//只要一直能获取到task,就一直会执行,不会关闭,所以线程也不会销毁,线程销毁只有当task为null while (task != null || (task = getTask()) != null) {
w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try {
//调用线程执行之前方法,这里可以重写,放一些我们自己的业务前置逻辑 beforeExecute(wt, task); Throwable thrown = null; try {
//调用task的run方法,这里就是去执行我们的业务逻辑了 task.run(); } catch (RuntimeException x) {
thrown = x; throw x; } catch (Error x) {
thrown = x; throw x; } catch (Throwable x) {
thrown = x; throw new Error(x); } finally {
//调用线程执行之后方法,这里可以重写,放一些我们自己的业务后置逻辑 afterExecute(task, thrown); } } finally {
task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally {
processWorkerExit(w, completedAbruptly); } }
总结:通过自旋判断任务是否为空,不为空就去执行,为空就去取任务
7线程回收复用的关键:getTask():
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? //自旋获取 for (;;) {
int c = ctl.get(); int rs = runStateOf(c);//获取线程池状态 // Check if queue empty only if necessary.必要时检 查空,状态是否停止或者shutdown if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); return null;//线程池状态异常或无任务返回null } //获取线程数量 int wc = workerCountOf(c); // Are workers subject to culling? //线程数大于主线程数时,或者allowCoreThreadTimeOut参数为 true, allowCoreThreadTimeOut默认为false boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //超过最大线程,或者timed为true && (wc大于1个,并且任务队列为空)的时候 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//线程数-1,并且返回null,该线程结束 if (compareAndDecrementWorkerCount(c)) return null; continue; } try {
//线程复用的关键就在这里了↓↓↓↓↓↓↓↓↓↓↓↓ //如果timed是true,超过时间不阻塞,不然一直阻塞,不回收 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //移除并返回队列头部的元素,如果为空,超过时间返回null workQueue.take();//移除并返回队列头部的元素,如果为空,一直阻塞 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) {
timedOut = false; } } }
8线程的回收
threadPoolExecutor.allowCoreThreadTimeOut(true);
9线程的复用
10超过核心线程数小于最大线程数的那一撮所谓的临时线程
如果你只是背了八股文,各种所谓的面试秘籍会告诉你线程池有核心线程有临时线程,并发高时会创建临时线程帮忙,并发低时销毁这些临时线程,面试官问你?哪些是临时线程?你要是按八股文这样回答那就说明你没用过(起码没点进去看过代码),面试铁定是挂了。
其实没有什么临时线程,所谓的核心线程数是要保留几个线程
假如我们设置了核心数为3,最大数为10.
并发量刚经历一波高峰期,线程数量为我们的最大线程数10,然后这段时间并发很低,那超过核心线程数的线程会被回收,回收的是哪7个?
谁先执行完,谁先被回收被回收的就是所谓的临时的,最后剩下的3个就是核心的线程
核心线程只是几个一直被阻塞等待任务的线程而已
可能上一波并发高峰它还不是核心线程,但是它跑得慢,于是被留下来当核心了,下一波并发高峰,它先跑了,于是核心线程又换成了另外一批。
11拒绝策略
//阻塞队列已满,添加失败,采用拒绝策略 else if (!addWorker(command, false)) reject(command);
final void reject(Runnable command) {
handler.rejectedExecution(command, this); }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + //就是这个错误 " rejected from " + e.toString()); } }
只要实现一下RejectedExecutionHandler这个接口就可以了
public class ExecJavaTemplate implements RejectedExecutionHandler {
@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//在这里根据自己的业务写一个暂存的逻辑,然后塞回去即可 //比较懒的话暂存策略也不用写,不断地塞回去就行,慢就慢点,别让它报错即可 System.out.println("进入拒绝策略"); executor.execute(r); //再次调用execute } }

那有人说了阻塞队列我设置成无限大,不就不会有上面的问题了。
如果阻塞队列是无限的,会发生什么?
首先我想到的就是OOM了,你的队列无限大,内存够不够?
不够你的内存不久溢出了,项目挂掉然后紧急排查错误,发现你这个开发将队列设置为无限大
好嘛,公司严格的话给你一个线上一级错误警告处理,这找谁说理去
12线程设置多少合理
线程数 = CPU可用核心数/(1 – 阻塞系数),其中阻塞系数的取值在0和1之间。阻塞系数=阻塞时间/(阻塞时间+计算时间)。
- 线程的 CPU 耗时所占比例越高,就需要越少的线程
- 线程的 IO 耗时所占比例越高,就需要越多的线程
- 针对不同的程序,进行对应的实际测试就可以得到最合适的选择
- 线程数 >= CPU 核心数
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/222851.html原文链接:https://javaforall.net
