0%

场景

数据处理项目, 新数据源的数据不是100%完整, 会由多个topic组合出一条完整数据, 处理后发送到项目。 可是具体完成的topic未知, 为保证尽可能将数据推送,需要一个延迟队列。

遇到的问题

项目没有redis资源, MQ只有发送功能, 需要定期将之前异常的消息进行重试

解决方案

使用Java自带的DelayQueue, 延迟指定时间, 重启丢失问题忽略, 通过其他定时任务手段进行补全

由于是Java本地数据, 因此对象创建需要保证不会过多, 信息量不能过大, 避免OOM问题

示例

  1. 创建实现类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class CustomizeDelayQueue implements Delayed {

    public Long time;

    public Long id;


    public CustomizeDelayQueue(Long time, Long id, TimeUnit unit) {
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    this.id = id;
    }

    @Override
    public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
    CustomizeDelayQueue delay = (CustomizeDelayQueue) o;
    long diff = this.time - delay.time;
    return diff <= 0 ? -1 : 1;
    }
    }
  2. 创建自定义Pool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28

    import com.record.model.CustomizeDelayQueue;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;

    @Component
    public class DelayQueueComponent {

    DelayQueue<CustomizeDelayQueue> delayQueue = new DelayQueue<>();

    public void put(Long time, Long id, TimeUnit unit) {
    delayQueue.put(new CustomizeDelayQueue(time, id, unit));
    }

    public CustomizeDelayQueue poll() {
    return delayQueue.poll();
    }

    public int size() {
    return delayQueue.size();
    }

    public void offer(CustomizeDelayQueue obj) {
    delayQueue.offer(obj);
    }
    }
  3. 调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Slf4j
    public class DelayQueueTest extends CaseApplicationTest {

    @Autowired
    private DelayQueueComponent queueComponent;

    @Test
    public void delayTest() {
    log.info("start delayTest");
    queueComponent.put(10L, 1L, TimeUnit.SECONDS);
    while (true) {
    if (queueComponent.size() > 0) {
    CustomizeDelayQueue object = queueComponent.poll();
    if (Objects.nonNull(object)) {
    log.info("poll:{}", object);
    // do something
    Long id = object.id;
    log.info("id:{}", id);

    break;
    }
    }
    }
    }
1
2
3
CASE  2024-10-27 00:43:06.167  INFO 75635 --- [           main] record.service.DelayQueueTest            : start delayTest
CASE 2024-10-27 00:43:16.167 INFO 75635 --- [ main] record.service.DelayQueueTest : poll:com.record.model.CustomizeDelayQueue@7ee2423
CASE 2024-10-27 00:43:16.167 INFO 75635 --- [ main] record.service.DelayQueueTest : id:1

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: 7196623c7e6bcaf0f6566f3c0eb272c79d7d923e

前言

之前开发的是Java, 这次希望通过C#编写一些程式,但是两者结构不同, 在此记录

本机环境

Chip: Apple M3 Pro
System: macOS 15.0

安装基本环境

.NET

官方链接: https://dotnet.microsoft.com/zh-cn/download/dotnet
下载选择 macOS Arm64 即可

IDE Rider

官方链接: https://www.jetbrains.com/rider/

项目创建

  1. 创建空文件夹,使用Rider打开, New Solution / 开启Rider, 左侧选择MAUI项目
  2. 设置ProjectName 环境 net8.0 C# MAUI App
  3. 自动生成template文件

预处理

  • 执行如下命令, 否则项目测试启动异常
    1
    sudo xcodebuild -runFirstLaunch
  • 修改Nuget配置, 否则依赖包全部下到默认路径 ~/.nuget/packages
    • 使用VsCode或任何编辑器打开 /Users/用户名/.nuget/NuGet/NuGet.Config
    • 我的默认样式
      1
      2
      3
      4
      5
      6
      <?xml version="1.0" encoding="utf-8"?>
      <configuration>
      <packageSources>
      <add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
      </packageSources>
      </configuration>
    • 创建自定义文件夹
      1
      mkdir -p /path/to/your/floder
    • 修改配置为
      1
      2
      3
      4
      5
      6
      7
      8
      9
      <?xml version="1.0" encoding="utf-8"?>
      <configuration>
      <packageSources>
      <add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
      </packageSources>
      <config>
      <add key="globalPackagesFolder" value="/path/to/your/floder" />
      </config>
      </configuration>
    • 存储后验证, 应该就是刚刚配置的自定义路径
      1
      dotnet nuget locals global-packages -l

文件基本结构

  • 这里跟Java不同, 不是src 向下, 结构如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    $ ls -l
    total 64
    -rw-r--r-- 1 swzxsyh staff 653 Oct 6 04:32 App.xaml
    -rw-r--r-- 1 swzxsyh staff 159 Oct 6 04:32 App.xaml.cs
    -rw-r--r-- 1 swzxsyh staff 420 Oct 6 04:32 AppShell.xaml
    -rw-r--r-- 1 swzxsyh staff 127 Oct 6 04:32 AppShell.xaml.cs
    -rw-r--r-- 1 swzxsyh staff 3695 Oct 6 04:32 MyProject.csproj
    -rw-r--r-- 1 swzxsyh staff 1316 Oct 6 04:32 MainPage.xaml
    -rw-r--r-- 1 swzxsyh staff 451 Oct 6 04:32 MainPage.xaml.cs
    -rw-r--r-- 1 swzxsyh staff 545 Oct 6 04:32 MauiProgram.cs
    drwxr-xr-x 7 swzxsyh staff 224 Oct 6 04:32 Platforms
    drwxr-xr-x 3 swzxsyh staff 96 Oct 6 04:32 Properties
    drwxr-xr-x 8 swzxsyh staff 256 Oct 6 04:32 Resources
    drwxr-xr-x 3 swzxsyh staff 96 Oct 6 04:32 bin
    drwxr-xr-x 11 swzxsyh staff 352 Oct 7 02:30 obj
  • Platforms/ 包含不同平台(Android、iOS、macOS、Windows)相关的代码

    • Platforms/Android:Android 平台的特定配置和代码。
    • Platforms/iOS:iOS 平台的特定配置和代码。
  • App.xamlApp.xaml.cs
    NET MAUI 的入口点,类似于 Java 中的 Main 类。可以在这里定义全局资源和应用程序启动逻辑。

  • MainPage.xamlMainPage.xaml.cs:类似于 Java 的视图控制器,定义主页面的 UI 和逻辑。xaml 是用于定义 UI 的文件,而 cs 文件是用来处理 UI 逻辑的 C# 代码。

  • *.csproj 类似于Java中的 pom.xmlbuild.gradle 文件, 负责处理依赖

到这里,点击Run应该就可以正常运行

场景

我们的一个数据处理项目, 通过接入第三方平台的 WebSocket 进行数据处理。
最初,我们为单个平台构建了一个线程池,直接分配任务。随着项目发展,需要处理更多平台的数据,但硬件资源有限。为了隔离不同平台的业务,我们为每个平台的长链接都创建了一个独立的线程池。

遇到的问题

  • 优先级处理困难
    尽管多个线程池实现了业务隔离,但由于 CPU 调度的非确定性,无法保证高优先级任务的及时处理。
  • 资源争夺
    在同一节点上运行多个服务,且需要保证主数据源的优先处理,这要求我们对线程池进行优化。

解决方案

  • 统一线程池,增强优先级处理
    为了解决上述问题,我们将多个线程池合并为一个。同时,自定义了 PriorityTask 任务类,以便对任务进行优先级排序。
  • 最小化业务侵入
    在实现过程中,我们尽量避免对现有业务逻辑造成大的改动。

示例

  1. 自定义线程任务 PriorityTask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    /**
    * 递归泛型约束
    * </p>
    * 这实现Comparable 的自己, 进行约束效果, 确保比较操作仅在相同类型(PriorityTask<T>)的对象之间进行。
    **/
    public class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
    private final int priority;

    public PriorityTask(Callable<T> callable, int priority) {
    super(callable);
    this.priority = priority;
    }

    public PriorityTask(Runnable runnable, T result, int priority) {
    super(runnable, result);
    this.priority = priority;
    }

    @Override
    public int compareTo(PriorityTask<T> other) {
    // 优先级高的排在前面
    return Integer.compare(other.priority, this.priority);
    }
    }

  2. 将线程任务构造接入自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    import jakarta.annotation.PostConstruct;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.*;

    @Component
    public class ClientPoolComponent {

    private ThreadPoolExecutor threadPoolExecutor;
    private final int THREAD_NUM = Runtime.getRuntime().availableProcessors();
    private final int MAX_QUEUE_SIZE = 300;

    @PostConstruct
    public void init() {
    threadPoolExecutor = new ThreadPoolExecutor(
    THREAD_NUM * 10,
    THREAD_NUM * 10,
    60L,
    TimeUnit.SECONDS,
    new PriorityBlockingQueue<>(MAX_QUEUE_SIZE),
    new CustomizeThreadFactory("SPIDER-POOL"),
    new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    }

    public <T> Future<T> submit(Callable<T> task, int priority) {
    PriorityTask<T> priorityTask = new PriorityTask<>(task, priority);
    // 注意此处是 execute 而不是 submit
    threadPoolExecutor.execute(priorityTask);
    // 返回正确类型的 Future<T>
    return priorityTask;
    }

    public <T> Future<T> submit(Runnable task, T result, int priority) {
    PriorityTask<T> priorityTask = new PriorityTask<>(task, result, priority);
    // 注意此处是 execute 而不是 submit
    threadPoolExecutor.execute(priorityTask);
    // 返回正确类型的 Future<T>
    return priorityTask;
    }

    /**
    * execute()方法, 如果需要抛出异常,则使用wrap
    * */
    public void execute(Runnable command, int priority) {
    // 注意这里的泛型为null
    threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority));
    }

    private Runnable wrap(Runnable command) {
    return () -> {
    try {
    command.run();
    } catch (Exception e) {
    log.error("Exception in task execution: ", e); // 记录异常日志
    throw e; // 如果需要,可以重新抛出异常
    }
    };
    }
    }
  3. 使用线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    @Autowired
    privat ClientPoolComponent component;

    // 这里为了测试, 将corePoolSize 调为0, 保证提交后执行

    // low
    component.execute(()->doSomething(),1);
    // middle
    component.execute(()->doSomething(),5);
    //high
    component.execute(()->doSomething(),10);

这样就可以在多个平台中, 让需要优先执行的平台, 优先执行对应的线程。

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: a1a289cd65954772823934e85b2b73e3a03281bf

场景

  • 项目JDK版本升级, 从JDK8升至JDK 21,Spring Boot设置为3.3.4
  • JDK和Spring Boot在该版本均支持虚拟线程
  • 业务属于接收数据, 数据清洗, 存储数据库逻辑, 较轻量化
  • 开发机服务, 向SIT环境RabbitMQ发送数据

异常

将原本自定义线程池, 直接替换为 ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor() 后,触发了RabbitMQ的 ChannelMax reached异常

定位问题

  1. 尝试业务RabbitMQ的Config限制使用channel数量

    1
    2
    3
    ConnectionFactory factory = new ConnectionFactory();
    factory.setChannelMax(50); // 限制每个连接的最大通道数
    Connection connection = factory.newConnection();

    测试无效

  2. 由于是测试状态, 直接发送给Queue, 尝试添加fanout交换机, 不等待response
    测试无效

  3. 查看当前每次都是OOM后, 出现该异常

  • Heap Dump检查几乎都是byte[]数据, 定位问题到定时抓取数据业务, 细化方法, 提前提取需要的数据, 将传入值设置为null, 触发尽快GC业务
    解决了OOM的问题, 但仍然会出现该错误
  1. 网络问题
  • 由于是本地电脑网络, 与服务器在其他国家, 中途需要代理,所以经常出现异常
    1
    2
    3
    clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)
    Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)
    Received a frame on an unknown channel, ignoring it

推测原因

  1. 虚拟线程无限创建线程
  2. 猜测为平台数据频率过高, 且每一个虚拟线程都创建Channel
  3. 网络稳定性问题, 经常出现channel异常

由于接收端频率高, 因此一直发送消息, 但与MQ网络环境不稳定问题, 导致channel经常还未成功销毁时就新建, 并且是虚拟线程, 会无限创建对应数据的新channel

处理

由于MQ是多个服务公用的, 所以不对 channel_max 进行更改

  1. 还原为原本的自定义ClientPoolComponent, 通过复用线程减少channel频繁创建频率
  2. 优化网络环境

场景

  • 多层WebSocket
    使用Spring Boot框架,接入长连接WebSocket-Client,接入第三方平台pushtech,对其进行了封装,需要pom引入使用diffusion。
  • 连接状态管理
    正常情况下,使用@Bean保证WebSocket连接的单例,并定时检查连接状态,进行重连。

异常

  • CLOSED_FAILED状态不可恢复
    这是pushtech平台的限制,意味着一旦进入该状态,当前连接无法恢复。
  • @Bean失效
    Spring的@Bean注解保证了Bean的单例,但在CLOSED_FAILED状态后,Bean的状态可能已经不可恢复,导致后续无法正常工作。

处理

可能有其他更好的处理方式,但我还未找到,所以使用增强单例模式处理, 如果有更好的办法请指导下, 谢谢您

  • 处理方式 生产Session

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39

    import com.pushtechnology.diffusion.client.Diffusion;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;


    @Slf4j
    @Component
    public class DiffusionFactory {

    /**
    * 多例模式,使用原因:当出现 CLOSED_FAILED 状态时,无法恢复
    * SDK ##: The session has lost its connection to a server and could not be recovered.
    * <р>
    * 但为避免 多处创建 或OOM 问题还是全局SingLeton模式
    *
    * @return Session
    */
    public Session session() {
    //session有 Autocloseable,这里尝试只调用cLose()方法,让其自动释放
    return Diffusion.sessions()
    .principal("username")
    .password("pwd")
    .listener((currentSession, oldState, newstate) -> log.info("Session state changed from {} to {}", oldState, newstate))
    .listener((session1, reason, thr) -> log.warn("Session warning: {}, e:{}", reason, thr))
    .maximumQueueSize(2)
    .noReconnection().reconnectionTimeout(0)
    .errorHandler((errorSession, errorReason) -> {
    //session有 Autocloseable,这里尝试只调用cLose()方法,让其自动释放
    errorSession.close();
    errorSession = null;
    log.error("Session error: {}", errorReason);
    })
    .reconnectionStrategy(ReconnectionStrategy.ReconnectionAttempt::abort)
    .open("wss://xxx");
    }
    }
  • 管理Session

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118

    import com.pushtechnology.diffusion.client.session.Session;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;

    /**
    * Diffusion 会话管理器, 尽量其它位置访问时, 同一时间正常情况下只有一个会话
    */
    @Slf4j
    @Component
    public class DiffusionSessionManager {

    private final DiffusionFactory diffusion;
    private volatile Session session = null;

    /**
    * 需要添加 static 修饰, 否则对于子类是新instance
    */
    public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_MAP = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_TOPIC = new ConcurrentHashMap<>();

    @Autowired
    public DiffusionSessionManager(DiffusionFactory diffusion) {
    this.diffusion = diffusion;
    }

    @Scheduled(fixedDelay = 500)
    public void init() {
    // 定时检查会话是否有效
    try {
    session = getSession();
    if (Objects.isNull(session)) {
    log.error("session create error, session is null");
    return;
    }
    if (!isSessionValid(session)) {
    SUBSCRIBE_MAP.remove(session);
    destroy(session);
    }
    // 检查并清理 CLOSED_FAILED 的会话
    Set<Session> failedSession = SUBSCRIBE_MAP.keySet().stream().filter(e -> !isSessionValid(e)).collect(Collectors.toSet());
    failedSession.forEach(this::clear);
    } catch (Exception e) {
    log.error("DiffusionSessionManager error", e);
    }
    }

    public synchronized void clear(Session session) {
    if (Objects.isNull(session) || isSessionValid(session)) {
    return;
    }
    destroy(session);
    if (!isSessionValid(session)) {
    SUBSCRIBE_MAP.remove(session);
    SUBSCRIBE_TOPIC.remove(session);
    session = null;
    }
    }

    public synchronized Session getSession() {
    if (Objects.isNull(session) || session.getState().isClosed() ||
    session.getState() == Session.State.CLOSED_FAILED ||
    session.getState() == Session.State.CLOSED_BY_CLIENT ||
    session.getState() == Session.State.CLOSED_BY_SERVER ||
    session.getState() == Session.State.RECOVERING_RECONNECT) {
    log.info("Get session start, current session state:{}", Objects.isNull(session) ? "null" : session.getState());

    destroy(session);

    // 创建新会话并更新监控
    session = diffusion.session();
    log.info("Get session end, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
    }
    return session;
    }

    private boolean isSessionValid(Session session) {
    return session.getState().equals(Session.State.CONNECTED_ACTIVE) || session.getState().equals(Session.State.CONNECTING);
    }

    public synchronized void destroy(Session session) {
    if (Objects.nonNull(session)) {
    log.info("Destroy, session state:{}", session.getState());
    }
    if (Objects.isNull(session)) {
    return;
    }
    if (!isSessionValid(session)) {
    // Session 有 AutoCloseable,这里尝试只调用close()方法, 让其自动释放
    session.close();
    }
    }

    public boolean containsKey(Session session) {
    return SUBSCRIBE_MAP.containsKey(session);
    }

    public void computeSession(Session session, String topic) {
    SUBSCRIBE_MAP.computeIfAbsent(session, k -> new HashSet<>()).add(topic);
    }

    public boolean ensureSession(Session session, String topic) {
    return SUBSCRIBE_TOPIC.containsKey(session) && SUBSCRIBE_TOPIC.get(session).contains(topic);
    }

    public void computeTopic(Session session, String topic) {
    SUBSCRIBE_TOPIC.computeIfAbsent(session, k -> new HashSet<>()).add(topic);
    }
    }

  • 执行Client

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
    import com.pushtechnology.diffusion.client.features.Topics;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.pushtechnology.diffusion.client.topics.TopicSelector;
    import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
    import com.pushtechnology.diffusion.datatype.json.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class AbstractDiffusionClient {

    protected boolean isSessionActive(Session session) {
    return session.getState().isConnected();
    }

    protected void doSubscribe(Topics topics, TopicSelector selector) {
    topics.addStream(selector, JSON.class, new Topics.ValueStream.Default<>() {
    @Override
    public void onSubscription(String topicPath, TopicSpecification specification) {
    super.onSubscription(topicPath, specification);
    }

    @Override
    public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
    String str = newValue.toJsonString();
    // do something
    log.info("onValue:{}", str);
    }

    @Override
    public void onClose() {
    super.onClose();
    }

    @Override
    public void onError(ErrorReason errorReason) {
    super.onError(errorReason);
    }

    @Override
    public void onUnsubscription(String topicPath, TopicSpecification specification, Topics.UnsubscribeReason reason) {
    super.onUnsubscription(topicPath, specification, reason);
    }
    });
    }
    }
  • 订阅逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    import com.pushtechnology.diffusion.client.features.Topics;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.record.platform.diffusion.DiffusionSessionManager;
    import com.record.platform.diffusion.client.AbstractDiffusionClient;
    import com.record.platform.diffusion.constant.TopicConstant;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.Objects;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    @Service
    public class SubscribeService extends AbstractDiffusionClient {

    @Autowired
    private DiffusionSessionManager manager;

    private final Object lock = new Object();

    static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Subscribe-reconnect-%d").daemon(true).build());

    public void subscribeToTopics() {
    executor.scheduleAtFixedRate(() -> {
    try {
    synchronized (lock) {
    Session session = manager.getSession();
    if (Objects.isNull(session)) {
    log.error("session create error, session is null");
    return;
    }
    if (!manager.containsKey(session)) {
    doFeature(session);
    manager.computeSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
    }
    if (!isSessionActive(session)) {
    manager.clear(session);
    }
    }
    } catch (Exception e) {
    log.error("SubscribeService subscribeToTopics error", e);
    }
    }, 1, 1, TimeUnit.SECONDS);
    }

    private synchronized void doFeature(Session session) {
    Topics topics = session.feature(Topics.class);
    // 检查是否已经订阅了该主题
    boolean resubscribe = manager.ensureSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
    if (resubscribe) {
    log.warn("Diffusion_SUBSCRIBE 当前session重复订阅:{}, session:{}, topic:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), session, topics);
    return;
    }
    // 存储当前会话的 Topics
    manager.computeTopic(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());

    doSubscribe(topics, TopicConstant.SUBSCRIBE_TOPIC);

    topics.subscribe(TopicConstant.SUBSCRIBE_TOPIC)
    .whenComplete((result, ex) -> {
    if (Objects.nonNull(ex)) {
    log.error("Failed to subscribe to topic: {}, e:", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), ex);
    topics.unsubscribe(TopicConstant.SUBSCRIBE_TOPIC);
    subscribeToTopics();
    } else {
    log.info("Subscribed to topic: {},result:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), result);
    }
    });
    }
    }

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: e1bda435c075d7685593eac1d21a31732426ed46

execute()异常未抛出

现象

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Autowired
    private ClientPoolComponent component;

    @Test
    public void executeExceptionTest() {
    log.info("start executeExceptionTest");
    component.execute(this::executeError, 0);
    log.info("end executeExceptionTest");
    }

    private void executeError() {
    throw new RuntimeException("execute exception");
    }
  • 日志

    1
    2
    CASE  2024-10-05 18:55:33.332  INFO 28504 --- [           main] record.service.ExecutorsComponentTest    : start executeExceptionTest
    CASE 2024-10-05 18:55:33.333 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : end executeExceptionTest

原因

在 JDK环境 默认线程池, 在 方法 java.util.concurrent.ThreadPoolExecutor#runWorker 中触发了异常,进入throw ex状态, 异常抛出, 日志打印

1
2
3
4
5
6
7
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}

但是自定义线程池中,由于使用了自定义PriorityTask, Runnable.run() 方法的签名不支持抛出受检异常,因此任何异常都不会自动冒泡或向外抛出。问题根源在于 Runnable 的限制。

当使用 threadPoolExecutor.execute() 时,Runnable.run() 的执行不会抛出异常到调用者,即使内部抛出 RuntimeException 或其他未检查异常,它也不会像 Callable 那样有返回值(比如 Future),从而通过 get() 方法捕获异常。
这里使用了 PriorityTask<>(command, null, priority),并且没有捕获到 command 内部的异常,因为 Runnable.run() 方法不会传播异常。

解决

  • 针对自定义优先级线程池, 添加异常捕获抛出方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void execute(Runnable command, int priority) {
    threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority)); // 注意这里的泛型为null
    }

    private Runnable wrap(Runnable command) {
    return () -> {
    try {
    command.run();
    } catch (Exception e) {
    log.error("Exception in task execution: ", e); // 记录异常日志
    throw e; // 如果需要,可以重新抛出异常
    }
    };
    }

线程状态

线程异常, 线程池移除该线程, 如果线程池 coreSize 不够, 则补充一个新的线程进入

submit()异常

现象

  • code

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Autowired
    private ClientPoolComponent component;

    @Test
    public void submitExceptionTest() throws Exception {
    log.info("start submitExceptionTest");
    Future<Integer> submit = component.submit(this::submitError, 0);
    log.info("submit submitExceptionTest");
    Integer result = submit.get();
    log.info("end submitExceptionTest, result:{}", result);
    }

    private int submitError() {
    throw new RuntimeException("submit exception");
    }
  • log

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    CASE  2024-10-05 19:18:02.219  INFO 29283 --- [           main] record.service.ExecutorsComponentTest    : start submitExceptionTest
    CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : submit submitExceptionTest
    java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit exception
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at record.service.ExecutorsComponentTest.submitExceptionTest(ExecutorsComponentTest.java:40)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    Caused by: java.lang.RuntimeException: submit exception
    at record.service.ExecutorsComponentTest.submitError(ExecutorsComponentTest.java:50)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

原因

  • 调用 submit(Callable) 或 submit(Runnable) 时,线程池会将任务提交并返回一个 Future 对象。
  • Future 对象代表了任务的异步执行结果,可以通过 get() 方法来获取任务的执行结果或异常。
  • 如果不调用 get(),异常不会被抛出到主线程中,无法知道任务是否成功或失败。异常可能存在于任务执行的线程中,但主线程不会收到通知,任务完成后,异常信息会保存在 Future 对象中,但它不会主动传播或处理,只有调用 get() 时才能显式获取这个异常。
  • 如果调用 get(),它将阻塞主线程,直到任务完成。如果任务执行过程中抛出了异常,get() 会将异常重新抛出
  • 线程的状态:即使不调用 get(),任务线程仍然会按计划执行并完成(或失败),它不会“停留”在线程池中。任务线程执行完毕后,线程池可能会重用这个线程去处理其他任务,但任务本身的结果或异常会保存在 Future 中,直到你调用 get()。

线程状态

在get()方法时,手动进行捕获。该线程不会被线程池移除,也不会补充新的线程进入。

@Async与自定义线程池抢占

现象

  • 定时任务调用的方法,配置了@Async,但是观察现象仍然阻塞了所有的线程。

原因

  • @Async 注解的方法如果没有配置独立的线程池,会默认使用 Spring 提供的 SimpleAsyncTaskExecutor 或者其他默认线程池,而这个线程池可能与主线程资源共享。
  • 我的线程池策略当时配置为CallerRunsPolicy, 由于高频数据, 导致队列满, 最终也与其他线程共同争取主线程。

解决

  • 由于未给@Async配置独立线程池, 导致与所有任务共同争抢主线程, 导致堵塞。这里给@Async配置对应的线程池即可

  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import java.util.concurrent.Executor;

    @EnableAsync
    @Configuration
    public class AsyncConfig {

    @Bean(name = "asyncTaskExecutor")
    public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("Async-");
    executor.initialize();
    return executor;
    }
    }
  • 调用

    1
    2
    3
    4
    @Async("asyncTaskExecutor")
    public void pull() {
    // do something...
    }

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: 65f906e413170680a7e674c8c54d143f6fd93273

场景

  • JDK 8/21
  • 业务使用三元表达式,获取结果抛出空指针
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Integer source = 1;
Integer result = Objects.nonNull(source) ? getTarget() : 2;
System.out.println(result);
}

private static Integer getTarget() {
return null;
}

异常

1
2
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.test.service.NullPointService.getTarget()" is null
at com.record.service.NullPointService.main(NullPointService.java:32)

原因

处理

  • 显示判定

    1
    2
    3
    4
    5
    if (Objects.nonNull(source)) {
    if (Objects.isNull(target)) {
    log.error("target is null");
    }
    }
  • 指定类型, 避免触发分支预测时的拆装箱情况

    1
    2
    3
    Integer result = Objects.nonNull(source) ? Optional.ofNullable(target).orElse(null) : Integer.valueOf(2);
    log.info("source:{}, target:{} , result:{}", source, target, result);

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: 9e5a514b6f9e57ed18c0989c9f05e22f3bb7b59a

AWS S3 踩坑

第一次用S3,记录一下

1. Key在哪里

  • IAM 设置一个用户组,权限勾选S3 的选项
  • 生成一个专属用户, 划分到用户组
  • 用户处生成密钥,那AccessKey 和 SecretKey

2. 桶策略配置

1
2
3
4
5
6
7
8
9
10
11
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::桶名称/*"
}
]
}

3. Spring Boot怎么连接

  • Maven 找到AWS SDK S3 导入
    1
    2
    3
    4
    5
    6
    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 -->
    <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.696</version>
    </dependency>
  • application 填写Region,两个Key
    1
    2
    3
    4
    5
    6
    amazon:
    s3:
    accessKey:
    secretKey:
    region:
    bucketName:
  • 构建单例工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
    /**
* AmazonS3工具类
*/
@Slf4j
@Component
public class AmazonS3Util {

private static String accessKey;
private static String secretKey;
private static String region;


@Value("${amazon.s3.accessKey}")
public void setAccessKey(String accessKey) {
AmazonS3Util.accessKey = accessKey;
}

@Value("${amazon.s3.secretKey}")
public void setSecretKey(String secretKey) {
AmazonS3Util.secretKey = secretKey;
}

@Value("${amazon.s3.region}")
public void setRegion(String region) {
AmazonS3Util.region = region;
}

@Value("${amazon.s3.bucketName}")
public String bucketName;

public static volatile AmazonS3 client;

/**
* singleton
*
* @return OkHttpClient
*/
public static AmazonS3 getInstance() {
if (Objects.isNull(client)) {
synchronized (AmazonS3.class) {
if (Objects.isNull(client)) {
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
ClientConfiguration conf = new ClientConfiguration();
// 设置AmazonS3使用的最大连接数
conf.setMaxConnections(200);
// 设置socket超时时间
conf.setSocketTimeout(10000);
// 设置失败请求重试次数
conf.setMaxErrorRetry(1);
// 如果要用https协议,请加上下面语句
conf.setProtocol(Protocol.HTTPS);
// 设置加密版本
conf.setSignerOverride("AWSS3V4SignerType");
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withRegion(region)
.withClientConfiguration(conf)
.build();
}
}
}
return client;
}

public String uploadFile(String contextType, String fileName, MultipartFile file) {
try (InputStream input = new ByteArrayInputStream(file.getBytes())) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType(contextType);
metadata.setHttpExpiresDate(new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365)));
metadata.setContentLength(input.available());
fileName = UUID.randomUUID().toString().replace("-", "") + fileName;
getInstance().putObject(bucketName, fileName, input, metadata);
return fileName;
} catch (Exception e) {
log.error("上传文件失败", e);
}
return null;
}

public String generateResignedUrl(String fileName) {
URL url = getInstance().generatePresignedUrl(bucketName, fileName, new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(7)), HttpMethod.GET);
return url.toString();
}
}

4. 公网访问文件,是否必须关闭网络访问

屏蔽公共访问权限(存储桶设置) 阻止所有公开访问,获取授权访问可以调用 generatePresignedUrl 但是有时效,S3 SDK获取公网链接,最长7天

5. 使用时遇到的坑

本来想构建几个目录,划分不同职责,之前公司华为云、阿里云上都可以这么做,但是S3 这里有问题,我可以在bucket下增删除查改数据,但是如果我新建目录 image/ 再在这个目录下进行数据CURD操作,则报 SignatureDoesNotMatch 错误.

1
2
<Code>SignatureDoesNotMatch</Code>
<Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message>

我一直以为是我Key有问题,因为一开始我测试文件是放bucket下的,新增目录肯定是我的问题,但是后面在github上发现有人说,配置目录后就是会这样的。

规避方法

文件名添加UUID前缀,放在bucket下。
如果有其他方法,请告知我,谢谢。

其他记录

  1. MultipartFile 中文名乱码,用ISO_8859_1 读取文件名并使用UTF8记录
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static String getFileName(MultipartFile file) {
    try {
    // 尝试解码
    return new String(Objects.requireNonNull(file.getOriginalFilename()).getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
    } catch (Exception e) {
    // 返回原名
    return file.getOriginalFilename();
    }
    }