场景
我们的一个数据处理项目, 通过接入第三方平台的 WebSocket 进行数据处理。
最初,我们为单个平台构建了一个线程池,直接分配任务。随着项目发展,需要处理更多平台的数据,但硬件资源有限。为了隔离不同平台的业务,我们为每个平台的长链接都创建了一个独立的线程池。
遇到的问题
- 优先级处理困难
尽管多个线程池实现了业务隔离,但由于 CPU 调度的非确定性,无法保证高优先级任务的及时处理。 - 资源争夺
在同一节点上运行多个服务,且需要保证主数据源的优先处理,这要求我们对线程池进行优化。
解决方案
- 统一线程池,增强优先级处理
为了解决上述问题,我们将多个线程池合并为一个。同时,自定义了 PriorityTask 任务类,以便对任务进行优先级排序。 - 最小化业务侵入
在实现过程中,我们尽量避免对现有业务逻辑造成大的改动。
示例
自定义线程任务 PriorityTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* 递归泛型约束
* </p>
* 这实现Comparable 的自己, 进行约束效果, 确保比较操作仅在相同类型(PriorityTask<T>)的对象之间进行。
**/
public class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
private final int priority;
public PriorityTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
public PriorityTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
public int compareTo(PriorityTask<T> other) {
// 优先级高的排在前面
return Integer.compare(other.priority, this.priority);
}
}将线程任务构造接入自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
public class ClientPoolComponent {
private ThreadPoolExecutor threadPoolExecutor;
private final int THREAD_NUM = Runtime.getRuntime().availableProcessors();
private final int MAX_QUEUE_SIZE = 300;
public void init() {
threadPoolExecutor = new ThreadPoolExecutor(
THREAD_NUM * 10,
THREAD_NUM * 10,
60L,
TimeUnit.SECONDS,
new PriorityBlockingQueue<>(MAX_QUEUE_SIZE),
new CustomizeThreadFactory("SPIDER-POOL"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
public <T> Future<T> submit(Callable<T> task, int priority) {
PriorityTask<T> priorityTask = new PriorityTask<>(task, priority);
// 注意此处是 execute 而不是 submit
threadPoolExecutor.execute(priorityTask);
// 返回正确类型的 Future<T>
return priorityTask;
}
public <T> Future<T> submit(Runnable task, T result, int priority) {
PriorityTask<T> priorityTask = new PriorityTask<>(task, result, priority);
// 注意此处是 execute 而不是 submit
threadPoolExecutor.execute(priorityTask);
// 返回正确类型的 Future<T>
return priorityTask;
}
/**
* execute()方法, 如果需要抛出异常,则使用wrap
* */
public void execute(Runnable command, int priority) {
// 注意这里的泛型为null
threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority));
}
private Runnable wrap(Runnable command) {
return () -> {
try {
command.run();
} catch (Exception e) {
log.error("Exception in task execution: ", e); // 记录异常日志
throw e; // 如果需要,可以重新抛出异常
}
};
}
}使用线程池
1
2
3
4
5
6
7
8
9
10
11
12
privat ClientPoolComponent component;
// 这里为了测试, 将corePoolSize 调为0, 保证提交后执行
// low
component.execute(()->doSomething(),1);
// middle
component.execute(()->doSomething(),5);
//high
component.execute(()->doSomething(),10);
这样就可以在多个平台中, 让需要优先执行的平台, 优先执行对应的线程。
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: a1a289cd65954772823934e85b2b73e3a03281bf