0%

Spring Boot Reactive Redisson && R2DBC 初体验

Maven引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<!-- spring-webflux: provides WebClient support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- R2DBC core support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-proxy -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-proxy</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>

<!-- Monitoring and distributed tracing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
</dependency>

配置TraceId

添加支持 && 过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import org.slf4j.MDC;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.util.context.ContextView;
import reactor.util.context.Context;
import reactor.core.CoreSubscriber;

/**
 * Bridges the trace id stored in the Reactor {@code Context} into the SLF4J {@link MDC}.
 *
 * <p>{@link #install()} registers a global operator hook that wraps every subscriber in an
 * {@link MdcContextLifter}. Before each {@code onNext}, {@code onError} and {@code onComplete}
 * signal is forwarded, the lifter synchronises the MDC entry {@code "traceId"} with the
 * subscriber's current context.
 */
public class ReactorMdcSupport {

    /** Key used both for the Reactor context entry and for the MDC entry. */
    private static final String TRACE_ID = "traceId";

    /**
     * Registers the MDC-propagating hook under the hook key {@code "mdc"}.
     */
    public static void install() {
        Hooks.onEachOperator("mdc", Operators.lift((scannable, subscriber) -> new MdcContextLifter<>(subscriber)));
    }

    /**
     * Subscriber decorator that synchronises the MDC with the wrapped subscriber's context
     * before delegating each data/terminal signal.
     *
     * @param <T> element type of the wrapped subscriber
     */
    static class MdcContextLifter<T> implements CoreSubscriber<T> {
        private final CoreSubscriber<T> actual;

        MdcContextLifter(CoreSubscriber<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(org.reactivestreams.Subscription subscription) {
            actual.onSubscribe(subscription);
        }

        @Override
        public void onNext(T t) {
            copyContextToMdc();
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable throwable) {
            copyContextToMdc();
            actual.onError(throwable);
        }

        @Override
        public void onComplete() {
            copyContextToMdc();
            actual.onComplete();
        }

        @Override
        public Context currentContext() {
            return actual.currentContext();
        }

        /**
         * Makes the MDC entry mirror the context: the trace id is written when the context
         * carries one and removed otherwise.
         */
        private void copyContextToMdc() {
            ContextView contextView = actual.currentContext();
            // Read as Object so a non-String context value cannot cause a ClassCastException
            Object traceId = contextView.getOrDefault(TRACE_ID, null);
            if (traceId != null) {
                MDC.put(TRACE_ID, traceId.toString());
            } else {
                // Without this, a trace id left on the current thread by an earlier signal
                // would be logged for a pipeline whose context carries no trace id
                MDC.remove(TRACE_ID);
            }
        }
    }
}




/**
 * Highest-precedence {@link WebFilter} that attaches a trace id to every request.
 *
 * <p>The trace id obtained from {@code ThreadMdcUtil.getTraceId()} is added as a request header,
 * written into the Reactor context and put into the MDC for the duration of the filter chain.
 *
 * <p>NOTE(review): {@code WebFilterConfig} in the same write-up also declares a {@code @Bean} of
 * this type; combined with {@code @Component} here that presumably registers the filter twice —
 * confirm and keep only one registration.
 */
@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class MDCTraceFilter implements WebFilter {

    /**
     * Adds the trace id to the request, the Reactor context and the MDC, then continues the chain.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain
     */
    @NotNull
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String traceId = ThreadMdcUtil.getTraceId();
        // mutate() only returns a builder: the header exists only on the exchange produced by
        // build(), so that exchange (not the original one) has to be passed down the chain
        ServerWebExchange tracedExchange = exchange.mutate()
                .request(request -> request.header(ThreadMdcUtil.TRACE_ID, traceId))
                .build();
        return chain.filter(tracedExchange)
                .contextWrite(Context.of(ThreadMdcUtil.TRACE_ID, traceId))
                .doFirst(() -> MDC.put(ThreadMdcUtil.TRACE_ID, traceId))
                .doFinally(signal -> MDC.remove(ThreadMdcUtil.TRACE_ID));
    }
}


/**
 * Spring configuration that exposes an {@link MDCTraceFilter} instance as a {@link WebFilter} bean.
 *
 * NOTE(review): MDCTraceFilter is itself annotated with {@code @Component}; together with this
 * {@code @Bean} method that presumably registers the filter twice — confirm and keep only one.
 */
@Configuration
public class WebFilterConfig {

/** Creates the trace-id filter bean. */
@Bean
public WebFilter mdcTraceFilter() {
return new MDCTraceFilter();
}
}

启动类注册 traceId 和 异常

1
2
3
4
5
6
7
8
9
/**
 * Application entry point: installs the global Reactor hooks, then starts the Spring application.
 */
public static void main(String[] args) {
// NOTE(review): presumably restores thread-local values from the Reactor context — confirm against the Reactor documentation
Hooks.enableAutomaticContextPropagation();
// Log dropped errors and dropped values at warn level
Hooks.onErrorDropped(e -> log.warn("Dropped error: {}", e.toString()));
Hooks.onNextDropped(o -> log.warn("Dropped value: {}", o));
// NOTE(review): operator debug mode is typically expensive — confirm it is intended outside of development
Hooks.onOperatorDebug();
// Register the Reactor global hook that copies the trace id into the MDC
ReactorMdcSupport.install();
SpringApplication.run(SpiderCanalApplication.class, args);
}

Redisson配置修改

给redisson进行相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Serialization / deserialization
@Autowired
private ObjectMapper objectMapper;

// Custom properties class that loads the YAML configuration
@Autowired
private CustomProperties properties;

/**
 * Creates the reactive Redisson client.
 *
 * <p>The client uses a {@code JsonJacksonCodec} backed by the injected {@code objectMapper}.
 * Cluster settings take precedence; the single-server settings are consulted only when no
 * cluster settings are present. When neither is present no server settings are applied
 * before the client is created.
 *
 * @return the reactive Redisson client
 * @throws IOException declared by the signature
 */
@Bean
public RedissonReactiveClient redissonReactiveClient() throws IOException {
    Config redissonConfig = new Config();
    redissonConfig.setCodec(new JsonJacksonCodec(objectMapper));

    RedissonProperties.ClusterServersConfig cluster = properties.getClusterServersConfig();
    if (cluster != null) {
        applyClusterSettings(redissonConfig, cluster);
    } else {
        RedissonProperties.SingleServerConfig single = properties.getSingleServerConfig();
        if (single != null) {
            applySingleServerSettings(redissonConfig, single);
        }
    }

    return Redisson.create(redissonConfig).reactive();
}

/** Copies credentials and the cluster-mode settings onto {@code target}. */
private void applyClusterSettings(Config target, RedissonProperties.ClusterServersConfig cluster) {
    target.useClusterServers()
            .setUsername(properties.getUsername())
            .setPassword(properties.getPassword())
            .addNodeAddress(cluster.getNodeAddresses())
            .setScanInterval(cluster.getScanInterval())
            .setTimeout(cluster.getTimeout())
            .setMasterConnectionPoolSize(cluster.getMasterConnectionPoolSize())
            .setSlaveConnectionPoolSize(cluster.getSlaveConnectionPoolSize());
}

/** Copies credentials and the single-server settings onto {@code target}. */
private void applySingleServerSettings(Config target, RedissonProperties.SingleServerConfig single) {
    target.useSingleServer()
            .setUsername(properties.getUsername())
            .setPassword(properties.getPassword())
            .setAddress(single.getAddress())
            .setDatabase(single.getDatabase())
            .setTimeout(single.getTimeout())
            .setConnectionMinimumIdleSize(single.getConnectionMinimumIdleSize())
            .setConnectionPoolSize(single.getConnectionPoolSize());
}

Reactive 获取和存储 Data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Reads the value stored under the given key.
 *
 * @param key   bucket name
 * @param clazz type handed to the {@code TypedJsonJacksonCodec} used for this bucket
 * @param <T>   value type
 * @return the result of the bucket's {@code get()}
 */
public <T> Mono<T> get(String key, Class<T> clazz) {
    TypedJsonJacksonCodec typedCodec = new TypedJsonJacksonCodec(clazz);
    return redisson.<T>getBucket(key, typedCodec).get();
}

/**
 * Stores a key-value pair with the given expiry.
 *
 * @param key   bucket name
 * @param value value to store; its runtime class selects the codec, so it must not be {@code null}
 * @param ttl   time to live passed to the bucket's {@code set}
 * @param <T>   value type
 * @return the result of the bucket's {@code set(value, ttl)}
 */
public <T> Mono<Void> set(String key, T value, Duration ttl) {
    TypedJsonJacksonCodec typedCodec = new TypedJsonJacksonCodec(value.getClass());
    RBucketReactive<Object> bucket = redisson.getBucket(key, typedCodec);
    return bucket.set(value, ttl);
}

R2DBC 使用中遇到的问题:

java.util.Date不可用

  • 需要使用LocalDateTime, 旧的Date在该组件中不支持

LocalDateTime 传入的格式问题

  • 构建自定义解析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
     * Jackson deserializer that accepts several textual layouts for {@link LocalDateTime}.
     *
     * <p>Accepted inputs, after trimming surrounding whitespace:
     * <ul>
     *   <li>{@code yyyy-MM-dd HH:mm:ss}, optionally followed by {@code .} and 1 to 9 fraction digits
     *       (this covers the previously supported 3- and 6-digit forms);</li>
     *   <li>the ISO-8601 local form with a {@code T} separator, e.g. {@code 2024-01-01T10:00:00}.</li>
     * </ul>
     */
    public class FlexibleLocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {

        /** Space-separated date-time with an optional fraction of 1 to 9 digits. */
        private static final DateTimeFormatter SPACE_SEPARATED = new java.time.format.DateTimeFormatterBuilder()
                .appendPattern("yyyy-MM-dd HH:mm:ss")
                .optionalStart()
                .appendFraction(java.time.temporal.ChronoField.NANO_OF_SECOND, 1, 9, true)
                .optionalEnd()
                .toFormatter();

        /** Formatters tried in order; the first one that parses the text wins. */
        private static final List<DateTimeFormatter> FORMATTERS = List.of(
                SPACE_SEPARATED,
                DateTimeFormatter.ISO_LOCAL_DATE_TIME
        );

        /**
         * Parses the current token's text with each supported formatter in turn.
         *
         * @param p    parser positioned on the value to read
         * @param ctxt deserialization context (unused)
         * @return the parsed date-time
         * @throws IOException if no formatter accepts the text; the last parse failure is attached as cause
         */
        @Override
        public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
            String text = p.getText().trim();
            DateTimeParseException lastFailure = null;
            for (DateTimeFormatter formatter : FORMATTERS) {
                try {
                    return LocalDateTime.parse(text, formatter);
                } catch (DateTimeParseException e) {
                    // Remember why this layout was rejected and try the next one
                    lastFailure = e;
                }
            }
            throw new IOException("无法解析时间: " + text, lastFailure);
        }
    }
  • 在对应entity的字段上, 配置该解析
    1
    @JsonDeserialize(using = FlexibleLocalDateTimeDeserializer.class)

之前使用JPA, 现在找不到Repository

  • 日志错误
    1
    Field PlatformGroupService in com.swzxsyh.components.FastQueryService required a bean of type 'com.swzxsyh.repository.PlatformGroupRepository' that could not be found.
  • 是引入包的问题, 同名称需使用R2DBC的包
    1
    2
    3
    4

    import org.springframework.data.relational.core.mapping.Column;
    import org.springframework.data.annotation.Id;
    import org.springframework.data.relational.core.mapping.Table;

不打印语句

需要自定义Listener, 这里手动构建成一个像MyBatis的语句打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
 * r2dbc-proxy listener that logs each statement and its bound parameters in a
 * MyBatis-like layout ({@code ==> Preparing: ...} / {@code <== ...}).
 */
public class R2dbcProxyListener implements ProxyExecutionListener, ProxyMethodExecutionListener {

    private static final Logger logger = LoggerFactory.getLogger(R2dbcProxyListener.class);

    /**
     * Logs the current mapped result of a query.
     *
     * @param execInfo execution info carrying the current mapped result
     */
    @Override
    public void eachQueryResult(QueryExecutionInfo execInfo) {
        Object currentMappedResult = execInfo.getCurrentMappedResult();
        logger.info("<== Affected Rows: {}", currentMappedResult);
    }

    /**
     * Logs the statement before it is executed.
     *
     * @param queryExecutionInfo execution info carrying the queries and bindings
     */
    @Override
    public void beforeExecuteOnStatement(@NotNull QueryExecutionInfo queryExecutionInfo) {
        this.sqlPrinter(queryExecutionInfo);
    }

    /**
     * Logs every query of the execution followed by its bindings: index-based bindings are
     * joined onto a single line, name-based bindings are logged one per line.
     *
     * @param execInfo execution info carrying the queries and bindings
     */
    public void sqlPrinter(QueryExecutionInfo execInfo) {
        for (QueryInfo qi : execInfo.getQueries()) {
            // SQL text
            logger.info("==> Preparing: {}", qi.getQuery());

            for (Bindings bindings : qi.getBindingsList()) {
                // Index-based bindings, rendered as "value(Type), value(Type)"
                StringBuilder sb = new StringBuilder();
                bindings.getIndexBindings().forEach(b -> {
                    BoundValue bv = b.getBoundValue();
                    if (sb.length() > 0) {
                        sb.append(", ");
                    }
                    sb.append(bv.getValue()).append("(").append(describeType(bv)).append(")");
                });
                // Bindings may hold only named parameters; trimming a trailing separator off an
                // empty builder used to throw StringIndexOutOfBoundsException
                if (sb.length() > 0) {
                    logger.info(sb.toString());
                }

                // Name-based bindings
                bindings.getNamedBindings().forEach(b -> {
                    BoundValue bv = b.getBoundValue();
                    logger.info(" -> param['{}'] = {} ({})", b.getKey(), bv.getValue(), describeType(bv));
                });
            }
        }
    }

    /** Returns the simple type name of a bound value, marking null bindings with {@code (NULL)}. */
    private static String describeType(BoundValue bv) {
        return bv.isNull()
                ? bv.getNullType().getSimpleName() + " (NULL)"
                : bv.getValue().getClass().getSimpleName();
    }
}

代码注册

    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class R2dbcConfig extends AbstractR2dbcConfiguration {

private final ConnectionFactory connectionFactory;

/**
 * Stores the injected connection factory; {@code connectionFactory()} wraps it in a proxy.
 *
 * @param connectionFactory the connection factory to wrap
 */
public R2dbcConfig(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

/**
 * Wraps the injected connection factory in a proxy that reports to a new {@code R2dbcProxyListener}.
 *
 * @return the proxying connection factory
 */
@Override
public ConnectionFactory connectionFactory() {
    R2dbcProxyListener sqlLogger = new R2dbcProxyListener();
    return ProxyConnectionFactory.builder(this.connectionFactory)
            .listener(sqlLogger)
            .build();
}

/**
 * Builds the {@code DatabaseClient} on top of the proxying connection factory.
 *
 * @return a database client using {@code connectionFactory()}
 */
@Override
public DatabaseClient databaseClient() {
    ConnectionFactory proxied = connectionFactory();
    return DatabaseClient.builder()
            .connectionFactory(proxied)
            .build();
}

日志配置

1
2
3
4
5
6
7
8
9
10
11
# Log levels: the custom R2dbcProxyListener stays at info so its SQL output is printed,
# while the framework, proxy and driver loggers are limited to warn.
logging:
  level:
    com.swzxsyh: info
    org.springframework: warn
    org.springframework.r2dbc: warn
    org.redisson.connection: warn
    io.r2dbc.proxy: warn
    com.swzxsyh.config.listener.R2dbcProxyListener: info
    io.r2dbc.proxy.listener.DefaultQueryExecutionListener: warn
    io.r2dbc.proxy.observation.ObservationProxyExecutionListener: warn
    io.asyncer.r2dbc.mysql.client.ReactorNettyClient: warn

repository使用语句.one()反而limit 2

  • 出现这种SQL
    1
    SELECT spider_config.* FROM spider_config WHERE spider_config.status = ? AND spider_config.domain = ? LIMIT 2
  • 原因: .one() 会用 LIMIT 2 多取一行, 以便检测结果是否唯一(超过一行则抛出异常). 详见 https://github.com/spring-projects/spring-data-r2dbc/issues/758, 如果只需要取第一条记录, 使用 .first() 语法

配置了重载转换器, 仍然提示没有转换器的错误

错误日志

1
2
org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.lang.String] to type [com.swzxsyh.model.platform.PlatformScore]
at org.springframework.core.convert.support.GenericConversionService.handleConverterNotFound(GenericConversionService.java:294)

读写转换器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Reading converter that turns a stored JSON string into a {@code PlatformScore}.
 */
@ReadingConverter
public class PlatformScoreReadConverter implements Converter<String, PlatformScore> {

    /**
     * @param source the stored string
     * @return {@code null} when the string is blank, otherwise the result of
     *         {@code JsonUtil.toClass(source, PlatformScore.class)}
     */
    @Override
    public PlatformScore convert(String source) {
        return StringUtils.isBlank(source)
                ? null
                : JsonUtil.toClass(source, PlatformScore.class);
    }
}


/**
 * Writing converter that turns a {@code PlatformScore} into the string produced by
 * {@code JsonUtil.toJson}.
 */
@WritingConverter
public class PlatformScoreWriteConverter implements Converter<PlatformScore, String> {

    /**
     * @param source the value to write
     * @return the result of {@code JsonUtil.toJson(source)}
     */
    @Override
    public String convert(PlatformScore source) {
        final String serialized = JsonUtil.toJson(source);
        return serialized;
    }
}

不生效的配置类代码

1
2
3
4
5
6
7
8
9
10
11
/**
 * Configuration that declares a standalone {@code R2dbcCustomConversions} bean containing the
 * PlatformScore read and write converters.
 *
 * The surrounding write-up presents this variant as the one that does NOT take effect.
 * NOTE(review): {@code getStoreConversions()} is not defined in this class as shown — confirm
 * where it is expected to come from.
 */
@Configuration
public class R2dbcConvert{

/** Builds the conversions from the store conversions plus the two PlatformScore converters. */
@Bean
public R2dbcCustomConversions r2dbcCustomConversions() {
List<Converter<?, ?>> converters = new ArrayList<>();
converters.add(new PlatformScoreWriteConverter());
converters.add(new PlatformScoreReadConverter());
return new R2dbcCustomConversions(getStoreConversions(), converters);
}
}

修正方法: 继承 AbstractR2dbcConfiguration, 并通过 @Override 重写 getCustomConverters() 来注册转换器.

  • 该类允许Hook getCustomConverters方法, 这个方法是protected的, 一定要继承AbstractR2dbcConfiguration进行处理
  • 为什么之前的注册不生效: AbstractR2dbcConfiguration 内部是直接调用 this.getCustomConverters() 来构建转换器, 不会去容器中查找另外声明的 @Bean, 所以除非重写(@Override)该方法, 否则自定义转换器不会生效
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
     * R2DBC configuration that routes all access through a proxying connection factory
     * (for SQL logging) and registers the PlatformScore converters by overriding
     * {@code getCustomConverters()}.
     */
    @Configuration
    public class R2dbcConfig extends AbstractR2dbcConfiguration {

        /** The injected connection factory that gets wrapped by the proxy. */
        private final ConnectionFactory targetConnectionFactory;

        public R2dbcConfig(ConnectionFactory connectionFactory) {
            this.targetConnectionFactory = connectionFactory;
        }

        /** Wraps the injected connection factory in a proxy reporting to a new {@code R2dbcProxyListener}. */
        @Override
        public ConnectionFactory connectionFactory() {
            R2dbcProxyListener sqlLogger = new R2dbcProxyListener();
            return ProxyConnectionFactory.builder(this.targetConnectionFactory)
                    .listener(sqlLogger)
                    .build();
        }

        /** Builds the {@code DatabaseClient} on top of the proxying connection factory. */
        @Override
        public DatabaseClient databaseClient() {
            ConnectionFactory proxied = connectionFactory();
            return DatabaseClient.builder()
                    .connectionFactory(proxied)
                    .build();
        }

        /** Creates the entity template from the given client and data access strategy. */
        @Override
        public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient, ReactiveDataAccessStrategy dataAccessStrategy) {
            R2dbcEntityTemplate template = new R2dbcEntityTemplate(databaseClient, dataAccessStrategy);
            return template;
        }

        /** Supplies the PlatformScore write and read converters. */
        @Override
        protected List<Object> getCustomConverters() {
            Object writeConverter = new PlatformScoreWriteConverter();
            Object readConverter = new PlatformScoreReadConverter();
            return Arrays.asList(writeConverter, readConverter);
        }
    }