目录
前言
- ForkJoin是JDK1.7加入的多线程并行处理框架。ForkJoin使用
分而治之的思想,把一个大任务拆分成一个个小任务,然后再聚合,得到最终结果。这有点像Hadoop中的MapReduce。还支持工作窃取。 - 下面附上ForkJoin Java并发动画。
这个Jar包下载地址: https://sourceforge.net/projects/javaconcurrenta/files/,还有很多有意思的动画,帮助我们学习JUC。

什么是工作窃取:假设有A、B两个线程执行一个任务,A比较快,把活儿干完了,这时候A可以把B的一部分活接过来。这样总体来说会加快任务执行速度。
应用
需求
- 假设有这样一个需求:我要统计用户表里全部的金额。这个表里有
条数据。如果我直接用SQL统计很慢,如下图所示。

- 花费了4.563秒才查出来。
- 我发现每次
条还是很快的,如下图所示。

- 我就想是否可以写个程序,拆分成多个小任务,分批查询,然后合并结果。
使用
根据id范围查询求SUM
...省略... @Override public long sumRecord(int toId, int fromId) { QueryWrapper
queryWrapper = new QueryWrapper<>(); // 用in语句合并成一条SQL,避免多次请求数据库的IO queryWrapper.ge("id", fromId); queryWrapper.le("id", toId); queryWrapper.select("IFNULL(SUM(money),0) as money"); List
users = usersMapper.selectList(queryWrapper); if (!CollectionUtils.isEmpty(users)) { return users.get(0).getMoney(); } return 0; } ...省略...
创建任务类和测试用例
...省略... @Test public void sumTask() { long startTime = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); // 模拟千万数据 int min = 1; int max = ; SumTask sumTask = new SumTask(min, max, userService); pool.invoke(sumTask); System.out.println("总数 " + sumTask.join() + " 执行时间 " + (System.currentTimeMillis() - startTime)); } public static final Integer THRESHOLD = ; public static class SumTask extends RecursiveTask
{ int fromId; int toId; private UserService userService; public SumTask(int fromId, int toId, UserService userService) { this.fromId = fromId; this.toId = toId; this.userService = userService; } @Override protected Long compute() { if (toId - fromId < THRESHOLD) { return sumRecord(toId, fromId); } else { int mid = (fromId + toId) / 2; SumTask left = new SumTask(fromId, mid, userService); SumTask right = new SumTask(mid + 1, toId, userService); invokeAll(left, right); return left.join() + right.join(); } } public Long sumRecord(int toId, int fromId) { System.out.println(" 参数 " + fromId + " " + toId); return userService.sumRecord(toId, fromId); } } ...省略...
执行结果
- 执行结果明显速度快了。

小结
- 我们可以在new ForkJoinPool(int parallelism)传入线程数(默认是CPU核心数),进行调优。
- 如果是继承RecursiveAction:用于没有返回结果的任务。
完整代码
- https://gitee.com/apple_/spring-boot-kubernetes/tree/v1.0.5
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/212663.html原文链接:https://javaforall.net
