0%

Java 单例模式增强, 手动管理单例Bean

场景

  • 多层WebSocket
    使用 Spring Boot 框架接入长连接 WebSocket 客户端。该连接由第三方平台 pushtech(Push Technology)封装了一层,需要在 pom 中引入其 Diffusion 客户端依赖才能使用。
  • 连接状态管理
    正常情况下,使用@Bean保证WebSocket连接的单例,并定时检查连接状态,进行重连。

异常

  • CLOSED_FAILED状态不可恢复
    这是pushtech平台的限制,意味着一旦进入该状态,当前连接无法恢复。
  • @Bean失效
    Spring的@Bean注解保证了Bean的单例,但在CLOSED_FAILED状态后,Bean的状态可能已经不可恢复,导致后续无法正常工作。

处理

可能有其他更好的处理方式,但我目前还没有找到,所以先使用“增强单例模式”来处理。如果您有更好的办法,欢迎指导,谢谢!

  • 处理方式 生产Session

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39

    import com.pushtechnology.diffusion.client.Diffusion;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;


    @Slf4j
    @Component
    public class DiffusionFactory {

        /**
         * Opens a brand-new Diffusion session on every call.
         *
         * <p>This is deliberately not a Spring singleton bean: once a session reaches
         * {@code CLOSED_FAILED} it cannot be recovered (SDK: "The session has lost its
         * connection to a server and could not be recovered."), so a replacement must
         * be created. Callers are still expected to keep a single live session at a
         * time to avoid sessions being created in many places or piling up in memory.
         *
         * <p>Reconnection is switched off ({@code noReconnection()}, a reconnection
         * timeout of 0 and an aborting reconnection strategy); a lost connection is
         * handled by opening a new session instead.
         *
         * <p>NOTE(review): the principal, password and URL look like placeholders and
         * should presumably come from configuration.
         *
         * @return the session returned by {@code open(...)}
         */
        public Session session() {
            return Diffusion.sessions()
                .principal("username")
                .password("pwd")
                // NOTE(review): two listeners are configured in a row; if the factory keeps only
                // one listener, the second call replaces the first - confirm against the SDK docs.
                .listener((currentSession, oldState, newstate) -> log.info("Session state changed from {} to {}", oldState, newstate))
                .listener((session1, reason, thr) -> log.warn("Session warning: {}, e:{}", reason, thr))
                .maximumQueueSize(2)
                .noReconnection().reconnectionTimeout(0)
                .errorHandler((errorSession, errorReason) -> {
                    // Log first so the reason is not lost if close() itself fails.
                    log.error("Session error: {}", errorReason);
                    // Session is AutoCloseable; closing it is all that is needed to release it.
                    errorSession.close();
                })
                .reconnectionStrategy(ReconnectionStrategy.ReconnectionAttempt::abort)
                .open("wss://xxx");
        }
    }
  • 管理Session

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118

    import com.pushtechnology.diffusion.client.session.Session;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;

    /**
     * Diffusion session manager.
     *
     * <p>Caches the current session so that, under normal conditions, every caller
     * shares one session at a time. A session that is closed or recovering is
     * closed and replaced by a new one obtained from {@link DiffusionFactory}.
     * The two static maps record, per session, the topic expressions that callers
     * have registered.
     */
    @Slf4j
    @Component
    public class DiffusionSessionManager {

        private final DiffusionFactory diffusion;

        /** Currently cached session; written only while holding this object's monitor. */
        private volatile Session session = null;

        /**
         * Must be static; per the original author's note, a non-static field would
         * be a new instance for each subclass.
         */
        public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_MAP = new ConcurrentHashMap<>();
        public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_TOPIC = new ConcurrentHashMap<>();

        @Autowired
        public DiffusionSessionManager(DiffusionFactory diffusion) {
            this.diffusion = diffusion;
        }

        /**
         * Periodic health check, run with a fixed delay of 500 ms.
         *
         * <p>Makes sure a usable session exists, then closes and forgets every
         * tracked session that is no longer valid. Exceptions are logged and
         * swallowed so that one failed run does not stop the schedule.
         */
        @Scheduled(fixedDelay = 500)
        public void init() {
            try {
                // getSession() already updates the cached field under the monitor;
                // keep the result in a local instead of writing the field again here.
                Session current = getSession();
                if (Objects.isNull(current)) {
                    log.error("session create error, session is null");
                    return;
                }
                // No-op for a valid session; otherwise closes it and drops it from BOTH maps.
                clear(current);

                // Sweep sessions tracked in either map; iterate over a snapshot
                // because clear() removes entries from the maps.
                Set<Session> tracked = new HashSet<>(SUBSCRIBE_MAP.keySet());
                tracked.addAll(SUBSCRIBE_TOPIC.keySet());
                tracked.stream().filter(e -> !isSessionValid(e)).forEach(this::clear);
            } catch (Exception e) {
                log.error("DiffusionSessionManager error", e);
            }
        }

        /**
         * Closes an invalid session and removes its bookkeeping.
         *
         * <p>Does nothing when {@code session} is {@code null} or still valid. If the
         * given session is the cached one, the cache is reset so that the next
         * {@link #getSession()} creates a new session.
         *
         * @param session the session to clean up; may be {@code null}
         */
        public synchronized void clear(Session session) {
            if (Objects.isNull(session) || isSessionValid(session)) {
                return;
            }
            destroy(session);
            if (!isSessionValid(session)) {
                SUBSCRIBE_MAP.remove(session);
                SUBSCRIBE_TOPIC.remove(session);
                // The parameter shadows the field: reset the FIELD, and only when it
                // still refers to the session being cleared.
                if (this.session == session) {
                    this.session = null;
                }
            }
        }

        /**
         * Returns the cached session, replacing it first when it is missing, closed
         * or in {@code RECOVERING_RECONNECT}.
         *
         * @return the cached session after any replacement
         */
        public synchronized Session getSession() {
            Session current = session;
            if (needsReplacement(current)) {
                log.info("Get session start, current session state:{}", Objects.isNull(current) ? "null" : current.getState());

                destroy(current);

                // Create the new session and cache it.
                session = diffusion.session();
                log.info("Get session end, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
            }
            return session;
        }

        /** Tells whether the cached session has to be replaced by a new one. */
        private boolean needsReplacement(Session candidate) {
            if (Objects.isNull(candidate)) {
                return true;
            }
            // Read the state once so that every check sees the same value.
            // isClosed() is relied on to cover the CLOSED_* states.
            Session.State state = candidate.getState();
            return state.isClosed() || state == Session.State.RECOVERING_RECONNECT;
        }

        /** A session counts as valid while it is {@code CONNECTED_ACTIVE} or {@code CONNECTING}. */
        private boolean isSessionValid(Session session) {
            Session.State state = session.getState();
            return state == Session.State.CONNECTED_ACTIVE || state == Session.State.CONNECTING;
        }

        /**
         * Closes the given session unless it is {@code null} or still valid.
         *
         * @param session the session to close; may be {@code null}
         */
        public synchronized void destroy(Session session) {
            if (Objects.isNull(session)) {
                return;
            }
            log.info("Destroy, session state:{}", session.getState());
            if (!isSessionValid(session)) {
                // Session is AutoCloseable; calling close() is all that is done to release it.
                session.close();
            }
        }

        /**
         * @param session the session to look up
         * @return {@code true} if {@code SUBSCRIBE_MAP} has an entry for the session
         */
        public boolean containsKey(Session session) {
            return SUBSCRIBE_MAP.containsKey(session);
        }

        /**
         * Records {@code topic} for {@code session} in {@code SUBSCRIBE_MAP}.
         *
         * @param session the session the topic belongs to
         * @param topic   the topic expression to record
         */
        public void computeSession(Session session, String topic) {
            // Concurrent set: the maps are public and shared between scheduler threads.
            SUBSCRIBE_MAP.computeIfAbsent(session, k -> ConcurrentHashMap.<String>newKeySet()).add(topic);
        }

        /**
         * @param session the session to look up
         * @param topic   the topic expression to look for
         * @return {@code true} if {@code SUBSCRIBE_TOPIC} already holds {@code topic} for the session
         */
        public boolean ensureSession(Session session, String topic) {
            // Single lookup: containsKey() followed by get() could hit a null
            // if clear() removed the entry in between.
            Set<String> topics = SUBSCRIBE_TOPIC.get(session);
            return topics != null && topics.contains(topic);
        }

        /**
         * Records {@code topic} for {@code session} in {@code SUBSCRIBE_TOPIC}.
         *
         * @param session the session the topic belongs to
         * @param topic   the topic expression to record
         */
        public void computeTopic(Session session, String topic) {
            SUBSCRIBE_TOPIC.computeIfAbsent(session, k -> ConcurrentHashMap.<String>newKeySet()).add(topic);
        }
    }

  • 执行Client

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
    import com.pushtechnology.diffusion.client.features.Topics;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.pushtechnology.diffusion.client.topics.TopicSelector;
    import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
    import com.pushtechnology.diffusion.datatype.json.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class AbstractDiffusionClient {

    protected boolean isSessionActive(Session session) {
    return session.getState().isConnected();
    }

    protected void doSubscribe(Topics topics, TopicSelector selector) {
    topics.addStream(selector, JSON.class, new Topics.ValueStream.Default<>() {
    @Override
    public void onSubscription(String topicPath, TopicSpecification specification) {
    super.onSubscription(topicPath, specification);
    }

    @Override
    public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
    String str = newValue.toJsonString();
    // do something
    log.info("onValue:{}", str);
    }

    @Override
    public void onClose() {
    super.onClose();
    }

    @Override
    public void onError(ErrorReason errorReason) {
    super.onError(errorReason);
    }

    @Override
    public void onUnsubscription(String topicPath, TopicSpecification specification, Topics.UnsubscribeReason reason) {
    super.onUnsubscription(topicPath, specification, reason);
    }
    });
    }
    }
  • 订阅逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    import com.pushtechnology.diffusion.client.features.Topics;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.record.platform.diffusion.DiffusionSessionManager;
    import com.record.platform.diffusion.client.AbstractDiffusionClient;
    import com.record.platform.diffusion.constant.TopicConstant;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.Objects;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    @Service
    public class SubscribeService extends AbstractDiffusionClient {

    @Autowired
    private DiffusionSessionManager manager;

    private final Object lock = new Object();

    static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Subscribe-reconnect-%d").daemon(true).build());

    public void subscribeToTopics() {
    executor.scheduleAtFixedRate(() -> {
    try {
    synchronized (lock) {
    Session session = manager.getSession();
    if (Objects.isNull(session)) {
    log.error("session create error, session is null");
    return;
    }
    if (!manager.containsKey(session)) {
    doFeature(session);
    manager.computeSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
    }
    if (!isSessionActive(session)) {
    manager.clear(session);
    }
    }
    } catch (Exception e) {
    log.error("SubscribeService subscribeToTopics error", e);
    }
    }, 1, 1, TimeUnit.SECONDS);
    }

    private synchronized void doFeature(Session session) {
    Topics topics = session.feature(Topics.class);
    // 检查是否已经订阅了该主题
    boolean resubscribe = manager.ensureSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
    if (resubscribe) {
    log.warn("Diffusion_SUBSCRIBE 当前session重复订阅:{}, session:{}, topic:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), session, topics);
    return;
    }
    // 存储当前会话的 Topics
    manager.computeTopic(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());

    doSubscribe(topics, TopicConstant.SUBSCRIBE_TOPIC);

    topics.subscribe(TopicConstant.SUBSCRIBE_TOPIC)
    .whenComplete((result, ex) -> {
    if (Objects.nonNull(ex)) {
    log.error("Failed to subscribe to topic: {}, e:", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), ex);
    topics.unsubscribe(TopicConstant.SUBSCRIBE_TOPIC);
    subscribeToTopics();
    } else {
    log.info("Subscribed to topic: {},result:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), result);
    }
    });
    }
    }

代码记录

Code: https://github.com/swzxsyh/Case
Revision Num: e1bda435c075d7685593eac1d21a31732426ed46