目录
- 多线程@Async的使用体验
- 场景
- 1.线程池配置
- 2.子父线程之间共享一个Request的配置方案
- 3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程
- 1.CountDownLatch
- 2.Future
- 4.多线程共用一个事务
- 异步调用@Async问题
- 1.使用背景
- 2.异步处理方式
- 3.@Async不返回数据
- 4.@Async返回数据
- 5.异常处理
多线程@Async的使用体验
场景
导入:可以将大批量的数据insert操作采用多线程的方式并行执行
第三方服务的接口调用:由于存在个别第三方服务调用比较耗时的场景,此时就可以与自身服务的逻辑并行执行
简而言之:接口中部份业务逻辑可以通过并行的方式来优化接口性能
1.线程池配置
@Configuration @EnableAsync public class TaskPoolConfig { @Bean("taskExecutor") // bean 的名称,默认为首字母小写的方法名 public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心线程数(CPU核心数+1) executor.setCorePoolSize(10); //最大线程数(2*CPU核心数+1) executor.setMaxPoolSize(20); //缓冲队列数 executor.setQueueCapacity(200); //允许线程空闲时间(单位:默认为秒) executor.setKeepAliveSeconds(60); //线程池名前缀 executor.setThreadNamePrefix("sub-thread-"); // 增加 TaskDecorator 属性的配置 executor.setTaskDecorator(new ContextDecorator()); // 线程池对拒绝任务的处理策略:CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
2.子父线程之间共享一个Request的配置方案
1.实现TaskDecorator接口
/** * 子线程装饰器 * * @author Da Shuai * @date 2021-06-10 18:28:17 */ public class SubThreadTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } }
2.之前的线程池配置加如下代码使其生效
// 增加 TaskDecorator 属性的配置 executor.setTaskDecorator(new ContextDecorator());
3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程
1.CountDownLatch
思路:
- 实例化CountDownLatch对象,同时传入x(线程数量:这个数量必须等于子线程数量)进行构造
- 每个子线程执行完毕后会调用countDown()方法
- 子线程逻辑后方调用await()方法
这样线程计数器为0之前,主线程就一直处于pending状态
主线程逻辑
new CountDownLatch(X) latch.await()
@Override @Transactional public void importExcel(File file) { CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); asyncManager.asyncSaveVote(voteDO); } //System.out.println(1/0); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } }
子线程逻辑
latch.countDown()
@Override @Async public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) { log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName()); log.info("当前线程为 {},保存开始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); latch.countDown(); log.info("当前线程为 {},保存结束", Thread.currentThread().getName()); }
日志
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求方式:POST
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:31:08.676 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Starting…
2021-06-11 16:31:08.894 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Start completed.
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s开始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s开始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存开始
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.172 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.178 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.187 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存结束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存结束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存结束
2021-06-11 16:31:19.226 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : Response内容:null
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-
2.Future
思路:
1.子线程逻辑返回Future对象
2.主线程逻辑循环判断每个子线程返回的Future对象isDone()是否为true
主线程逻辑
循环判断future.isDone()是否为true
@Override @Transactional public void importExcel(File file) { List<Future> futureList = new ArrayList<>(); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); Future future = asyncManager.asyncSaveVote(voteDO); futureList.add(future); } //检查所有子线程是否均执行完毕 while (true) { boolean isAllDone = true; for (Future future : futureList) { if (null == future || !future.isDone()) { isAllDone = false; } } if (isAllDone) { log.info("所有子线程执行完毕"); break; } } }
子线程逻辑
返回Future对象
@Override public Future asyncSaveVote(VoteDO voteDO) { log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName()); log.info("当前线程为 {},保存开始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); log.info("当前线程为 {},保存结束", Thread.currentThread().getName()); //返回需要用AsyncResult类 return new AsyncResult<>(true); }
日志
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求方式:POST
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s开始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s开始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存开始
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.988 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.989 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存结束
2021-06-11 16:42:38.993 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.993 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存结束
2021-06-11 16:42:39.004 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:39.005 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存结束
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子线程执行完毕
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : Response内容:null
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-
4.多线程共用一个事务
暂时无解决方案,这是弊端
异步调用@Async问题
1.使用背景
在项目中,当访问其他人的接口较慢或者做耗时任务时,不想程序一直卡在耗时任务上,想程序能够并行执行,我们可以使用多线程来并行的处理任务,也可以使用spring提供的异步处理方式@Async。
2.异步处理方式
调用之后,不返回任何数据。
调用之后,返回数据,通过Future来获取返回数据
3.@Async不返回数据
使用@EnableAsync启用异步注解
@Configuration @EnableAsync @Slf4j public class AsyncConfig{ }
在异步处理的方法dealNoReturnTask上添加注解@Async
@Component @Slf4j public class AsyncTask { @Async public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis()); } }
Test测试类:
@SpringBootTest(classes = SpringbootApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @Slf4j public class AsyncTest { @Autowired private AsyncTask asyncTask; @Test public void testDealNoReturnTask(){ asyncTask.dealNoReturnTask(); try { log.info("begin to deal other Task!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
日志打印结果为:
begin to deal other Task!
AsyncExecutorThread-1 deal No Return Task start
AsyncExecutorThread-1 deal No Return Task end at 1499751227034
从日志中我们可以看出,方法dealNoReturnTask()是异步执行完成的。
dealNoReturnTask()设置sleep 3s是为了模拟耗时任务
testDealNoReturnTask()设置sleep 10s是为了确认异步是否执行完成
4.@Async返回数据
异步调用返回数据,Future表示在未来某个点获取执行结果,返回数据类型可以自定义
@Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }
测试类用isCancelled判断异步任务是否取消,isDone判断任务是否执行结束
@Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }
日志打印如下,我们可以看出任务一直在等待异步任务执行完毕,用future.get()来获取异步任务的返回结果
begin to deal other Task!
wait async task to end …
wait async task to end …
wait async task to end …
wait async task to end …
deal async task is Done
return result is {“thread”:”AsyncExecutorThread-1″,”time”:1499752617330}
5.异常处理
我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现
在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(),
不然在调用时会报线程池未初始化的异常。
如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化
@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //如果不初始化,导致找到不到执行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
异步异常处理类:
@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } }
异步处理异常类:
@Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }
在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
评论(0)