ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
一般来说, 默认的这4种拒绝策略, 在一些场景是不适用的, 比如我们不想丢弃任何任务, 那么前3种拒绝策略就不适用了
第4种拒绝策略比较特殊, 当执行该策略后, 会把当前线程交给 提交任务的线程 去执行这个任务, 比如说你是通过 主线程 开启的线程任务, 那么这个拒绝策略就是把 被线程池拒绝执行的任务, 反手交给 主线程执行, 并发量高的场景下, 会阻塞主线程, 所以也不是很适用, 下面使用伪代码自定义拒绝策略
LinkedBlockingQueue在实现 “多线程对竞争资源的互斥访问” 时, 对于该类中的 put 和 take 方法操作分别使用了不同的锁, 对于 put 操作, 通过 “插入锁 putLock” 进行同步, 对于取出操作, 通过 “取出锁takeLock” 进行同步
若某线程(代码中模拟是 SchedulerTask)要取出数据时, 队列正好为空, 则该线程会执 notEmpty.await()进行等待, 当其它某个线程(代码中模拟是线程池的拒绝任务)向队列中插入了数据之后, 会调用 notEmpty.signal() 唤醒 “notEmpty上的等待线程”, 此时, (代码中模拟是 SchedulerTask)会被唤醒从而得以继续运行, 此外, (代码中模拟是 SchedulerTask)在执行取操作前, 会获取 takeLock, 在取操作执行完毕再释放 takeLock
若某线程(代码中模拟是线程池的拒绝任务)要插入数据时, 队列已满, 则该线程会它执行 notFull.await() 进行等待, 当其它某个线程(代码中模拟是 SchedulerTask)取出数据之后,会调用notFull.signal() 唤醒 “notFull上的等待线程”, 此时, (代码中模拟是线程池的拒绝任务)就会被唤醒从而得以继续运行, 此外, (代码中模拟是线程池的拒绝任务)在执行插入操作前,会获取 putLock, 在插入操作执行完毕才释放 putLock
2.1 首先自定义拒绝策略实现
自定义一个类实现 java.util.concurrent.RejectedExecutionHandler 接口
public class CustomizeRejectionPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r != null) { // 线程池没来得及执行的任务先放入队列 ThreadReader.put(r); } } }
2.2 实现一个中间类, 用于声明 LinkedBlockingQueue
@Component public class ThreadReader { private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class); private static final BlockingQueue
BLOCKING_QUEUE = new LinkedBlockingQueue<>(); @Qualifier("thread-pool") @Autowired private ThreadPoolExecutor threadPoolExecutor; public static void put(Runnable runnable) { BlockingQueue
blockingQueue = getBlockingQueue(); try { blockingQueue.put(runnable); } catch (InterruptedException e) { // 忽略 } } public void take() { BlockingQueue
blockingQueue = getBlockingQueue(); if (CollectionUtils.isEmpty(blockingQueue)) { return; } try { Runnable runnable = blockingQueue.take(); logger.info(LogUtils.format("取出当前线程池没来得及执行的任务, runnable=<{0}>", runnable)); threadPoolExecutor.execute(runnable); } catch (InterruptedException e) { // } } public static BlockingQueue
getBlockingQueue() { return BLOCKING_QUEUE; } }
2.3 定义该队列的取出类, 这里使用的是单线程, 不再是主线程
@Component public class CustomizeScheduler implements ApplicationListener
{ @Autowired private ThreadReader threadReader; @Override public void onApplicationEvent(ContextRefreshedEvent event) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build(); ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy()); // 之后每隔1秒执行队列中没有来得及执行的任务 scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS); } }
2.4 模拟测试
首先线程池配置如下, 然后使用 jmeter 模拟2000个请求
@Configuration public class ThreadPoolConfig { @Bean("thread-pool") public static ThreadPoolExecutor threadPoolExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy()); threadPoolExecutor.allowCoreThreadTimeOut(true); return threadPoolExecutor; } }

之后查看控制台, 可以看到日志打印情况, 可以发现 单线程 每隔1秒从队列中捞取一个被线程池拒绝执行的任务, 并把该任务重新加入到线程池中, 再执行
为什么不能直接在拒绝策略中引入单线程去执行被拒绝的任务?
这样程序的性能会降低, 线程池只需要把被拒绝的任务往队列中添加, 无需关心单线程怎么调度的, 也无需等待单线程对这些任务的处理, 就可以立即把下一个任务放入到队列中
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/219040.html原文链接:https://javaforall.net
