自定义线程池拒绝策略

自定义线程池拒绝策略一 默认的拒绝策略 ThreadPoolEx AbortPolicy 丢弃任务并抛出 RejectedExec 异常 ThreadPoolEx DiscardPolic 丢弃任务 但是不抛出异常 ThreadPoolEx DiscardOldes 丢弃队列最前面的任务 然后重新提交被拒绝的任务 ThreadPoolEx CallerRunsPo

        ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

        一般来说, 默认的这4种拒绝策略, 在一些场景是不适用的, 比如我们不想丢弃任何任务, 那么前3种拒绝策略就不适用了

         第4种拒绝策略比较特殊, 当执行该策略后, 会把当前线程交给 提交任务的线程 去执行这个任务, 比如说你是通过 主线程 开启的线程任务, 那么这个拒绝策略就是把 被线程池拒绝执行的任务, 反手交给 主线程执行, 并发量高的场景下, 会阻塞主线程, 所以也不是很适用, 下面使用伪代码自定义拒绝策略

        LinkedBlockingQueue在实现 “多线程对竞争资源的互斥访问” 时, 对于该类中的 put 和 take  方法操作分别使用了不同的锁, 对于 put 操作, 通过 “插入锁 putLock” 进行同步, 对于取出操作, 通过 “取出锁takeLock” 进行同步

        若某线程(代码中模拟是 SchedulerTask)要取出数据时, 队列正好为空, 则该线程会执 notEmpty.await()进行等待, 当其它某个线程(代码中模拟是线程池的拒绝任务)向队列中插入了数据之后, 会调用 notEmpty.signal() 唤醒 “notEmpty上的等待线程”, 此时, (代码中模拟是 SchedulerTask)会被唤醒从而得以继续运行, 此外, (代码中模拟是 SchedulerTask)在执行取操作前, 会获取 takeLock, 在取操作执行完毕再释放 takeLock

        若某线程(代码中模拟是线程池的拒绝任务)要插入数据时, 队列已满, 则该线程会它执行   notFull.await() 进行等待, 当其它某个线程(代码中模拟是 SchedulerTask)取出数据之后,会调用notFull.signal() 唤醒 “notFull上的等待线程”, 此时, (代码中模拟是线程池的拒绝任务)就会被唤醒从而得以继续运行, 此外, (代码中模拟是线程池的拒绝任务)在执行插入操作前,会获取 putLock, 在插入操作执行完毕才释放 putLock

2.1 首先自定义拒绝策略实现
        自定义一个类实现 java.util.concurrent.RejectedExecutionHandler 接口

public class CustomizeRejectionPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r != null) { // 线程池没来得及执行的任务先放入队列 ThreadReader.put(r); } } }

2.2 实现一个中间类, 用于声明 LinkedBlockingQueue

@Component public class ThreadReader { private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class); private static final BlockingQueue 
  
    BLOCKING_QUEUE = new LinkedBlockingQueue<>(); @Qualifier("thread-pool") @Autowired private ThreadPoolExecutor threadPoolExecutor; public static void put(Runnable runnable) { BlockingQueue 
   
     blockingQueue = getBlockingQueue(); try { blockingQueue.put(runnable); } catch (InterruptedException e) { // 忽略 } } public void take() { BlockingQueue 
    
      blockingQueue = getBlockingQueue(); if (CollectionUtils.isEmpty(blockingQueue)) { return; } try { Runnable runnable = blockingQueue.take(); logger.info(LogUtils.format("取出当前线程池没来得及执行的任务, runnable=<{0}>", runnable)); threadPoolExecutor.execute(runnable); } catch (InterruptedException e) { // } } public static BlockingQueue 
     
       getBlockingQueue() { return BLOCKING_QUEUE; } } 
      
     
    
  

2.3 定义该队列的取出类, 这里使用的是单线程, 不再是主线程

@Component public class CustomizeScheduler implements ApplicationListener 
  
    { @Autowired private ThreadReader threadReader; @Override public void onApplicationEvent(ContextRefreshedEvent event) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build(); ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy()); // 之后每隔1秒执行队列中没有来得及执行的任务 scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS); } } 
  

2.4 模拟测试

        首先线程池配置如下, 然后使用 jmeter 模拟2000个请求

@Configuration public class ThreadPoolConfig { @Bean("thread-pool") public static ThreadPoolExecutor threadPoolExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy()); threadPoolExecutor.allowCoreThreadTimeOut(true); return threadPoolExecutor; } }

自定义线程池拒绝策略

 之后查看控制台, 可以看到日志打印情况, 可以发现 单线程 每隔1秒从队列中捞取一个被线程池拒绝执行的任务, 并把该任务重新加入到线程池中, 再执行自定义线程池拒绝策略

 

        为什么不能直接在拒绝策略中引入单线程去执行被拒绝的任务?

        这样程序的性能会降低, 线程池只需要把被拒绝的任务往队列中添加, 无需关心单线程怎么调度的, 也无需等待单线程对这些任务的处理, 就可以立即把下一个任务放入到队列中

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/219040.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午10:56
下一篇 2026年3月17日 下午10:56


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号