目录
  • 多线程@Async的使用体验
    • 场景
    • 1.线程池配置
    • 2.子父线程之间共享一个Request的配置方案
    • 3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程
      • 1.CountDownLatch
      • 2.Future
    • 4.多线程共用一个事务
    • 异步调用@Async问题
      • 1.使用背景
        • 2.异步处理方式
          • 3.@Async不返回数据
            • 4.@Async返回数据
              • 5.异常处理

              多线程@Async的使用体验

              场景

              导入:可以将大批量的数据insert操作采用多线程的方式并行执行

              第三方服务的接口调用:由于存在个别第三方服务调用比较耗时的场景,此时就可以与自身服务的逻辑并行执行

              简而言之:接口中部份业务逻辑可以通过并行的方式来优化接口性能

              1.线程池配置

              @Configuration
              @EnableAsync
              public class TaskPoolConfig {
              
                  @Bean("taskExecutor") // bean 的名称,默认为首字母小写的方法名
                  public Executor taskExecutor() {
                      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                      //核心线程数(CPU核心数+1)
                      executor.setCorePoolSize(10);
                      //最大线程数(2*CPU核心数+1)
                      executor.setMaxPoolSize(20);
                      //缓冲队列数
                      executor.setQueueCapacity(200);
                      //允许线程空闲时间(单位:默认为秒)
                      executor.setKeepAliveSeconds(60);
                      //线程池名前缀
                      executor.setThreadNamePrefix("sub-thread-");
                      // 增加 TaskDecorator 属性的配置
                      executor.setTaskDecorator(new ContextDecorator());
                      // 线程池对拒绝任务的处理策略:CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
                      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                      executor.initialize();
                      return executor;
                  }
              }
              

              2.子父线程之间共享一个Request的配置方案

              1.实现TaskDecorator接口

              /**
               * 子线程装饰器
               *
               * @author Da Shuai
               * @date 2021-06-10 18:28:17
               */
              public class SubThreadTaskDecorator implements TaskDecorator {
              
                  @Override
                  public Runnable decorate(Runnable runnable) {
                      RequestAttributes context = RequestContextHolder.currentRequestAttributes();
                      return () -> {
                          try {
                              RequestContextHolder.setRequestAttributes(context);
                              runnable.run();
                          } finally {
                              RequestContextHolder.resetRequestAttributes();
                          }
                      };
                  }
              }
              

              2.之前的线程池配置加如下代码使其生效

              // 增加 TaskDecorator 属性的配置
              executor.setTaskDecorator(new ContextDecorator());

              3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程

              1.CountDownLatch

              思路:

              • 实例化CountDownLatch对象,同时传入x(线程数量:这个数量必须等于子线程数量)进行构造
              • 每个子线程执行完毕后会调用countDown()方法
              • 子线程逻辑后方调用await()方法

              这样线程计数器为0之前,主线程就一直处于pending状态

              主线程逻辑

              new CountDownLatch(X)
              latch.await()
              @Override
              @Transactional
              public void importExcel(File file) {
                  CountDownLatch latch = new CountDownLatch(3);
                  for (int i = 0; i < 3; i++) {
                      VoteDO voteDO = new VoteDO();
                      voteDO.setTitle(i + "");
                      asyncManager.asyncSaveVote(voteDO);
                  }
                  //System.out.println(1/0);
                  try {
                      latch.await();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }

              子线程逻辑

              latch.countDown()
              @Override
              @Async
              public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) {
                  log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName());
                  try {
                      Thread.sleep(10000L);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName());
                  log.info("当前线程为 {},保存开始", Thread.currentThread().getName());
                  voteDO.setDesc(Thread.currentThread().getName());
                  voteDao.insert(voteDO);
                  latch.countDown();
                  log.info("当前线程为 {},保存结束", Thread.currentThread().getName());
              }

              日志

              2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
              2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
              2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求方式:POST
              2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
              2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
              2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============请求内容===============
              2021-06-11 16:31:08.676 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Starting…
              2021-06-11 16:31:08.894 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Start completed.
              2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s开始
              2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s开始
              2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s开始
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,休眠10s结束
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,休眠10s结束
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存开始
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,休眠10s结束
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存开始
              2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存开始
              2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
              2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
              2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
              2021-06-11 16:31:19.172 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:31:19.178 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:31:19.187 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-3,保存结束
              2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-1,保存结束
              2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-2,保存结束
              2021-06-11 16:31:19.226 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-
              2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : Response内容:null
              2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回内容—————-

              2.Future

              思路:

              1.子线程逻辑返回Future对象

              2.主线程逻辑循环判断每个子线程返回的Future对象isDone()是否为true

              主线程逻辑

              循环判断future.isDone()是否为true

              @Override
              @Transactional
              public void importExcel(File file) {
               List<Future> futureList = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   VoteDO voteDO = new VoteDO();
                   voteDO.setTitle(i + "");
                   Future future = asyncManager.asyncSaveVote(voteDO);
                   futureList.add(future);
               }
               //检查所有子线程是否均执行完毕
               while (true) {
                   boolean isAllDone = true;
                   for (Future future : futureList) {
                       if (null == future || !future.isDone()) {
                           isAllDone = false;
                       }
                   }
                   if (isAllDone) {
                       log.info("所有子线程执行完毕");
                       break;
                   }
               }
              }

              子线程逻辑

              返回Future对象

              @Override
              public Future asyncSaveVote(VoteDO voteDO) {
                  log.info("当前线程为 {},休眠10s开始", Thread.currentThread().getName());
                  try {
                      Thread.sleep(10000L);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  log.info("当前线程为 {},休眠10s结束", Thread.currentThread().getName());
                  log.info("当前线程为 {},保存开始", Thread.currentThread().getName());
                  voteDO.setDesc(Thread.currentThread().getName());
                  voteDao.insert(voteDO);
                  log.info("当前线程为 {},保存结束", Thread.currentThread().getName());
                  //返回需要用AsyncResult类
                  return new AsyncResult<>(true);
              }

              日志

              2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
              2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求地址:http://localhost:8080/api/import
              2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求方式:POST
              2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法:com.zhdj.controller.ImportController.importExcel
              2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 请求类方法参数:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
              2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============请求内容===============
              2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s开始
              2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s开始
              2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s开始
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,休眠10s结束
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,休眠10s结束
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,休眠10s结束
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存开始
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存开始
              2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存开始
              2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
              2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
              2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
              2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
              2021-06-11 16:42:38.988 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:42:38.989 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-5,保存结束
              2021-06-11 16:42:38.993 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:42:38.993 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-6,保存结束
              2021-06-11 16:42:39.004 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
              2021-06-11 16:42:39.005 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 当前线程为 sub-thread-4,保存结束
              2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子线程执行完毕
              2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-
              2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : Response内容:null
              2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回内容—————-

              4.多线程共用一个事务

              暂时无解决方案,这是弊端

              异步调用@Async问题

              1.使用背景

              在项目中,当访问其他人的接口较慢或者做耗时任务时,不想程序一直卡在耗时任务上,想程序能够并行执行,我们可以使用多线程来并行的处理任务,也可以使用spring提供的异步处理方式@Async。

              2.异步处理方式

              调用之后,不返回任何数据。

              调用之后,返回数据,通过Future来获取返回数据

              3.@Async不返回数据

              使用@EnableAsync启用异步注解

              @Configuration
              @EnableAsync
              @Slf4j
              public class AsyncConfig{
              }

              在异步处理的方法dealNoReturnTask上添加注解@Async

              @Component
              @Slf4j
              public class AsyncTask {
              
                  @Async
                  public void dealNoReturnTask(){
                      log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());
                      try {
                          Thread.sleep(3000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
                  }
              }

              Test测试类:

              @SpringBootTest(classes = SpringbootApplication.class)
              @RunWith(SpringJUnit4ClassRunner.class)
              @Slf4j
              public class AsyncTest {
                  @Autowired
                  private AsyncTask asyncTask;
              
                  @Test
                  public void testDealNoReturnTask(){
                      asyncTask.dealNoReturnTask();
                      try {
                          log.info("begin to deal other Task!");
                          Thread.sleep(10000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              

              日志打印结果为:

              begin to deal other Task!
              AsyncExecutorThread-1 deal No Return Task start
              AsyncExecutorThread-1 deal No Return Task end at 1499751227034

              从日志中我们可以看出,方法dealNoReturnTask()是异步执行完成的。

              dealNoReturnTask()设置sleep 3s是为了模拟耗时任务

              testDealNoReturnTask()设置sleep 10s是为了确认异步是否执行完成

              4.@Async返回数据

              异步调用返回数据,Future表示在未来某个点获取执行结果,返回数据类型可以自定义

                  @Async
                  public Future<String> dealHaveReturnTask() {
                      try {
                          Thread.sleep(3000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      JSONObject jsonObject = new JSONObject();
                      jsonObject.put("thread", Thread.currentThread().getName());
                      jsonObject.put("time", System.currentTimeMillis());
                      return new AsyncResult<String>(jsonObject.toJSONString());
                  }

              测试类用isCancelled判断异步任务是否取消,isDone判断任务是否执行结束

                  @Test
                  public void testDealHaveReturnTask() throws Exception {
              
                      Future<String> future = asyncTask.dealHaveReturnTask();
                      log.info("begin to deal other Task!");
                      while (true) {
                          if(future.isCancelled()){
                              log.info("deal async task is Cancelled");
                              break;
                          }
                          if (future.isDone() ) {
                              log.info("deal async task is Done");
                              log.info("return result is " + future.get());
                              break;
                          }
                          log.info("wait async task to end ...");
                          Thread.sleep(1000);
                      }
                  }

              日志打印如下,我们可以看出任务一直在等待异步任务执行完毕,用future.get()来获取异步任务的返回结果

              begin to deal other Task!
              wait async task to end …
              wait async task to end …
              wait async task to end …
              wait async task to end …
              deal async task is Done
              return result is {“thread”:”AsyncExecutorThread-1″,”time”:1499752617330}

              5.异常处理

              我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现

              在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(),

              不然在调用时会报线程池未初始化的异常。

              如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化

              @Configuration
              @EnableAsync
              @Slf4j
              public class AsyncConfig implements AsyncConfigurer {
              
              //    @Bean
              //    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
              //        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              //        executor.setCorePoolSize(10);
              //        executor.setMaxPoolSize(100);
              //        executor.setQueueCapacity(100);
              //        return executor;
              //    }
              
                  @Override
                  public Executor getAsyncExecutor() {
                      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                      executor.setCorePoolSize(10);
                      executor.setMaxPoolSize(100);
                      executor.setQueueCapacity(100);
                      executor.setThreadNamePrefix("AsyncExecutorThread-");
                      executor.initialize(); //如果不初始化,导致找到不到执行器
                      return executor;
                  }
                  @Override
                  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                      return new AsyncExceptionHandler();
                  }
              }
              

              异步异常处理类:

              @Slf4j
              public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
                  @Override
                  public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                      log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));
              
                      if (ex instanceof AsyncException) {
                          AsyncException asyncException = (AsyncException) ex;
                          log.info("asyncException:{}",asyncException.getErrorMessage());
                      }
              
                      log.info("Exception :");
                      ex.printStackTrace();
                  }
              }
              

              异步处理异常类:

              @Data
              @AllArgsConstructor
              public class AsyncException extends Exception {
                  private int code;
                  private String errorMessage;
              }

              在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。

              在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。

              以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

              声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。