0%

Java 自定义优先级线程池使用问题记录

execute()异常未抛出

现象

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Autowired
    private ClientPoolComponent component;

    @Test
    public void executeExceptionTest() {
    log.info("start executeExceptionTest");
    component.execute(this::executeError, 0);
    log.info("end executeExceptionTest");
    }

    private void executeError() {
    throw new RuntimeException("execute exception");
    }
  • 日志

    1
    2
    CASE  2024-10-05 18:55:33.332  INFO 28504 --- [           main] record.service.ExecutorsComponentTest    : start executeExceptionTest
    CASE 2024-10-05 18:55:33.333 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : end executeExceptionTest

原因

在 JDK环境 默认线程池, 在 方法 java.util.concurrent.ThreadPoolExecutor#runWorker 中触发了异常,进入throw ex状态, 异常抛出, 日志打印

1
2
3
4
5
6
7
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}

但是自定义线程池中,由于使用了自定义PriorityTask, Runnable.run() 方法的签名不支持抛出受检异常,因此任何异常都不会自动冒泡或向外抛出。问题根源在于 Runnable 的限制。

当使用 threadPoolExecutor.execute() 时,Runnable.run() 的执行不会抛出异常到调用者,即使内部抛出 RuntimeException 或其他未检查异常,它也不会像 Callable 那样有返回值(比如 Future),从而通过 get() 方法捕获异常。
这里使用了 PriorityTask<>(command, null, priority),并且没有捕获到 command 内部的异常,因为 Runnable.run() 方法不会传播异常。

解决

  • 针对自定义优先级线程池, 添加异常捕获抛出方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void execute(Runnable command, int priority) {
    threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority)); // 注意这里的泛型为null
    }

    private Runnable wrap(Runnable command) {
    return () -> {
    try {
    command.run();
    } catch (Exception e) {
    log.error("Exception in task execution: ", e); // 记录异常日志
    throw e; // 如果需要,可以重新抛出异常
    }
    };
    }

线程状态

线程异常, 线程池移除该线程, 如果线程池 coreSize 不够, 则补充一个新的线程进入

submit()异常

现象

  • code

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Autowired
    private ClientPoolComponent component;

    @Test
    public void submitExceptionTest() throws Exception {
    log.info("start submitExceptionTest");
    Future<Integer> submit = component.submit(this::submitError, 0);
    log.info("submit submitExceptionTest");
    Integer result = submit.get();
    log.info("end submitExceptionTest, result:{}", result);
    }

    private int submitError() {
    throw new RuntimeException("submit exception");
    }
  • log

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    CASE  2024-10-05 19:18:02.219  INFO 29283 --- [           main] record.service.ExecutorsComponentTest    : start submitExceptionTest
    CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : submit submitExceptionTest
    java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit exception
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at record.service.ExecutorsComponentTest.submitExceptionTest(ExecutorsComponentTest.java:40)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    Caused by: java.lang.RuntimeException: submit exception
    at record.service.ExecutorsComponentTest.submitError(ExecutorsComponentTest.java:50)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

原因

  • 调用 submit(Callable) 或 submit(Runnable) 时,线程池会将任务提交并返回一个 Future 对象。
  • Future 对象代表了任务的异步执行结果,可以通过 get() 方法来获取任务的执行结果或异常。
  • 如果不调用 get(),异常不会被抛出到主线程中,无法知道任务是否成功或失败。异常可能存在于任务执行的线程中,但主线程不会收到通知,任务完成后,异常信息会保存在 Future 对象中,但它不会主动传播或处理,只有调用 get() 时才能显式获取这个异常。
  • 如果调用 get(),它将阻塞主线程,直到任务完成。如果任务执行过程中抛出了异常,get() 会将异常重新抛出
  • 线程的状态:即使不调用 get(),任务线程仍然会按计划执行并完成(或失败),它不会“停留”在线程池中。任务线程执行完毕后,线程池可能会重用这个线程去处理其他任务,但任务本身的结果或异常会保存在 Future 中,直到你调用 get()。

线程状态

在get()方法时,手动进行捕获。该线程不会被线程池移除,也不会补充新的线程进入。

@Async与自定义线程池抢占

现象

  • 定时任务调用的方法,配置了@Async,但是观察现象仍然阻塞了所有的线程。

原因

  • @Async 注解的方法如果没有配置独立的线程池,会默认使用 Spring 提供的 SimpleAsyncTaskExecutor 或者其他默认线程池,而这个线程池可能与主线程资源共享。
  • 我的线程池策略当时配置为CallerRunsPolicy, 由于高频数据, 导致队列满, 最终也与其他线程共同争取主线程。

解决

  • 由于未给@Async配置独立线程池, 导致与所有任务共同争抢主线程, 导致堵塞。这里给@Async配置对应的线程池即可

  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import java.util.concurrent.Executor;

    @EnableAsync
    @Configuration
    public class AsyncConfig {

    @Bean(name = "asyncTaskExecutor")
    public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("Async-");
    executor.initialize();
    return executor;
    }
    }
  • 调用

    1
    2
    3
    4
    @Async("asyncTaskExecutor")
    public void pull() {
    // do something...
    }

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: 65f906e413170680a7e674c8c54d143f6fd93273