- 使用的是iOS Anki
- 卡片上导入的
托福绿宝书
- 持续背单词中
Spring Boot Reactive Redisson && R2DBC 初体验
Maven引入
1 | <!-- spring-webflux 用于支持 WebClient --> |
配置TraceId
添加支持 && 过滤器
1 | import org.slf4j.MDC; |
启动类注册 traceId 和 异常
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
Hooks.onErrorDropped(e -> log.warn("Dropped error: {}", e.toString()));
Hooks.onNextDropped(o -> log.warn("Dropped value: {}", o));
Hooks.onOperatorDebug();
// 注册 Reactor 全局 Hook
ReactorMdcSupport.install();
SpringApplication.run(SpiderCanalApplication.class, args);
}
Redisson配置修改
给redisson进行相关配置
1 | // 序列化解码 |
Reacive获取和存储Data
1 | /** |
R2DBC 使用中遇到的问题:
java.util.Date不可用
- 需要使用LocalDateTime, 旧的Date在该组件中不支持
LocalDateTime传入的的格式问题
- 构建自定义解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class FlexibleLocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
private static final List<DateTimeFormatter> FORMATTERS = List.of(
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
);
@Override
public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
String text = p.getText().trim();
for (DateTimeFormatter formatter : FORMATTERS) {
try {
return LocalDateTime.parse(text, formatter);
} catch (DateTimeParseException ignored) {
}
}
throw new IOException("无法解析时间: " + text);
}
} - 在对应entity的字段上, 配置该解析
1
@JsonDeserialize(using = FlexibleLocalDateTimeDeserializer.class)
之前使用JPA, 现在找不到Repository
- 日志错误
1
Field PlatformGroupService in com.swzxsyh.components.FastQueryService required a bean of type 'com.swzxsyh.repository.PlatformGroupRepository' that could not be found.
- 是引入包的问题, 同名称需使用R2DBC的包
1
2
3
4
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
不打印语句
需要自定义Listener, 这里手动构建成一个像MyBatis的语句打印
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class R2dbcProxyListener implements ProxyExecutionListener, ProxyMethodExecutionListener {
private static final Logger logger = LoggerFactory.getLogger(R2dbcProxyListener.class);
/**
* 结果打印
*
* @param execInfo statement
*/
@Override
public void eachQueryResult(QueryExecutionInfo execInfo) {
Object currentMappedResult = execInfo.getCurrentMappedResult();
logger.info("<== Affected Rows: {}", currentMappedResult);
}
/**
* 语句打印
*
* @param queryExecutionInfo statement
*/
@Override
public void beforeExecuteOnStatement(@NotNull QueryExecutionInfo queryExecutionInfo) {
this.sqlPrinter(queryExecutionInfo);
}
public void sqlPrinter(QueryExecutionInfo execInfo) {
// 打 SQL
for (QueryInfo qi : execInfo.getQueries()) {
logger.info("==> Preparing: {}", qi.getQuery());
// 打 参数
for (Bindings bindings : qi.getBindingsList()) {
// 打印位置绑定参数(index-based)
StringBuilder sb = new StringBuilder();
bindings.getIndexBindings().forEach(b -> {
Object key = b.getKey();
BoundValue bv = b.getBoundValue();
Object value = bv.getValue();
String type = bv.isNull() ? bv.getNullType().getSimpleName() + " (NULL)" : bv.getValue().getClass().getSimpleName();
// logger.info(" -> param[{}] = {} ({})", key, value, type);
// key value type
sb.append(value).append("(").append(type).append(")").append(", ");
});
logger.info(sb.substring(0, sb.length() - 2));
// 打印命名绑定参数(name-based)
bindings.getNamedBindings().forEach(b -> {
Object key = b.getKey();
BoundValue bv = b.getBoundValue();
Object value = bv.getValue();
String type = bv.isNull() ? bv.getNullType().getSimpleName() + " (NULL)" : bv.getValue().getClass().getSimpleName();
logger.info(" -> param['{}'] = {} ({})", key, value, type);
});
}
}
}
}
代码注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class R2dbcConfig extends AbstractR2dbcConfiguration {
private final ConnectionFactory connectionFactory;
public R2dbcConfig(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public ConnectionFactory connectionFactory() {
return ProxyConnectionFactory.builder(this.connectionFactory).listener(new R2dbcProxyListener()).build();
}
@Override
public DatabaseClient databaseClient() {
return DatabaseClient.builder().connectionFactory(connectionFactory()).build();
}
日志配置
1
2
3
4
5
6
7
8
9
10
11
logging:
level:
com.swzxsyh: info
org.springframework: warn
org.springframework.r2dbc: warn
org.redisson.connection: warn
io.r2dbc.proxy: warn
com.swzxsyh.config.listener.R2dbcProxyListener: info
io.r2dbc.proxy.listener.DefaultQueryExecutionListener: warn
io.r2dbc.proxy.observation.ObservationProxyExecutionListener: warn
io.asyncer.r2dbc.mysql.client.ReactorNettyClient: warn
repository使用语句.one()
反而limit 2
- 出现这种SQL
1
SELECT spider_config.* FROM spider_config WHERE spider_config.status = ? AND spider_config.domain = ? LIMIT 2
- 详见
https://github.com/spring-projects/spring-data-r2dbc/issues/758
, 如果一定要one, 使用.first()
语法
配置了重载转换器, 仍然提示没有转换器的错误
错误日志
1
2
org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.lang.String] to type [com.swzxsyh.model.platform.PlatformScore]
at org.springframework.core.convert.support.GenericConversionService.handleConverterNotFound(GenericConversionService.java:294)
读写转换器代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@ReadingConverter
public class PlatformScoreReadConverter implements Converter<String, PlatformScore> {
@Override
public PlatformScore convert(String source) {
if (StringUtils.isBlank(source)) {
return null;
}
return JsonUtil.toClass(source, PlatformScore.class);
}
}
@WritingConverter
public class PlatformScoreWriteConverter implements Converter<PlatformScore, String> {
@Override
public String convert(PlatformScore source) {
return JsonUtil.toJson(source);
}
}
不生效的配置类代码
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class R2dbcConvert{
@Bean
public R2dbcCustomConversions r2dbcCustomConversions() {
List<Converter<?, ?>> converters = new ArrayList<>();
converters.add(new PlatformScoreWriteConverter());
converters.add(new PlatformScoreReadConverter());
return new R2dbcCustomConversions(getStoreConversions(), converters);
}
}
修正方法: 使用继承AbstractR2dbcConfiguration的@Override进行存储.
- 该类允许Hook getCustomConverters方法, 这个方法是protected的, 一定要继承AbstractR2dbcConfiguration进行处理
- 为什么之前的注册不生效: 它是直接调用this方法, 没有去找@Bean, 所以除非重载, 否则不会生效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35@Configuration
public class R2dbcConfig extends AbstractR2dbcConfiguration {
private final ConnectionFactory connectionFactory;
public R2dbcConfig(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public ConnectionFactory connectionFactory() {
return ProxyConnectionFactory.builder(this.connectionFactory).listener(new R2dbcProxyListener()).build();
}
@Override
public DatabaseClient databaseClient() {
return DatabaseClient.builder().connectionFactory(connectionFactory()).build();
}
@Override
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient, ReactiveDataAccessStrategy dataAccessStrategy) {
// 通过 DialectResolver 获取 R2dbcDialect
return new R2dbcEntityTemplate(databaseClient, dataAccessStrategy); // 传入 ConnectionFactory
}
@Override
protected List<Object> getCustomConverters() {
return Arrays.asList(
new PlatformScoreWriteConverter(),
new PlatformScoreReadConverter()
);
}
}
Java 安全随机数在阿里云ECS卡住
场景
业务有一个安全随机数方法,需要调用生成数据掉落区间进行结果判定。
最开始业务是在AWS的EC2部署, 后面业务需要迁移,选择了阿里云的ECS,系统是CentOS 8.5
遇到的问题
- 业务有一个逻辑需要生成安全随机数,但是迁移后业务会在安全随机数卡住没有后续,DEBUG级别日志也没有任何日志报错
- 数据、代码全部相同,本地IDEA运行,在开发机本机正常,数据库和缓存连接阿里云服务的外网地址,一切正常。
- 尝试使用Docker image, 现象相同。
- 查询部分说明是发行版系统在Kernel 4之后的版本关闭了安全随机数,需要重新编译内核才能开启,因此尝试将ECS退回到CentOS 7.9版本,仍未恢复。
定位问题
- 尝试本机连接阿里云RDS测试业务是否正常
本机运行正常,代码相同,服务器业务不可用 - 尝试统一环境,使用Docker进行
相同的Docker-Compose文件,相同代码,本机正常,服务器业务不可用 - 测试IDEA远程Debug
断点进入后,到安全随机数获取方法直接没有后续 - 尝试抽取方法,直接javac运行
没有任何后续输出,命令行卡住
由于数据库、缓存使用本地开发机连接正常,代码没有任何变化,即使使用了Docker、尝试变更系统也未解决,因此判定是ECS硬件问题。
由于安全随机数需要足够的熵,即足够的设备才能生成,运行如下命令查询阿里云ECS随机数值,只有25,而安全随机数需要100以上才能成功运行
1 | $ cat /proc/sys/kernel/random/entropy_avail |
解决方案
- 修改安全随机数获取方法
业务需要,安全随机数必须保留,因此不能使用普通Random,这里使用了SecureRandom的另一个方法,保证获取可用的工具。
SecureRandom.getInstance(“SHA1PRNG”) 在部分 Linux 发行版上仍然可能阻塞,因为它可能仍然尝试访问 /dev/random 作为熵源(虽然概率低)。
SecureRandom 需要熵(entropy),如果系统熵池不足(比如云服务器没有鼠标、键盘等设备),它可能会等到足够的熵才会返回。 - 修改启动命令,使用 /dev/urandom 作为熵源
新增启动参数-Djava.security.egd=file:/dev/urandom
, 保证业务运行
示例
原代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
public class RandomTest {
private static final SecureRandom sr;
static {
SecureRandom temp;
try {
// 获取高强度安全随机数生成器
temp = SecureRandom.getInstanceStrong();
} catch (NoSuchAlgorithmException e) {
// 获取普通的安全随机数生成器
temp = new SecureRandom();
}
sr = temp;
}
public static void main(String[] args) {
byte[] buffer = new byte[8];
// 用安全随机数填充buffer
sr.nextBytes(buffer);
long randomFactor = 0;
for (byte b : buffer) {
randomFactor = (randomFactor << 8) | (b & 0xFF);
}
double randomValue = sr.nextDouble() + (randomFactor / (double) Long.MAX_VALUE);
// 设置上限为max
System.out.println(Math.abs(randomValue % 1.0));
}
}原代码现象
1
2
3
4
5
6
7$ javac RandomTest.java
$ java RandomTest
$ # 没有输出手动中断代码修改部分
SecureRandom.getInstanceStrong()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
public class RandomTest {
private static final SecureRandom sr;
static {
SecureRandom temp;
try {
// 获取高强度安全随机数生成器
//temp = SecureRandom.getInstanceStrong();
temp = SecureRandom.getInstance("SHA1PRNG");
// temp = SecureRandom.getInstance("NativePRNGNonBlocking"); // 或者使用这个, 不会阻塞
} catch (NoSuchAlgorithmException e) {
// 获取普通的安全随机数生成器
temp = new SecureRandom();
}
sr = temp;
}
public static void main(String[] args) {
byte[] buffer = new byte[8];
// 用安全随机数填充buffer
sr.nextBytes(buffer);
long randomFactor = 0;
for (byte b : buffer) {
randomFactor = (randomFactor << 8) | (b & 0xFF);
}
double randomValue = sr.nextDouble() + (randomFactor / (double) Long.MAX_VALUE);
// 设置上限为max
System.out.println(Math.abs(randomValue % 1.0));
}
}启动命令添加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17nohup /usr/local/java/jdk-21/bin/java \
-Djava.awt.headless=true \
-Djava.net.preferIPv4Stack=true \
-Djava.security.egd=file:/dev/urandom \ # 这里添加 JVM 选项,防止 SecureRandom 阻塞
-server \
-Xms1024m -Xmx2048m \
-XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m \
-XX:SurvivorRatio=6 \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:InitiatingHeapOccupancyPercent=45 \
-XX:+UnlockDiagnosticVMOptions \
-XX:+PrintFlagsFinal \
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 \ # 这里是外部Debug接口, 用来IDEA远程DEBUG,需ECS开放端口
-jar /wls/esxd/esxd-api.jar \
--spring.profiles.active=sit \
>> /wls/esxd/stdout.log 2>&1 &
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 6c09f88ea4f2f41a1fd93cfb838e34ae7158a8c0
Java JUC DelayQueue使用
场景
数据处理项目, 新数据源的数据不是100%完整, 会由多个topic组合出一条完整数据, 处理后发送到项目。 可是具体完成的topic未知, 为保证尽可能将数据推送,需要一个延迟队列。
遇到的问题
项目没有redis资源, MQ只有发送功能, 需要定期将之前异常的消息进行重试
解决方案
使用Java自带的DelayQueue, 延迟指定时间, 重启丢失问题忽略, 通过其他定时任务手段进行补全
由于是Java本地数据, 因此对象创建需要保证不会过多, 信息量不能过大, 避免OOM问题
示例
创建实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class CustomizeDelayQueue implements Delayed {
public Long time;
public Long id;
public CustomizeDelayQueue(Long time, Long id, TimeUnit unit) {
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
this.id = id;
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
CustomizeDelayQueue delay = (CustomizeDelayQueue) o;
long diff = this.time - delay.time;
return diff <= 0 ? -1 : 1;
}
}创建自定义Pool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.record.model.CustomizeDelayQueue;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Component
public class DelayQueueComponent {
DelayQueue<CustomizeDelayQueue> delayQueue = new DelayQueue<>();
public void put(Long time, Long id, TimeUnit unit) {
delayQueue.put(new CustomizeDelayQueue(time, id, unit));
}
public CustomizeDelayQueue poll() {
return delayQueue.poll();
}
public int size() {
return delayQueue.size();
}
public void offer(CustomizeDelayQueue obj) {
delayQueue.offer(obj);
}
}调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24@Slf4j
public class DelayQueueTest extends CaseApplicationTest {
@Autowired
private DelayQueueComponent queueComponent;
@Test
public void delayTest() {
log.info("start delayTest");
queueComponent.put(10L, 1L, TimeUnit.SECONDS);
while (true) {
if (queueComponent.size() > 0) {
CustomizeDelayQueue object = queueComponent.poll();
if (Objects.nonNull(object)) {
log.info("poll:{}", object);
// do something
Long id = object.id;
log.info("id:{}", id);
break;
}
}
}
}
1 | CASE 2024-10-27 00:43:06.167 INFO 75635 --- [ main] record.service.DelayQueueTest : start delayTest |
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 7196623c7e6bcaf0f6566f3c0eb272c79d7d923e
macOS 环境 Rider 初始化 C# MAUI环境
前言
之前开发的是Java, 这次希望通过C#编写一些程式,但是两者结构不同, 在此记录
本机环境
Chip: Apple M3 Pro
System: macOS 15.0
安装基本环境
.NET
官方链接: https://dotnet.microsoft.com/zh-cn/download/dotnet
下载选择 macOS Arm64 即可
IDE Rider
官方链接: https://www.jetbrains.com/rider/
项目创建
- 创建空文件夹,使用Rider打开, New Solution / 开启Rider, 左侧选择MAUI项目
- 设置ProjectName 环境 net8.0 C# MAUI App
- 自动生成template文件
预处理
- 执行如下命令, 否则项目测试启动异常
1
sudo xcodebuild -runFirstLaunch
- 修改Nuget配置, 否则依赖包全部下到默认路径
~/.nuget/packages
下- 使用VsCode或任何编辑器打开
/Users/用户名/.nuget/NuGet/NuGet.Config
- 我的默认样式
1
2
3
4
5
6<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
</packageSources>
</configuration> - 创建自定义文件夹
1
mkdir -p /path/to/your/floder
- 修改配置为
1
2
3
4
5
6
7
8
9<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
</packageSources>
<config>
<add key="globalPackagesFolder" value="/path/to/your/floder" />
</config>
</configuration> - 存储后验证, 应该就是刚刚配置的自定义路径
1
dotnet nuget locals global-packages -l
- 使用VsCode或任何编辑器打开
文件基本结构
这里跟Java不同, 不是src 向下, 结构如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15$ ls -l
total 64
-rw-r--r-- 1 swzxsyh staff 653 Oct 6 04:32 App.xaml
-rw-r--r-- 1 swzxsyh staff 159 Oct 6 04:32 App.xaml.cs
-rw-r--r-- 1 swzxsyh staff 420 Oct 6 04:32 AppShell.xaml
-rw-r--r-- 1 swzxsyh staff 127 Oct 6 04:32 AppShell.xaml.cs
-rw-r--r-- 1 swzxsyh staff 3695 Oct 6 04:32 MyProject.csproj
-rw-r--r-- 1 swzxsyh staff 1316 Oct 6 04:32 MainPage.xaml
-rw-r--r-- 1 swzxsyh staff 451 Oct 6 04:32 MainPage.xaml.cs
-rw-r--r-- 1 swzxsyh staff 545 Oct 6 04:32 MauiProgram.cs
drwxr-xr-x 7 swzxsyh staff 224 Oct 6 04:32 Platforms
drwxr-xr-x 3 swzxsyh staff 96 Oct 6 04:32 Properties
drwxr-xr-x 8 swzxsyh staff 256 Oct 6 04:32 Resources
drwxr-xr-x 3 swzxsyh staff 96 Oct 6 04:32 bin
drwxr-xr-x 11 swzxsyh staff 352 Oct 7 02:30 objPlatforms/
包含不同平台(Android、iOS、macOS、Windows)相关的代码- Platforms/Android:Android 平台的特定配置和代码。
- Platforms/iOS:iOS 平台的特定配置和代码。
App.xaml
和App.xaml.cs
NET MAUI 的入口点,类似于 Java 中的 Main 类。可以在这里定义全局资源和应用程序启动逻辑。MainPage.xaml
和MainPage.xaml.cs
:类似于 Java 的视图控制器,定义主页面的 UI 和逻辑。xaml 是用于定义 UI 的文件,而 cs 文件是用来处理 UI 逻辑的 C# 代码。*.csproj 类似于Java中的
pom.xml
或build.gradle
文件, 负责处理依赖
到这里,点击Run应该就可以正常运行
Java 自定义优先级线程
场景
我们的一个数据处理项目, 通过接入第三方平台的 WebSocket 进行数据处理。
最初,我们为单个平台构建了一个线程池,直接分配任务。随着项目发展,需要处理更多平台的数据,但硬件资源有限。为了隔离不同平台的业务,我们为每个平台的长链接都创建了一个独立的线程池。
遇到的问题
- 优先级处理困难
尽管多个线程池实现了业务隔离,但由于 CPU 调度的非确定性,无法保证高优先级任务的及时处理。 - 资源争夺
在同一节点上运行多个服务,且需要保证主数据源的优先处理,这要求我们对线程池进行优化。
解决方案
- 统一线程池,增强优先级处理
为了解决上述问题,我们将多个线程池合并为一个。同时,自定义了 PriorityTask 任务类,以便对任务进行优先级排序。 - 最小化业务侵入
在实现过程中,我们尽量避免对现有业务逻辑造成大的改动。
示例
自定义线程任务 PriorityTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* 递归泛型约束
* </p>
* 这实现Comparable 的自己, 进行约束效果, 确保比较操作仅在相同类型(PriorityTask<T>)的对象之间进行。
**/
public class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
private final int priority;
public PriorityTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
public PriorityTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
@Override
public int compareTo(PriorityTask<T> other) {
// 优先级高的排在前面
return Integer.compare(other.priority, this.priority);
}
}将线程任务构造接入自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Component
public class ClientPoolComponent {
private ThreadPoolExecutor threadPoolExecutor;
private final int THREAD_NUM = Runtime.getRuntime().availableProcessors();
private final int MAX_QUEUE_SIZE = 300;
@PostConstruct
public void init() {
threadPoolExecutor = new ThreadPoolExecutor(
THREAD_NUM * 10,
THREAD_NUM * 10,
60L,
TimeUnit.SECONDS,
new PriorityBlockingQueue<>(MAX_QUEUE_SIZE),
new CustomizeThreadFactory("SPIDER-POOL"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
public <T> Future<T> submit(Callable<T> task, int priority) {
PriorityTask<T> priorityTask = new PriorityTask<>(task, priority);
// 注意此处是 execute 而不是 submit
threadPoolExecutor.execute(priorityTask);
// 返回正确类型的 Future<T>
return priorityTask;
}
public <T> Future<T> submit(Runnable task, T result, int priority) {
PriorityTask<T> priorityTask = new PriorityTask<>(task, result, priority);
// 注意此处是 execute 而不是 submit
threadPoolExecutor.execute(priorityTask);
// 返回正确类型的 Future<T>
return priorityTask;
}
/**
* execute()方法, 如果需要抛出异常,则使用wrap
* */
public void execute(Runnable command, int priority) {
// 注意这里的泛型为null
threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority));
}
private Runnable wrap(Runnable command) {
return () -> {
try {
command.run();
} catch (Exception e) {
log.error("Exception in task execution: ", e); // 记录异常日志
throw e; // 如果需要,可以重新抛出异常
}
};
}
}使用线程池
1
2
3
4
5
6
7
8
9
10
11
12
@Autowired
privat ClientPoolComponent component;
// 这里为了测试, 将corePoolSize 调为0, 保证提交后执行
// low
component.execute(()->doSomething(),1);
// middle
component.execute(()->doSomething(),5);
//high
component.execute(()->doSomething(),10);
这样就可以在多个平台中, 让需要优先执行的平台, 优先执行对应的线程。
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: a1a289cd65954772823934e85b2b73e3a03281bf
网络 && JDK虚拟线程 触发RabbitMQ - The channelMax limit is reached
场景
- 项目JDK版本升级, 从JDK8升至JDK 21,Spring Boot设置为3.3.4
- JDK和Spring Boot在该版本均支持虚拟线程
- 业务属于接收数据, 数据清洗, 存储数据库逻辑, 较轻量化
- 开发机服务, 向SIT环境RabbitMQ发送数据
异常
将原本自定义线程池, 直接替换为 ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor() 后,触发了RabbitMQ的 ChannelMax reached异常
定位问题
尝试业务RabbitMQ的Config限制使用channel数量
1
2
3ConnectionFactory factory = new ConnectionFactory();
factory.setChannelMax(50); // 限制每个连接的最大通道数
Connection connection = factory.newConnection();测试无效
由于是测试状态, 直接发送给Queue, 尝试添加fanout交换机, 不等待response
测试无效查看当前每次都是OOM后, 出现该异常
- Heap Dump检查几乎都是byte[]数据, 定位问题到定时抓取数据业务, 细化方法, 提前提取需要的数据, 将传入值设置为null, 触发尽快GC业务
解决了OOM的问题, 但仍然会出现该错误
- 网络问题
- 由于是本地电脑网络, 与服务器在其他国家, 中途需要代理,所以经常出现异常
1
2
3clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)
Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)
Received a frame on an unknown channel, ignoring it
推测原因
- 虚拟线程无限创建线程
- 猜测为平台数据频率过高, 且每一个虚拟线程都创建Channel
- 网络稳定性问题, 经常出现channel异常
由于接收端频率高, 因此一直发送消息, 但与MQ网络环境不稳定问题, 导致channel经常还未成功销毁时就新建, 并且是虚拟线程, 会无限创建对应数据的新channel
处理
由于MQ是多个服务公用的, 所以不对 channel_max 进行更改
- 还原为原本的自定义ClientPoolComponent, 通过复用线程减少channel频繁创建频率
- 优化网络环境
Java 单例模式增强, 手动管理单例Bean
场景
- 多层WebSocket
使用Spring Boot框架,接入长连接WebSocket-Client,接入第三方平台pushtech,对其进行了封装,需要pom引入使用diffusion。 - 连接状态管理
正常情况下,使用@Bean保证WebSocket连接的单例,并定时检查连接状态,进行重连。
异常
- CLOSED_FAILED状态不可恢复
这是pushtech平台的限制,意味着一旦进入该状态,当前连接无法恢复。 - @Bean失效
Spring的@Bean注解保证了Bean的单例,但在CLOSED_FAILED状态后,Bean的状态可能已经不可恢复,导致后续无法正常工作。
处理
可能有其他更好的处理方式,但我还未找到,所以使用增强单例模式处理, 如果有更好的办法请指导下, 谢谢您
处理方式 生产Session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DiffusionFactory {
/**
* 多例模式,使用原因:当出现 CLOSED_FAILED 状态时,无法恢复
* SDK ##: The session has lost its connection to a server and could not be recovered.
* <р>
* 但为避免 多处创建 或OOM 问题还是全局SingLeton模式
*
* @return Session
*/
public Session session() {
//session有 Autocloseable,这里尝试只调用cLose()方法,让其自动释放
return Diffusion.sessions()
.principal("username")
.password("pwd")
.listener((currentSession, oldState, newstate) -> log.info("Session state changed from {} to {}", oldState, newstate))
.listener((session1, reason, thr) -> log.warn("Session warning: {}, e:{}", reason, thr))
.maximumQueueSize(2)
.noReconnection().reconnectionTimeout(0)
.errorHandler((errorSession, errorReason) -> {
//session有 Autocloseable,这里尝试只调用cLose()方法,让其自动释放
errorSession.close();
errorSession = null;
log.error("Session error: {}", errorReason);
})
.reconnectionStrategy(ReconnectionStrategy.ReconnectionAttempt::abort)
.open("wss://xxx");
}
}管理Session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import com.pushtechnology.diffusion.client.session.Session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Diffusion 会话管理器, 尽量其它位置访问时, 同一时间正常情况下只有一个会话
*/
@Slf4j
@Component
public class DiffusionSessionManager {
private final DiffusionFactory diffusion;
private volatile Session session = null;
/**
* 需要添加 static 修饰, 否则对于子类是新instance
*/
public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_MAP = new ConcurrentHashMap<>();
public static final ConcurrentHashMap<Session, Set<String>> SUBSCRIBE_TOPIC = new ConcurrentHashMap<>();
@Autowired
public DiffusionSessionManager(DiffusionFactory diffusion) {
this.diffusion = diffusion;
}
@Scheduled(fixedDelay = 500)
public void init() {
// 定时检查会话是否有效
try {
session = getSession();
if (Objects.isNull(session)) {
log.error("session create error, session is null");
return;
}
if (!isSessionValid(session)) {
SUBSCRIBE_MAP.remove(session);
destroy(session);
}
// 检查并清理 CLOSED_FAILED 的会话
Set<Session> failedSession = SUBSCRIBE_MAP.keySet().stream().filter(e -> !isSessionValid(e)).collect(Collectors.toSet());
failedSession.forEach(this::clear);
} catch (Exception e) {
log.error("DiffusionSessionManager error", e);
}
}
public synchronized void clear(Session session) {
if (Objects.isNull(session) || isSessionValid(session)) {
return;
}
destroy(session);
if (!isSessionValid(session)) {
SUBSCRIBE_MAP.remove(session);
SUBSCRIBE_TOPIC.remove(session);
session = null;
}
}
public synchronized Session getSession() {
if (Objects.isNull(session) || session.getState().isClosed() ||
session.getState() == Session.State.CLOSED_FAILED ||
session.getState() == Session.State.CLOSED_BY_CLIENT ||
session.getState() == Session.State.CLOSED_BY_SERVER ||
session.getState() == Session.State.RECOVERING_RECONNECT) {
log.info("Get session start, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
destroy(session);
// 创建新会话并更新监控
session = diffusion.session();
log.info("Get session end, current session state:{}", Objects.isNull(session) ? "null" : session.getState());
}
return session;
}
private boolean isSessionValid(Session session) {
return session.getState().equals(Session.State.CONNECTED_ACTIVE) || session.getState().equals(Session.State.CONNECTING);
}
public synchronized void destroy(Session session) {
if (Objects.nonNull(session)) {
log.info("Destroy, session state:{}", session.getState());
}
if (Objects.isNull(session)) {
return;
}
if (!isSessionValid(session)) {
// Session 有 AutoCloseable,这里尝试只调用close()方法, 让其自动释放
session.close();
}
}
public boolean containsKey(Session session) {
return SUBSCRIBE_MAP.containsKey(session);
}
public void computeSession(Session session, String topic) {
SUBSCRIBE_MAP.computeIfAbsent(session, k -> new HashSet<>()).add(topic);
}
public boolean ensureSession(Session session, String topic) {
return SUBSCRIBE_TOPIC.containsKey(session) && SUBSCRIBE_TOPIC.get(session).contains(topic);
}
public void computeTopic(Session session, String topic) {
SUBSCRIBE_TOPIC.computeIfAbsent(session, k -> new HashSet<>()).add(topic);
}
}执行Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AbstractDiffusionClient {
protected boolean isSessionActive(Session session) {
return session.getState().isConnected();
}
protected void doSubscribe(Topics topics, TopicSelector selector) {
topics.addStream(selector, JSON.class, new Topics.ValueStream.Default<>() {
@Override
public void onSubscription(String topicPath, TopicSpecification specification) {
super.onSubscription(topicPath, specification);
}
@Override
public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
String str = newValue.toJsonString();
// do something
log.info("onValue:{}", str);
}
@Override
public void onClose() {
super.onClose();
}
@Override
public void onError(ErrorReason errorReason) {
super.onError(errorReason);
}
@Override
public void onUnsubscription(String topicPath, TopicSpecification specification, Topics.UnsubscribeReason reason) {
super.onUnsubscription(topicPath, specification, reason);
}
});
}
}订阅逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.record.platform.diffusion.DiffusionSessionManager;
import com.record.platform.diffusion.client.AbstractDiffusionClient;
import com.record.platform.diffusion.constant.TopicConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class SubscribeService extends AbstractDiffusionClient {
@Autowired
private DiffusionSessionManager manager;
private final Object lock = new Object();
static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Subscribe-reconnect-%d").daemon(true).build());
public void subscribeToTopics() {
executor.scheduleAtFixedRate(() -> {
try {
synchronized (lock) {
Session session = manager.getSession();
if (Objects.isNull(session)) {
log.error("session create error, session is null");
return;
}
if (!manager.containsKey(session)) {
doFeature(session);
manager.computeSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
}
if (!isSessionActive(session)) {
manager.clear(session);
}
}
} catch (Exception e) {
log.error("SubscribeService subscribeToTopics error", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private synchronized void doFeature(Session session) {
Topics topics = session.feature(Topics.class);
// 检查是否已经订阅了该主题
boolean resubscribe = manager.ensureSession(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
if (resubscribe) {
log.warn("Diffusion_SUBSCRIBE 当前session重复订阅:{}, session:{}, topic:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), session, topics);
return;
}
// 存储当前会话的 Topics
manager.computeTopic(session, TopicConstant.SUBSCRIBE_TOPIC.getExpression());
doSubscribe(topics, TopicConstant.SUBSCRIBE_TOPIC);
topics.subscribe(TopicConstant.SUBSCRIBE_TOPIC)
.whenComplete((result, ex) -> {
if (Objects.nonNull(ex)) {
log.error("Failed to subscribe to topic: {}, e:", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), ex);
topics.unsubscribe(TopicConstant.SUBSCRIBE_TOPIC);
subscribeToTopics();
} else {
log.info("Subscribed to topic: {},result:{}", TopicConstant.SUBSCRIBE_TOPIC.getExpression(), result);
}
});
}
}
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: e1bda435c075d7685593eac1d21a31732426ed46
Java 自定义优先级线程池使用问题记录
execute()异常未抛出
现象
代码
1
2
3
4
5
6
7
8
9
10
11
12
13@Autowired
private ClientPoolComponent component;
@Test
public void executeExceptionTest() {
log.info("start executeExceptionTest");
component.execute(this::executeError, 0);
log.info("end executeExceptionTest");
}
private void executeError() {
throw new RuntimeException("execute exception");
}日志
1
2CASE 2024-10-05 18:55:33.332 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : start executeExceptionTest
CASE 2024-10-05 18:55:33.333 INFO 28504 --- [ main] record.service.ExecutorsComponentTest : end executeExceptionTest
原因
在 JDK环境 默认线程池, 在 方法 java.util.concurrent.ThreadPoolExecutor#runWorker 中触发了异常,进入throw ex状态, 异常抛出, 日志打印
1 | try { |
但是自定义线程池中,由于使用了自定义PriorityTask, Runnable.run() 方法的签名不支持抛出受检异常,因此任何异常都不会自动冒泡或向外抛出。问题根源在于 Runnable 的限制。
当使用 threadPoolExecutor.execute() 时,Runnable.run() 的执行不会抛出异常到调用者,即使内部抛出 RuntimeException 或其他未检查异常,它也不会像 Callable 那样有返回值(比如 Future),从而通过 get() 方法捕获异常。
这里使用了 PriorityTask<>(command, null, priority),并且没有捕获到 command 内部的异常,因为 Runnable.run() 方法不会传播异常。
解决
- 针对自定义优先级线程池, 添加异常捕获抛出方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14public void execute(Runnable command, int priority) {
threadPoolExecutor.execute(new PriorityTask<>(wrap(command), null, priority)); // 注意这里的泛型为null
}
private Runnable wrap(Runnable command) {
return () -> {
try {
command.run();
} catch (Exception e) {
log.error("Exception in task execution: ", e); // 记录异常日志
throw e; // 如果需要,可以重新抛出异常
}
};
}
线程状态
线程异常, 线程池移除该线程, 如果线程池 coreSize 不够, 则补充一个新的线程进入
submit()异常
现象
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@Autowired
private ClientPoolComponent component;
@Test
public void submitExceptionTest() throws Exception {
log.info("start submitExceptionTest");
Future<Integer> submit = component.submit(this::submitError, 0);
log.info("submit submitExceptionTest");
Integer result = submit.get();
log.info("end submitExceptionTest, result:{}", result);
}
private int submitError() {
throw new RuntimeException("submit exception");
}log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : start submitExceptionTest
CASE 2024-10-05 19:18:02.219 INFO 29283 --- [ main] record.service.ExecutorsComponentTest : submit submitExceptionTest
java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit exception
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at record.service.ExecutorsComponentTest.submitExceptionTest(ExecutorsComponentTest.java:40)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: java.lang.RuntimeException: submit exception
at record.service.ExecutorsComponentTest.submitError(ExecutorsComponentTest.java:50)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
原因
- 调用 submit(Callable) 或 submit(Runnable) 时,线程池会将任务提交并返回一个 Future 对象。
- Future 对象代表了任务的异步执行结果,可以通过 get() 方法来获取任务的执行结果或异常。
- 如果不调用 get(),异常不会被抛出到主线程中,无法知道任务是否成功或失败。异常可能存在于任务执行的线程中,但主线程不会收到通知,任务完成后,异常信息会保存在 Future 对象中,但它不会主动传播或处理,只有调用 get() 时才能显式获取这个异常。
- 如果调用 get(),它将阻塞主线程,直到任务完成。如果任务执行过程中抛出了异常,get() 会将异常重新抛出
- 线程的状态:即使不调用 get(),任务线程仍然会按计划执行并完成(或失败),它不会“停留”在线程池中。任务线程执行完毕后,线程池可能会重用这个线程去处理其他任务,但任务本身的结果或异常会保存在 Future 中,直到你调用 get()。
线程状态
在get()方法时,手动进行捕获。该线程不会被线程池移除,也不会补充新的线程进入。
@Async与自定义线程池抢占
现象
- 定时任务调用的方法,配置了@Async,但是观察现象仍然阻塞了所有的线程。
原因
- @Async 注解的方法如果没有配置独立的线程池,会默认使用 Spring 提供的 SimpleAsyncTaskExecutor 或者其他默认线程池,而这个线程池可能与主线程资源共享。
- 我的线程池策略当时配置为CallerRunsPolicy, 由于高频数据, 导致队列满, 最终也与其他线程共同争取主线程。
解决
由于未给@Async配置独立线程池, 导致与所有任务共同争抢主线程, 导致堵塞。这里给@Async配置对应的线程池即可
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean(name = "asyncTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}调用
1
2
3
4@Async("asyncTaskExecutor")
public void pull() {
// do something...
}
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 65f906e413170680a7e674c8c54d143f6fd93273
Java 三元表达式空指针异常
场景
- JDK 8/21
- 业务使用三元表达式,获取结果抛出空指针
1 | public static void main(String[] args) { |
异常
1 | Exception in thread "main" java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.test.service.NullPointService.getTarget()" is null |
原因
- JDK 底层会进行拆装箱,当null 进行 int转化,则会有空指针异常,所有包装类都会触发。
- 详细参考
处理
显示判定
1
2
3
4
5if (Objects.nonNull(source)) {
if (Objects.isNull(target)) {
log.error("target is null");
}
}指定类型, 避免触发分支预测时的拆装箱情况
1
2
3Integer result = Objects.nonNull(source) ? Optional.ofNullable(target).orElse(null) : Integer.valueOf(2);
log.info("source:{}, target:{} , result:{}", source, target, result);
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 9e5a514b6f9e57ed18c0989c9f05e22f3bb7b59a