0%

Java 自定义优先级线程

场景

我们的一个数据处理项目, 通过接入第三方平台的 WebSocket 进行数据处理。
最初,我们为单个平台构建了一个线程池,直接分配任务。随着项目发展,需要处理更多平台的数据,但硬件资源有限。为了隔离不同平台的业务,我们为每个平台的长链接都创建了一个独立的线程池。

遇到的问题

  • 优先级处理困难
    尽管多个线程池实现了业务隔离,但由于 CPU 调度的非确定性,无法保证高优先级任务的及时处理。
  • 资源争夺
    在同一节点上运行多个服务,且需要保证主数据源的优先处理,这要求我们对线程池进行优化。

解决方案

  • 统一线程池,增强优先级处理
    为了解决上述问题,我们将多个线程池合并为一个。同时,自定义了 PriorityTask 任务类,以便对任务进行优先级排序。
  • 最小化业务侵入
    在实现过程中,我们尽量避免对现有业务逻辑造成大的改动。

示例

  1. 自定义线程任务 PriorityTask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    /**
    * 递归泛型约束
    * </p>
    * 这实现Comparable 的自己, 进行约束效果, 确保比较操作仅在相同类型(PriorityTask<T>)的对象之间进行。
    **/
    public class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
    private final int priority;

    public PriorityTask(Callable<T> callable, int priority) {
    super(callable);
    this.priority = priority;
    }

    public PriorityTask(Runnable runnable, T result, int priority) {
    super(runnable, result);
    this.priority = priority;
    }

    @Override
    public int compareTo(PriorityTask<T> other) {
    // 优先级高的排在前面
    return Integer.compare(other.priority, this.priority);
    }
    }

  2. 将线程任务构造接入自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    import jakarta.annotation.PostConstruct;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.*;

    @Component
    public class ClientPoolComponent {

    private ThreadPoolExecutor threadPoolExecutor;
    private final int THREAD_NUM = Runtime.getRuntime().availableProcessors();
    private final int MAX_QUEUE_SIZE = 300;

    @PostConstruct
    public void init() {
    threadPoolExecutor = new ThreadPoolExecutor(
    THREAD_NUM * 10,
    THREAD_NUM * 10,
    60L,
    TimeUnit.SECONDS,
    new PriorityBlockingQueue<>(MAX_QUEUE_SIZE),
    new CustomizeThreadFactory("SPIDER-POOL"),
    new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    }

    public <T> Future<T> submit(Callable<T> task, int priority) {
    PriorityTask<T> priorityTask = new PriorityTask<>(task, priority);
    // 注意此处是 execute 而不是 submit
    threadPoolExecutor.execute(priorityTask);
    // 返回正确类型的 Future<T>
    return priorityTask;
    }

    public <T> Future<T> submit(Runnable task, T result, int priority) {
    PriorityTask<T> priorityTask = new PriorityTask<>(task, result, priority);
    // 注意此处是 execute 而不是 submit
    threadPoolExecutor.execute(priorityTask);
    // 返回正确类型的 Future<T>
    return priorityTask;
    }

    /**
    * execute()方法, 如果需要抛出异常,则使用wrap
    * */
    public void execute(Runnable command, int priority) {
    // 注意这里的泛型为null
    threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority));
    }

    private Runnable wrap(Runnable command) {
    return () -> {
    try {
    command.run();
    } catch (Exception e) {
    log.error("Exception in task execution: ", e); // 记录异常日志
    throw e; // 如果需要,可以重新抛出异常
    }
    };
    }
    }
  3. 使用线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    @Autowired
    privat ClientPoolComponent component;

    // 这里为了测试, 将corePoolSize 调为0, 保证提交后执行

    // low
    component.execute(()->doSomething(),1);
    // middle
    component.execute(()->doSomething(),5);
    //high
    component.execute(()->doSomething(),10);

这样就可以在多个平台中, 让需要优先执行的平台, 优先执行对应的线程。

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: a1a289cd65954772823934e85b2b73e3a03281bf