目录
- 多线程@Async的使用体验
- 场景
- 1.线程池配置
- 2.子父线程之间共享一个Request的配置方案
- 3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程
- 1.CountDownLatch
- 2.Future
- 4.多线程共用一个事务
- 异步调用@Async问题
- 1.使用背景
- 2.异步处理方式
- 3.@Async不返回数据
- 4.@Async返回数据
- 5.异常处理
多线程@Async的使用体验
场景
导入:可以将大批量的数据insert操作采用多线程的方式并行执行
第三方服务的接口调用:由于存在个别第三方服务调用比较耗时的场景,此时就可以与自身服务的逻辑并行执行
简而言之:接口中部份业务逻辑可以通过并行的方式来优化接口性能
1.线程池配置
@Configuration
@EnableAsync
public class TaskPoolConfig {
@Bean("taskExecutor") // bean 的名称,默认为首字母小写的方法名
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数(CPU核心数+1)
executor.setCorePoolSize(10);
//最大线程数(2*CPU核心数+1)
executor.setMaxPoolSize(20);
//缓冲队列数
executor.setQueueCapacity(200);
//允许线程空闲时间(单位:默认为秒)
executor.setKeepAliveSeconds(60);
//线程池名前缀
executor.setThreadNamePrefix("sub-thread-");
// 增加 TaskDecorator 属性的配置
executor.setTaskDecorator(new ContextDecorator());
// 线程池对拒绝任务的处理策略:CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
2.子父线程之间共享一个Request的配置方案
1.实现TaskDecorator接口
/**
* 子线程装饰器
*
* @author Da Shuai
* @date 2021-06-10 18:28:17
*/
public class SubThreadTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
return () -> {
try {
RequestContextHolder.setRequestAttributes(context);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
}
};
}
}
2.之前的线程池配置加如下代码使其生效
// 增加 TaskDecorator 属性的配置 executor.setTaskDecorator(new ContextDecorator());
3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程
1.CountDownLatch
思路:
- 实例化CountDownLatch对象,同时传入x(线程数量:这个数量必须等于子线程数量)进行构造
- 每个子线程执行完毕后会调用countDown()方法
- 子线程逻辑后方调用await()方法
这样线程计数器为0之前,主线程就一直处于pending状态
主线程逻辑
new CountDownLatch(X) latch.await()
@Override
@Transactional
public void importExcel(File file) {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
VoteDO voteDO = new VoteDO();
voteDO.setTitle(i + "");
asyncManager.asyncSaveVote(voteDO);
}
//System.out.println(1/0);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
子线程逻辑
latch.countDown()
@Override
@Async
public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) {
log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName());
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName());
log.info("当前线程为 {},保存开始", Thread.currentThread().getName());
voteDO.setDesc(Thread.currentThread().getName());
voteDao.insert(voteDO);
latch.countDown();
log.info("当前线程为 {},保存结束", Thread.currentThread().getName());
}
日志
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求方式:POST
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:31:08.676 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Starting…
2021-06-11 16:31:08.894 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Start completed.
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s开始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s开始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s结束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存开始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存开始
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.172 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.178 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.187 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存结束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存结束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存结束
2021-06-11 16:31:19.226 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : Response内容:null
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-
2.Future
思路:
1.子线程逻辑返回Future对象
2.主线程逻辑循环判断每个子线程返回的Future对象isDone()是否为true
主线程逻辑
循环判断future.isDone()是否为true
@Override
@Transactional
public void importExcel(File file) {
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
VoteDO voteDO = new VoteDO();
voteDO.setTitle(i + "");
Future future = asyncManager.asyncSaveVote(voteDO);
futureList.add(future);
}
//检查所有子线程是否均执行完毕
while (true) {
boolean isAllDone = true;
for (Future future : futureList) {
if (null == future || !future.isDone()) {
isAllDone = false;
}
}
if (isAllDone) {
log.info("所有子线程执行完毕");
break;
}
}
}
子线程逻辑
返回Future对象
@Override
public Future asyncSaveVote(VoteDO voteDO) {
log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName());
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName());
log.info("当前线程为 {},保存开始", Thread.currentThread().getName());
voteDO.setDesc(Thread.currentThread().getName());
voteDao.insert(voteDO);
log.info("当前线程为 {},保存结束", Thread.currentThread().getName());
//返回需要用AsyncResult类
return new AsyncResult<>(true);
}
日志
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求方式:POST
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s开始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s开始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s结束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存开始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存开始
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.988 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.989 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存结束
2021-06-11 16:42:38.993 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.993 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存结束
2021-06-11 16:42:39.004 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:39.005 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存结束
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子线程执行完毕
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : Response内容:null
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-
4.多线程共用一个事务
暂时无解决方案,这是弊端
异步调用@Async问题
1.使用背景
在项目中,当访问其他人的接口较慢或者做耗时任务时,不想程序一直卡在耗时任务上,想程序能够并行执行,我们可以使用多线程来并行的处理任务,也可以使用spring提供的异步处理方式@Async。
2.异步处理方式
调用之后,不返回任何数据。
调用之后,返回数据,通过Future来获取返回数据
3.@Async不返回数据
使用@EnableAsync启用异步注解
@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig{
}
在异步处理的方法dealNoReturnTask上添加注解@Async
@Component
@Slf4j
public class AsyncTask {
@Async
public void dealNoReturnTask(){
log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
}
}
Test测试类:
@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@Slf4j
public class AsyncTest {
@Autowired
private AsyncTask asyncTask;
@Test
public void testDealNoReturnTask(){
asyncTask.dealNoReturnTask();
try {
log.info("begin to deal other Task!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
日志打印结果为:
begin to deal other Task!
AsyncExecutorThread-1 deal No Return Task start
AsyncExecutorThread-1 deal No Return Task end at 1499751227034
从日志中我们可以看出,方法dealNoReturnTask()是异步执行完成的。
dealNoReturnTask()设置sleep 3s是为了模拟耗时任务
testDealNoReturnTask()设置sleep 10s是为了确认异步是否执行完成
4.@Async返回数据
异步调用返回数据,Future表示在未来某个点获取执行结果,返回数据类型可以自定义
@Async
public Future<String> dealHaveReturnTask() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("thread", Thread.currentThread().getName());
jsonObject.put("time", System.currentTimeMillis());
return new AsyncResult<String>(jsonObject.toJSONString());
}
测试类用isCancelled判断异步任务是否取消,isDone判断任务是否执行结束
@Test
public void testDealHaveReturnTask() throws Exception {
Future<String> future = asyncTask.dealHaveReturnTask();
log.info("begin to deal other Task!");
while (true) {
if(future.isCancelled()){
log.info("deal async task is Cancelled");
break;
}
if (future.isDone() ) {
log.info("deal async task is Done");
log.info("return result is " + future.get());
break;
}
log.info("wait async task to end ...");
Thread.sleep(1000);
}
}
日志打印如下,我们可以看出任务一直在等待异步任务执行完毕,用future.get()来获取异步任务的返回结果
begin to deal other Task!
wait async task to end …
wait async task to end …
wait async task to end …
wait async task to end …
deal async task is Done
return result is {“thread”:”AsyncExecutorThread-1″,”time”:1499752617330}
5.异常处理
我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现
在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(),
不然在调用时会报线程池未初始化的异常。
如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化
@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {
// @Bean
// public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setCorePoolSize(10);
// executor.setMaxPoolSize(100);
// executor.setQueueCapacity(100);
// return executor;
// }
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncExecutorThread-");
executor.initialize(); //如果不初始化,导致找到不到执行器
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
}
异步异常处理类:
@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));
if (ex instanceof AsyncException) {
AsyncException asyncException = (AsyncException) ex;
log.info("asyncException:{}",asyncException.getErrorMessage());
}
log.info("Exception :");
ex.printStackTrace();
}
}
异步处理异常类:
@Data
@AllArgsConstructor
public class AsyncException extends Exception {
private int code;
private String errorMessage;
}
在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

评论(0)