场景
- 多层WebSocket
使用Spring Boot框架,通过长连接WebSocket-Client接入第三方平台pushtech,并对其进行了封装,需要在pom中引入diffusion依赖。
- 连接状态管理
正常情况下,使用@Bean保证WebSocket连接的单例,并定时检查连接状态,进行重连。
异常
- CLOSED_FAILED状态不可恢复
这是pushtech平台的限制,意味着一旦进入该状态,当前连接无法恢复。
- @Bean失效
Spring的@Bean注解保证了Bean的单例,但在CLOSED_FAILED状态后,Bean的状态可能已经不可恢复,导致后续无法正常工作。
处理
可能有其他更好的处理方式,但我还未找到,所以暂时使用增强单例模式处理。如果有更好的办法,请指导,谢谢!
处理方式 生产Session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
public class DiffusionFactory {
/**
 * Opens and returns a new Diffusion session on every call.
 *
 * <p>This is intentionally not a cached singleton: once a session reaches
 * {@code CLOSED_FAILED} it cannot be recovered (SDK: "The session has lost its connection to a
 * server and could not be recovered."), so a replacement has to be created. To avoid sessions
 * being created in many places (and the resulting memory growth), callers are still expected
 * to share one session globally.
 *
 * <p>NOTE(review): the principal, password and URL below are placeholders.
 *
 * @return the session returned by {@code open}
 */
public Session session() {
    return Diffusion.sessions()
            .principal("username")
            .password("pwd")
            // A single state listener. The original registered a second three-argument
            // listener that received the same (session, oldState, newState) callback but
            // logged it as "Session warning" with mislabelled arguments.
            .listener((currentSession, oldState, newState) ->
                    log.info("Session state changed from {} to {}", oldState, newState))
            .maximumQueueSize(2)
            // NOTE(review): all three reconnection settings below aim to disable
            // reconnection; they are presumably redundant with each other, so confirm
            // against the SDK documentation before removing any.
            .noReconnection()
            .reconnectionTimeout(0)
            .errorHandler((errorSession, errorReason) -> {
                // Log first so the reason is recorded even if close() fails.
                log.error("Session error: {}", errorReason);
                // Session is AutoCloseable; close() is the only cleanup attempted here.
                errorSession.close();
            })
            .reconnectionStrategy(ReconnectionStrategy.ReconnectionAttempt::abort)
            .open("wss://xxx");
}
}
管理Session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import com.pushtechnology.diffusion.client.session.Session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Diffusion 会话管理器, 尽量其它位置访问时, 同一时间正常情况下只有一个会话
*/
public class DiffusionSessionManager {
// Factory that getSession() calls to open a replacement session.
private final DiffusionFactory diffusion;
// The session currently handed out by getSession(); null until the first one is created.
private volatile Session session = null;
/**
* Must be static; otherwise each subclass instance would get its own map.
* Written by computeSession(), queried by containsKey().
*/
public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_MAP = new ConcurrentHashMap<>();
// Written by computeTopic(), queried by ensureSession().
public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_TOPIC = new ConcurrentHashMap<>();
/**
* Creates a manager that obtains its sessions from the given factory.
*
* @param diffusion factory used to open new sessions
*/
public DiffusionSessionManager(DiffusionFactory diffusion) {
this.diffusion = diffusion;
}
/**
 * Health check: makes sure a usable session exists and evicts every tracked session that is
 * no longer valid.
 *
 * <p>NOTE(review): the original comment says this runs periodically and the file imports
 * {@code @Scheduled}, but no scheduling annotation is visible on this method; confirm how it
 * is triggered.
 *
 * <p>Exceptions are logged and swallowed so that one failed check does not stop later ones.
 */
public void init() {
    try {
        // getSession() already replaces a missing or closed session and stores it in the field.
        Session current = getSession();
        if (Objects.isNull(current)) {
            log.error("session create error, session is null");
            return;
        }
        // clear() closes the session and removes it from BOTH bookkeeping maps. The original
        // removed it from SUBSCRIBE_MAP only, leaving its SUBSCRIBE_TOPIC entry behind.
        clear(current);

        // Sweep the keys of both maps, so sessions that are only present in SUBSCRIBE_TOPIC
        // are evicted too. Copy first so the maps can be modified while iterating.
        Set<Session> tracked = new HashSet<>(SUBSCRIBE_MAP.keySet());
        tracked.addAll(SUBSCRIBE_TOPIC.keySet());
        for (Session candidate : tracked) {
            // clear() is a no-op for sessions that are still valid.
            clear(candidate);
        }
    } catch (Exception e) {
        log.error("DiffusionSessionManager error", e);
    }
}
/**
 * Closes an invalid session and removes all bookkeeping for it.
 *
 * <p>Does nothing when {@code session} is null or still valid.
 *
 * @param session the session to evict
 */
public synchronized void clear(Session session) {
    if (Objects.isNull(session) || isSessionValid(session)) {
        return;
    }
    destroy(session);
    if (!isSessionValid(session)) {
        SUBSCRIBE_MAP.remove(session);
        SUBSCRIBE_TOPIC.remove(session);
        // The parameter shadows the field, so the original "session = null" only reset the
        // local parameter. Drop the field reference when it is the session just evicted, so
        // the manager stops holding a dead session.
        if (this.session == session) {
            this.session = null;
        }
    }
}
/**
 * Returns the current session, replacing it first when it is missing, closed, or in
 * {@code RECOVERING_RECONNECT}.
 *
 * @return the existing session if it is still usable, otherwise a newly opened one
 */
public synchronized Session getSession() {
    final boolean reusable = Objects.nonNull(session)
            && !session.getState().isClosed()
            && session.getState() != Session.State.CLOSED_FAILED
            && session.getState() != Session.State.CLOSED_BY_CLIENT
            && session.getState() != Session.State.CLOSED_BY_SERVER
            && session.getState() != Session.State.RECOVERING_RECONNECT;
    if (reusable) {
        return session;
    }

    log.info("Get session start, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
    // Release the old session before opening its replacement.
    destroy(session);
    session = diffusion.session();
    log.info("Get session end, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
    return session;
}
/**
 * Tells whether the session is in a state this manager treats as usable.
 *
 * @param session the session to inspect; must not be null
 * @return true when the state is {@code CONNECTED_ACTIVE} or {@code CONNECTING}
 */
private boolean isSessionValid(Session session) {
    // Read the state once. With two separate reads, a CONNECTING -> CONNECTED_ACTIVE
    // transition between them makes both comparisons fail, and a healthy session is
    // reported as invalid.
    final Session.State state = session.getState();
    return state == Session.State.CONNECTED_ACTIVE || state == Session.State.CONNECTING;
}
/**
 * Logs the session's state and closes it unless it is still valid.
 *
 * <p>A null argument is ignored. A session that is still valid is logged but left open.
 *
 * @param session the session to release; may be null
 */
public synchronized void destroy(Session session) {
    if (session == null) {
        return;
    }
    log.info("Destroy, session state:{}", session.getState());
    if (isSessionValid(session)) {
        return;
    }
    // Session is AutoCloseable; close() is the only cleanup attempted here.
    session.close();
}
/**
 * Tells whether the session has an entry in {@code SUBSCRIBE_MAP}.
 *
 * @param session the session to look up
 * @return true when an entry exists for the session
 */
public boolean containsKey(Session session) {
    final boolean tracked = SUBSCRIBE_MAP.keySet().contains(session);
    return tracked;
}
/**
 * Records {@code topic} against {@code session} in {@code SUBSCRIBE_MAP}, creating the
 * session's topic set on first use.
 *
 * @param session the session the topic belongs to
 * @param topic   the topic expression to record
 */
public void computeSession(Session session, String topic) {
    final Set<String> recorded = SUBSCRIBE_MAP.computeIfAbsent(session, key -> new HashSet<>());
    recorded.add(topic);
}
/**
 * Tells whether {@code topic} is already recorded for {@code session} in
 * {@code SUBSCRIBE_TOPIC}.
 *
 * @param session the session to look up
 * @param topic   the topic expression to look for
 * @return true when the session has an entry that contains the topic
 */
public boolean ensureSession(Session session, String topic) {
    // Single lookup: the original containsKey() + get() pair could throw a
    // NullPointerException if clear() removed the entry between the two calls.
    final Set<String> topics = SUBSCRIBE_TOPIC.get(session);
    return topics != null && topics.contains(topic);
}
/**
 * Records {@code topic} against {@code session} in {@code SUBSCRIBE_TOPIC}, creating the
 * session's topic set on first use.
 *
 * @param session the session the topic belongs to
 * @param topic   the topic expression to record
 */
public void computeTopic(Session session, String topic) {
    final Set<String> recorded = SUBSCRIBE_TOPIC.computeIfAbsent(session, key -> new HashSet<>());
    recorded.add(topic);
}
}
执行Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
public class AbstractDiffusionClient {
/**
 * Tells whether the session's current state reports itself as connected.
 *
 * @param session the session to inspect; must not be null
 * @return the result of {@code isConnected()} on the session's state
 */
protected boolean isSessionActive(Session session) {
    final Session.State current = session.getState();
    return current.isConnected();
}
/**
 * Registers a JSON value stream for {@code selector} on the given {@code Topics} feature.
 *
 * <p>Only {@code onValue} adds behaviour (it logs the new value as a JSON string); the other
 * callbacks delegate to the default stream implementation and exist as extension points.
 *
 * @param topics   the Topics feature to register the stream on
 * @param selector the selector the stream is registered for
 */
protected void doSubscribe(Topics topics, TopicSelector selector) {
    topics.addStream(selector, JSON.class, new Topics.ValueStream.Default<>() {
        @Override
        public void onSubscription(String topicPath, TopicSpecification specification) {
            super.onSubscription(topicPath, specification);
        }

        @Override
        public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
            // NOTE(review): guard against a null value so the callback cannot throw;
            // confirm against the SDK whether newValue can actually be null.
            if (newValue == null) {
                log.warn("onValue: null value received, topicPath:{}", topicPath);
                return;
            }
            String str = newValue.toJsonString();
            // do something
            log.info("onValue:{}", str);
        }

        @Override
        public void onClose() {
            super.onClose();
        }

        @Override
        public void onError(ErrorReason errorReason) {
            super.onError(errorReason);
        }

        @Override
        public void onUnsubscription(String topicPath, TopicSpecification specification, Topics.UnsubscribeReason reason) {
            super.onUnsubscription(topicPath, specification, reason);
        }
    });
}
}
订阅逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.record.platform.diffusion.DiffusionSessionManager;
import com.record.platform.diffusion.client.AbstractDiffusionClient;
import com.record.platform.diffusion.constant.TopicConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SubscribeService extends AbstractDiffusionClient {
private DiffusionSessionManager manager;
private final Object lock = new Object();
static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Subscribe-reconnect-%d").daemon(true).build());
public void subscribeToTopics() {
executor.scheduleAtFixedRate(() -> {
try {
synchronized (lock) {
Session session = manager.getSession();
if (Objects.isNull(session)) {
log.error("session create error, session is null");
return;
}
if (!manager.containsKey(session)) {
doFeature(session);
manager.computeSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
}
if (!isSessionActive(session)) {
manager.clear(session);
}
}
} catch (Exception e) {
log.error("SubscribeService subscribeToTopics error", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private synchronized void doFeature(Session session) {
Topics topics = session.feature(Topics.class);
// 检查是否已经订阅了该主题
boolean resubscribe = manager.ensureSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
if (resubscribe) {
log.warn("Diffusion_SUBSCRIBE 当前session重复订阅:{}, session:{}, topic:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), session, topics);
return;
}
// 存储当前会话的 Topics
manager.computeTopic(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
doSubscribe(topics, TopicConstant.SUBSCRIBE_TOPIC);
topics.subscribe(TopicConstant.SUBSCRIBE_TOPIC)
.whenComplete((result, ex) -> {
if (Objects.nonNull(ex)) {
log.error("Failed to subscribe to topic: {}, e:", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), ex);
topics.unsubscribe(TopicConstant.SUBSCRIBE_TOPIC);
subscribeToTopics();
} else {
log.info("Subscribed to topic: {},result:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), result);
}
});
}
}
代码记录
Code: https://github.com/swzxsyh/Case
Revision Num: e1bda435c075d7685593eac1d21a31732426ed46