execute()异常未抛出
现象
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
private ClientPoolComponent component;
public void executeExceptionTest() {
log.info("start executeExceptionTest");
component.execute(this::executeError, 0);
log.info("end executeExceptionTest");
}
private void executeError() {
throw new RuntimeException("execute exception");
}日志
1
2CASE 2024-10-05 18:55:33.332 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : start executeExceptionTest
CASE 2024-10-05 18:55:33.333 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : end executeExceptionTest
原因
在 JDK环境 默认线程池, 在 方法 java.util.concurrent.ThreadPoolExecutor#runWorker 中触发了异常,进入throw ex状态, 异常抛出, 日志打印
1 | try { |
但是自定义线程池中,由于使用了自定义PriorityTask, Runnable.run() 方法的签名不支持抛出受检异常,因此任何异常都不会自动冒泡或向外抛出。问题根源在于 Runnable 的限制。
当使用 threadPoolExecutor.execute() 时,Runnable.run() 的执行不会抛出异常到调用者,即使内部抛出 RuntimeException 或其他未检查异常,它也不会像 Callable 那样有返回值(比如 Future),从而通过 get() 方法捕获异常。
这里使用了 PriorityTask<>(command, null, priority),并且没有捕获到 command 内部的异常,因为 Runnable.run() 方法不会传播异常。
解决
- 针对自定义优先级线程池, 添加异常捕获抛出方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14public void execute(Runnable command, int priority) {
threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority)); // 注意这里的泛型为null
}
private Runnable wrap(Runnable command) {
return () -> {
try {
command.run();
} catch (Exception e) {
log.error("Exception in task execution: ", e); // 记录异常日志
throw e; // 如果需要,可以重新抛出异常
}
};
}
线程状态
线程异常, 线程池移除该线程, 如果线程池 coreSize 不够, 则补充一个新的线程进入
submit()异常
现象
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ClientPoolComponent component;
public void submitExceptionTest() throws Exception {
log.info("start submitExceptionTest");
Future<Integer> submit = component.submit(this::submitError, 0);
log.info("submit submitExceptionTest");
Integer result = submit.get();
log.info("end submitExceptionTest, result:{}", result);
}
private int submitError() {
throw new RuntimeException("submit exception");
}log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : start submitExceptionTest
CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : submit submitExceptionTest
java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit exception
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at record.service.ExecutorsComponentTest.submitExceptionTest(ExecutorsComponentTest.java:40)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: java.lang.RuntimeException: submit exception
at record.service.ExecutorsComponentTest.submitError(ExecutorsComponentTest.java:50)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
原因
- 调用 submit(Callable) 或 submit(Runnable) 时,线程池会将任务提交并返回一个 Future 对象。
- Future 对象代表了任务的异步执行结果,可以通过 get() 方法来获取任务的执行结果或异常。
- 如果不调用 get(),异常不会被抛出到主线程中,无法知道任务是否成功或失败。异常可能存在于任务执行的线程中,但主线程不会收到通知,任务完成后,异常信息会保存在 Future 对象中,但它不会主动传播或处理,只有调用 get() 时才能显式获取这个异常。
- 如果调用 get(),它将阻塞主线程,直到任务完成。如果任务执行过程中抛出了异常,get() 会将异常重新抛出
- 线程的状态:即使不调用 get(),任务线程仍然会按计划执行并完成(或失败),它不会“停留”在线程池中。任务线程执行完毕后,线程池可能会重用这个线程去处理其他任务,但任务本身的结果或异常会保存在 Future 中,直到你调用 get()。
线程状态
在get()方法时,手动进行捕获。该线程不会被线程池移除,也不会补充新的线程进入。
@Async与自定义线程池抢占
现象
- 定时任务调用的方法,配置了@Async,但是观察现象仍然阻塞了所有的线程。
原因
- @Async 注解的方法如果没有配置独立的线程池,会默认使用 Spring 提供的 SimpleAsyncTaskExecutor 或者其他默认线程池,而这个线程池可能与主线程资源共享。
- 我的线程池策略当时配置为CallerRunsPolicy, 由于高频数据, 导致队列满, 最终也与其他线程共同争取主线程。
解决
由于未给@Async配置独立线程池, 导致与所有任务共同争抢主线程, 导致堵塞。这里给@Async配置对应的线程池即可
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
public class AsyncConfig {
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}调用
1
2
3
4
public void pull() {
// do something...
}
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 65f906e413170680a7e674c8c54d143f6fd93273