Contents
  • 1. Background
  • 2. Simulation
    • 2.1 The common way to write it
    • 2.2 Trying to cancel
      • 2.2.1 cancel(false)
      • 2.2.2 cancel(true)
  • 3. Back to the source code
  • 4. Summary

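      1. Background

      Many Java engineers cram standard interview questions when preparing for interviews. For threads and thread pools, that usually means thread states, the ways to create a thread, the factory methods in Executors and why they are discouraged, and the constructor parameters and execution flow of ThreadPoolExecutor.

      At work, many people call a thread pool's submit method to obtain a Future, then use java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) to get a "wait at most this long" effect.

      But for many, that understanding stays on the surface, and a slightly deeper question leaves them stumped.

      For example: after java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) times out, what happens to the calling thread? And how does the pool thread that is running the task behave?

      If you are not confident about the answer, your grasp of the topic is not yet solid.

      The most common belief is: "after the timeout, the calling thread carries on, and the corresponding pool thread is interrupted". Is that really the case?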

      2. Simulation

      2.1 The common way to write it

      Here is a simple example that simulates the situation:

      package basic.thread;
      import java.util.concurrent.*;
      public class FutureDemo {
          public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
              ExecutorService executorService = Executors.newFixedThreadPool(2);
              Future<?> future = executorService.submit(() -> {
                  try {
                      demo();
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
              });
              String threadName = Thread.currentThread().getName();
              System.out.println(threadName + ": get result -- start");
              try {
                  // Wait at most 100 ms; demo() sleeps for 1 s, so this call times out
                  Object result = future.get(100, TimeUnit.MILLISECONDS);
                  System.out.println(threadName + ": get result -- end: " + result);
              } finally {
                  // Stop accepting new tasks so the JVM can exit once demo() finishes
                  executorService.shutdown();
              }
          }
          private static String demo() throws InterruptedException {
              String threadName = Thread.currentThread().getName();
              System.out.println(threadName + ": run demo -- start");
              TimeUnit.SECONDS.sleep(1);
              System.out.println(threadName + ": run demo -- end");
              return "test";
          }
      }

      Output:

      main: get result -- start
      pool-1-thread-1: run demo -- start
      Exception in thread "main" java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at basic.thread.FutureDemo.main(FutureDemo.java:20)
      pool-1-thread-1: run demo -- end

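      We can see that the calling thread gets a TimeoutException thrown from get (it is not caught here, so main terminates with it), while the pool thread running the task is not interrupted and keeps going until the task finishes.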
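
      To make the point above concrete, here is a minimal sketch showing that a timed-out get leaves the task untouched: isDone() is still false right after the timeout, and a later get() returns the result. The class name TimeoutThenRetryDemo and the 50 ms / 500 ms timings are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article
class TimeoutThenRetryDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(500); // simulated slow work
                return "test";
            });
            boolean timedOut = false;
            try {
                future.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true; // only the waiting caller is affected
            }
            // The task itself is still running at this point
            boolean doneAfterTimeout = future.isDone();
            // Waiting again (without a timeout) yields the normal result
            String result = future.get();
            return "timedOut=" + timedOut
                    + ", doneAfterTimeout=" + doneAfterTimeout
                    + ", result=" + result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

      In other words, the timeout belongs to the waiting call, not to the task; stopping the task needs an explicit step such as cancel.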

      2.2 Trying to cancel

      Let's try to cancel the unfinished task. Future#cancel takes a boolean parameter:

          /**
           * Attempts to cancel execution of this task.  This attempt will
           * fail if the task has already completed, has already been cancelled,
           * or could not be cancelled for some other reason. If successful,
           * and this task has not started when {@code cancel} is called,
           * this task should never run.  If the task has already started,
           * then the {@code mayInterruptIfRunning} parameter determines
           * whether the thread executing this task should be interrupted in
           * an attempt to stop the task.
           *
           * <p>After this method returns, subsequent calls to {@link #isDone} will
           * always return {@code true}.  Subsequent calls to {@link #isCancelled}
           * will always return {@code true} if this method returned {@code true}.
           *
           * @param mayInterruptIfRunning {@code true} if the thread executing this
           * task should be interrupted; otherwise, in-progress tasks are allowed
           * to complete
           * @return {@code false} if the task could not be cancelled,
           * typically because it has already completed normally;
           * {@code true} otherwise
           */
          boolean cancel(boolean mayInterruptIfRunning);

      From the Javadoc we learn:

      With true, the thread running the task is interrupted in an attempt to stop it;

      With false, a task that is already running is allowed to complete.
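
      The Javadoc also covers a third case: a task that has not started yet when cancel is called should never run. The sketch below illustrates this under one assumption, a single-thread pool whose only worker is kept busy so the second task stays queued. The class name CancelQueuedTaskDemo and the latch are illustrative.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article
class CancelQueuedTaskDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);

        // Keeps the only worker thread busy until we release it
        Future<?> first = pool.submit(() -> {
            release.await();
            return null;
        });
        // Cannot start while the first task holds the worker
        Future<?> second = pool.submit(() -> secondRan.set(true));

        boolean cancelReturned = second.cancel(false);
        release.countDown();
        first.get();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        boolean getThrew = false;
        try {
            second.get();
        } catch (CancellationException e) {
            getThrew = true; // a cancelled Future reports cancellation on get
        }
        return "cancelReturned=" + cancelReturned
                + ", isCancelled=" + second.isCancelled()
                + ", isDone=" + second.isDone()
                + ", getThrewCancellation=" + getThrew
                + ", taskRan=" + secondRan.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```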

      2.2.1 cancel(false)

      This time we wrap the call in try-catch so that the TimeoutException does not terminate the main thread.

      package basic.thread;
      // Note: ExceptionUtils comes from junit-platform-commons and must be on the classpath
      import org.junit.platform.commons.util.ExceptionUtils;
      import java.util.concurrent.*;
      public class FutureDemo {
          public static void main(String[] args) {
              ExecutorService executorService = Executors.newFixedThreadPool(2);
              Future<?> future = executorService.submit(() -> {
                  try {
                      demo();
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
              });
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- start");
              try {
                  Object result = future.get(100, TimeUnit.MILLISECONDS);
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- end: " + result);
              } catch (Exception e) {
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result failed: " + ExceptionUtils.readStackTrace(e));
              }
              // false: do not interrupt the task if it is already running
              future.cancel(false);
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- cancel");
              // Stop accepting new tasks so the JVM can exit once demo() finishes
              executorService.shutdown();
          }
          private static String demo() throws InterruptedException {
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- start");
              TimeUnit.SECONDS.sleep(1);
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- end");
              return "test";
          }
      }

      Output:

      1653751759233,main: get result -- start
      1653751759233,pool-1-thread-1: run demo -- start
      1653751759343,main: get result failed: java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at basic.thread.FutureDemo.main(FutureDemo.java:23)

      1653751759351,main: get result -- cancel
      1653751760263,pool-1-thread-1: run demo -- end

      We can see that with cancel(false), a task that is already running on the pool thread keeps running to completion.
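
      One consequence worth spelling out: after cancel(false) the Future itself is already in the cancelled state, so get() throws CancellationException right away even though the task body keeps running. A minimal sketch, where the class name CancelFalseDemo and the 300 ms sleep are assumptions for illustration:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article
class CancelFalseDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            try {
                TimeUnit.MILLISECONDS.sleep(300); // simulated work
                finished.set(true);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        started.await(); // make sure the task is really running
        boolean cancelReturned = future.cancel(false);

        boolean getThrew = false;
        try {
            future.get(); // does not wait for the task body
        } catch (CancellationException e) {
            getThrew = true;
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return "cancelReturned=" + cancelReturned
                + ", getThrewCancellation=" + getThrew
                + ", taskFinished=" + finished.get()
                + ", taskInterrupted=" + interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```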

      2.2.2 cancel(true)

      package basic.thread;
      // Note: ExceptionUtils comes from junit-platform-commons and must be on the classpath
      import org.junit.platform.commons.util.ExceptionUtils;
      import java.util.concurrent.*;
      public class FutureDemo {
          public static void main(String[] args) {
              ExecutorService executorService = Executors.newFixedThreadPool(2);
              Future<?> future = executorService.submit(() -> {
                  try {
                      demo();
                  } catch (InterruptedException e) {
                      System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
                      throw new RuntimeException(e);
                  }
              });
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- start");
              try {
                  Object result = future.get(100, TimeUnit.MILLISECONDS);
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- end: " + result);
              } catch (Exception e) {
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result failed: " + ExceptionUtils.readStackTrace(e));
              }
              // true: interrupt the thread that is running the task
              future.cancel(true);
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- cancel");
              // Stop accepting new tasks so the JVM can exit
              executorService.shutdown();
          }
          private static String demo() throws InterruptedException {
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- start");
              TimeUnit.SECONDS.sleep(1);
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- end");
              return "test";
          }
      }

      Output:

      1653752011246,main: get result -- start
      1653752011246,pool-1-thread-1: run demo -- start
      1653752011347,main: get result failed: java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at basic.thread.FutureDemo.main(FutureDemo.java:24)

      1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted
          at java.lang.Thread.sleep(Native Method)
          at java.lang.Thread.sleep(Thread.java:340)
          at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
          at basic.thread.FutureDemo.demo(FutureDemo.java:36)
          at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)

      1653752011366,main: get result -- cancel

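      As we can see, if the task has not finished, the thread running it is interrupted; because that thread is blocked in sleep at that moment, it receives an InterruptedException.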
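
      The InterruptedException only shows up because the task is inside a blocking call. A task that never blocks has to poll the interrupt flag itself, otherwise cancel(true) has no visible effect on it. The following sketch assumes a busy loop as the stand-in for CPU-bound work; the class name CooperativeCancelDemo is illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article
class CooperativeCancelDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            long counter = 0;
            // No blocking call here, so no InterruptedException is ever thrown;
            // the loop must check the interrupt flag to notice the cancellation
            while (!Thread.currentThread().isInterrupted()) {
                counter++;
            }
            sawInterrupt.set(true);
        });

        started.await();       // the task is running, so its runner thread is known
        future.cancel(true);   // sets the interrupt flag on that thread
        pool.shutdown();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return "terminated=" + terminated + ", sawInterrupt=" + sawInterrupt.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

      Without the isInterrupted() check the loop would spin forever, and the pool would not terminate within the wait.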

      Of course, if you do not want the interrupt to abort the task, catch the InterruptedException inside the task and run fallback logic instead.

      package basic.thread;
      // Note: ExceptionUtils comes from junit-platform-commons and must be on the classpath
      import org.junit.platform.commons.util.ExceptionUtils;
      import java.util.concurrent.*;
      public class FutureDemo {
          public static void main(String[] args) {
              ExecutorService executorService = Executors.newFixedThreadPool(2);
              Future<?> future = executorService.submit(() -> {
                  demo();
              });
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- start");
              try {
                  Object result = future.get(100, TimeUnit.MILLISECONDS);
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- end: " + result);
              } catch (Exception e) {
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": get result failed: " + ExceptionUtils.readStackTrace(e));
              }
              future.cancel(true);
              System.out.println(System.currentTimeMillis() + "," + threadName + ": get result -- cancel");
              // Stop accepting new tasks so the JVM can exit
              executorService.shutdown();
          }
          private static String demo() {
              String threadName = Thread.currentThread().getName();
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- start");
              try {
                  TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                  // Swallow the interrupt and continue with fallback logic
                  System.out.println(System.currentTimeMillis() + "," + threadName + ": demo interrupted, falling back");
              }
              System.out.println(System.currentTimeMillis() + "," + threadName + ": run demo -- end");
              return "test";
          }
      }

      Output:

      1653752219803,main: get result -- start
      1653752219803,pool-1-thread-1: run demo -- start
      1653752219908,main: get result failed: java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at basic.thread.FutureDemo.main(FutureDemo.java:19)

      1653752219913,main: get result -- cancel
      1653752219914,pool-1-thread-1: demo interrupted, falling back
      1653752219914,pool-1-thread-1: run demo -- end
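
      A detail that is easy to miss when catching InterruptedException like this: the thread's interrupt flag is cleared when the exception is thrown, so code that runs afterwards can no longer tell that a cancellation was requested. A common convention (not something the original demo does) is to call Thread.currentThread().interrupt() in the catch block to restore the flag. The sketch below compares both variants; the class name RestoreInterruptDemo and the restore switch are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article
class RestoreInterruptDemo {
    // Returns the interrupt flag as seen by the task after its catch block
    static boolean flagAfterCatch(boolean restore) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean flag = new AtomicBoolean(false);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                if (restore) {
                    // Re-assert the interrupt so later code can still observe it
                    Thread.currentThread().interrupt();
                }
            }
            flag.set(Thread.currentThread().isInterrupted());
        });

        started.await();
        future.cancel(true);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return flag.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("swallowed: " + flagAfterCatch(false));
        System.out.println("restored:  " + flagAfterCatch(true));
    }
}
```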

      3. Back to the source code

      The Javadoc of java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) states clearly what happens in each case:

          /**
           * Waits if necessary for at most the given time for the computation
           * to complete, and then retrieves its result, if available.
           *
           * @param timeout the maximum time to wait
           * @param unit the time unit of the timeout argument
           * @return the computed result
           * @throws CancellationException if the computation was cancelled
           * @throws ExecutionException if the computation threw an
           * exception
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           * @throws TimeoutException if the wait timed out
           */
          V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;

      We can also pick a few common implementations and look at the basic idea behind each:

      java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit)

         public V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException {
              if (unit == null)
                  throw new NullPointerException();
              int s = state;
              if (s <= COMPLETING &&
                  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                  throw new TimeoutException();
              return report(s);
          }
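
      FutureTask#get above only waits and then throws TimeoutException; it never changes the task's state. A caller that wants "give up and stop the work" therefore has to combine get with cancel. Below is one possible sketch of such a helper. The name getOrCancel, its fallback parameter, and the class TimedGetDemo are hypothetical and not part of the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article
class TimedGetDemo {
    // Hypothetical helper: wait up to the timeout, otherwise cancel and use a fallback
    static <T> T getOrCancel(Future<T> future, long timeout, TimeUnit unit, T fallback)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // cancel may return false if the task completed in the meantime;
            // this sketch returns the fallback in that case as well
            future.cancel(true);
            return fallback;
        }
    }

    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                TimeUnit.SECONDS.sleep(2); // simulated slow work
                return "test";
            });
            String value = getOrCancel(future, 50, TimeUnit.MILLISECONDS, "fallback");
            return "value=" + value + ", cancelled=" + future.isCancelled();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

      Whether to pass true or false to cancel depends on whether the task reacts sensibly to interruption, as the demos in section 2.2 show.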

      java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)

          /**
           * Waits if necessary for at most the given time for this future
           * to complete, and then returns its result, if available.
           *
           * @param timeout the maximum time to wait
           * @param unit the time unit of the timeout argument
           * @return the result value
           * @throws CancellationException if this future was cancelled
           * @throws ExecutionException if this future completed exceptionally
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           * @throws TimeoutException if the wait timed out
           */
          public T get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException {
              Object r;
              long nanos = unit.toNanos(timeout);
              return reportGet((r = result) == null ? timedGet(nanos) : r);
          }
      
        /**
           * Returns raw result after waiting, or null if interrupted, or
           * throws TimeoutException on timeout.
           */
          private Object timedGet(long nanos) throws TimeoutException {
              if (Thread.interrupted())
                  return null;
              if (nanos <= 0L)
                  throw new TimeoutException();
              long d = System.nanoTime() + nanos;
              Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
              boolean queued = false;
              Object r;
              // We intentionally don't spin here (as waitingGet does) because
              // the call to nanoTime() above acts much like a spin.
              while ((r = result) == null) {
                  if (!queued)
                      queued = tryPushStack(q);
                  else if (q.interruptControl < 0 || q.nanos <= 0L) {
                      q.thread = null;
                      cleanStack();
                      if (q.interruptControl < 0)
                          return null;
                      throw new TimeoutException();
                  }
                  else if (q.thread != null && result == null) {
                      try {
                          ForkJoinPool.managedBlock(q);
                      } catch (InterruptedException ie) {
                          q.interruptControl = -1;
                      }
                  }
              }
              if (q.interruptControl < 0)
                  r = null;
              q.thread = null;
              postComplete();
              return r;
          }
      

      The same goes for java.util.concurrent.Future#cancel:

      /**
       * Attempts to cancel execution of this task.  This attempt will
       * fail if the task has already completed, has already been cancelled,
       * or could not be cancelled for some other reason. If successful,
       * and this task has not started when {@code cancel} is called,
       * this task should never run.  If the task has already started,
       * then the {@code mayInterruptIfRunning} parameter determines
       * whether the thread executing this task should be interrupted in
       * an attempt to stop the task.
       *
       * <p>After this method returns, subsequent calls to {@link #isDone} will
       * always return {@code true}.  Subsequent calls to {@link #isCancelled}
       * will always return {@code true} if this method returned {@code true}.
       *
       * @param mayInterruptIfRunning {@code true} if the thread executing this
       * task should be interrupted; otherwise, in-progress tasks are allowed
       * to complete
       * @return {@code false} if the task could not be cancelled,
       * typically because it has already completed normally;
       * {@code true} otherwise
       */
      boolean cancel(boolean mayInterruptIfRunning);
      

      java.util.concurrent.FutureTask#cancel

      public boolean cancel(boolean mayInterruptIfRunning) {
              if (!(state == NEW &&
                    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                  return false;
              try {    // in case call to interrupt throws exception
                  if (mayInterruptIfRunning) {
                      try {
                          Thread t = runner;
                          if (t != null)
                              t.interrupt();
                      } finally { // final state
                          UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                      }
                  }
              } finally {
                  finishCompletion();
              }
              return true;
          }
      

      We can see that when mayInterruptIfRunning is true, Thread#interrupt is called on the thread running the task (runner), provided the task has already started.

      java.util.concurrent.CompletableFuture#cancel

          /**
           * If not already completed, completes this CompletableFuture with
           * a {@link CancellationException}. Dependent CompletableFutures
           * that have not already completed will also complete
           * exceptionally, with a {@link CompletionException} caused by
           * this {@code CancellationException}.
           *
           * @param mayInterruptIfRunning this value has no effect in this
           * implementation because interrupts are not used to control
           * processing.
           *
           * @return {@code true} if this task is now cancelled
           */
          public boolean cancel(boolean mayInterruptIfRunning) {
              boolean cancelled = (result == null) &&
                  internalComplete(new AltResult(new CancellationException()));
              postComplete();
              return cancelled || isCancelled();
          }
      

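      The Javadoc also shows that implementations differ in what the parameter actually does: FutureTask interrupts the running thread, whereas CompletableFuture ignores mayInterruptIfRunning entirely.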
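
      The CompletableFuture behavior can be checked with a small experiment. In the sketch below the supplier records whether it was ever interrupted; after cancel(true) the future reports itself as cancelled, yet the supplier runs to the end undisturbed. The class name CompletableFutureCancelDemo, the dedicated executor and the 300 ms sleep are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article
class CompletableFutureCancelDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                TimeUnit.MILLISECONDS.sleep(300); // simulated work
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return "test";
        }, pool);

        started.await();
        // Completes the future with a CancellationException; the flag is ignored
        boolean cancelReturned = cf.cancel(true);
        boolean supplierFinished = finished.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return "cancelReturned=" + cancelReturned
                + ", isCancelled=" + cf.isCancelled()
                + ", supplierFinished=" + supplierFinished
                + ", supplierInterrupted=" + interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```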

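      4. Summary

      When learning, we should not take things for granted or settle for armchair theory. For anything that is unclear, read the Javadoc, read the source code, and write small demos to simulate or debug the behavior.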
