Maven引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>io.asyncer</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-proxy</artifactId>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
</dependency>
配置TraceId 添加支持 && 过滤器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 import org.slf4j.MDC;import reactor.core.publisher.Hooks;import reactor.core.publisher.Operators;import reactor.util.context.ContextView;import reactor.util.context.Context;import reactor.core.CoreSubscriber;public class ReactorMdcSupport { private static final String TRACE_ID = "traceId" ; public static void install () { Hooks.onEachOperator("mdc" , Operators.lift((scannable, subscriber) -> new MdcContextLifter <>(subscriber))); } static class MdcContextLifter <T> implements CoreSubscriber <T> { private final CoreSubscriber<T> actual; MdcContextLifter(CoreSubscriber<T> actual) { this .actual = actual; } @Override public void onSubscribe (org.reactivestreams.Subscription subscription) { actual.onSubscribe(subscription); } @Override public void onNext (T t) { copyContextToMdc(); actual.onNext(t); } @Override public void onError (Throwable throwable) { copyContextToMdc(); actual.onError(throwable); } @Override public void onComplete () { copyContextToMdc(); actual.onComplete(); } @Override public Context currentContext () { return actual.currentContext(); } private void copyContextToMdc () { ContextView contextView = actual.currentContext(); if (contextView.hasKey(TRACE_ID)) { String traceId = contextView.get(TRACE_ID); MDC.put(TRACE_ID, traceId); } } } } @Slf4j @Component @Order(Ordered.HIGHEST_PRECEDENCE) public class MDCTraceFilter implements WebFilter { @NotNull @Override public Mono<Void> filter (ServerWebExchange exchange, WebFilterChain chain) { String traceId = ThreadMdcUtil.getTraceId(); exchange.getRequest().mutate().header(ThreadMdcUtil.TRACE_ID, traceId); return chain.filter(exchange) .contextWrite(Context.of(ThreadMdcUtil.TRACE_ID, traceId)) .doFirst(() -> 
MDC.put(ThreadMdcUtil.TRACE_ID, traceId)) .doFinally(signal -> MDC.remove(ThreadMdcUtil.TRACE_ID)); } } @Configuration public class WebFilterConfig { @Bean public WebFilter mdcTraceFilter () { return new MDCTraceFilter (); } }
启动类注册 traceId 和 异常 1 2 3 4 5 6 7 8 9 public static void main (String[] args) { Hooks.enableAutomaticContextPropagation(); Hooks.onErrorDropped(e -> log.warn("Dropped error: {}" , e.toString())); Hooks.onNextDropped(o -> log.warn("Dropped value: {}" , o)); Hooks.onOperatorDebug(); ReactorMdcSupport.install(); SpringApplication.run(SpiderCanalApplication.class, args); }
Redisson配置修改 给redisson进行相关配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Autowired private ObjectMapper objectMapper;@Autowired private CustomProperties properties;@Bean public RedissonReactiveClient redissonReactiveClient () throws IOException { Config config = new Config (); config.setCodec(new JsonJacksonCodec (objectMapper)); RedissonProperties.ClusterServersConfig clusterServersConfig = properties.getClusterServersConfig(); if (Objects.nonNull(clusterServersConfig)) { config.useClusterServers() .setUsername(properties.getUsername()) .setPassword(properties.getPassword()) .addNodeAddress(clusterServersConfig.getNodeAddresses()) .setScanInterval(clusterServersConfig.getScanInterval()) .setTimeout(clusterServersConfig.getTimeout()) .setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize()) .setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize()); } else { RedissonProperties.SingleServerConfig singleServerConfig = properties.getSingleServerConfig(); if (Objects.nonNull(singleServerConfig)) { config.useSingleServer() .setUsername(properties.getUsername()) .setPassword(properties.getPassword()) .setAddress(singleServerConfig.getAddress()) .setDatabase(singleServerConfig.getDatabase()) .setTimeout(singleServerConfig.getTimeout()) .setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize()) .setConnectionPoolSize(singleServerConfig.getConnectionPoolSize()); } } return Redisson.create(config).reactive(); }
Reacive获取和存储Data 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public <T> Mono<T> get (String key, Class<T> clazz) { RBucketReactive<T> bucket = redisson.getBucket(key, new TypedJsonJacksonCodec (clazz)); return bucket.get(); } public <T> Mono<Void> set (String key, T value, Duration ttl) { return redisson.getBucket(key, new TypedJsonJacksonCodec (value.getClass())) .set(value, ttl); }
R2DBC 使用中遇到的问题: java.util.Date不可用
需要使用LocalDateTime, 旧的Date在该组件中不支持
LocalDateTime 传入的格式问题
构建自定义解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class FlexibleLocalDateTimeDeserializer extends JsonDeserializer <LocalDateTime> { private static final List<DateTimeFormatter> FORMATTERS = List.of( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS" ), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS" ), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ) ); @Override public LocalDateTime deserialize (JsonParser p, DeserializationContext ctxt) throws IOException { String text = p.getText().trim(); for (DateTimeFormatter formatter : FORMATTERS) { try { return LocalDateTime.parse(text, formatter); } catch (DateTimeParseException ignored) { } } throw new IOException ("无法解析时间: " + text); } }
在对应entity的字段上, 配置该解析 1 @JsonDeserialize(using = FlexibleLocalDateTimeDeserializer.class)
之前使用JPA, 现在找不到Repository
日志错误 1 Field PlatformGroupService in com.swzxsyh.components.FastQueryService required a bean of type 'com.swzxsyh.repository.PlatformGroupRepository' that could not be found.
是引入包的问题, 同名称需使用R2DBC的包 1 2 3 4 import org.springframework.data.relational.core.mapping.Column;import org.springframework.data.annotation.Id;import org.springframework.data.relational.core.mapping.Table;
不打印语句 需要自定义Listener, 这里手动构建成一个像MyBatis的语句打印 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class R2dbcProxyListener implements ProxyExecutionListener , ProxyMethodExecutionListener { private static final Logger logger = LoggerFactory.getLogger(R2dbcProxyListener.class); @Override public void eachQueryResult (QueryExecutionInfo execInfo) { Object currentMappedResult = execInfo.getCurrentMappedResult(); logger.info("<== Affected Rows: {}" , currentMappedResult); } @Override public void beforeExecuteOnStatement (@NotNull QueryExecutionInfo queryExecutionInfo) { this .sqlPrinter(queryExecutionInfo); } public void sqlPrinter (QueryExecutionInfo execInfo) { for (QueryInfo qi : execInfo.getQueries()) { logger.info("==> Preparing: {}" , qi.getQuery()); for (Bindings bindings : qi.getBindingsList()) { StringBuilder sb = new StringBuilder (); bindings.getIndexBindings().forEach(b -> { Object key = b.getKey(); BoundValue bv = b.getBoundValue(); Object value = bv.getValue(); String type = bv.isNull() ? bv.getNullType().getSimpleName() + " (NULL)" : bv.getValue().getClass().getSimpleName(); sb.append(value).append("(" ).append(type).append(")" ).append(", " ); }); logger.info(sb.substring(0 , sb.length() - 2 )); bindings.getNamedBindings().forEach(b -> { Object key = b.getKey(); BoundValue bv = b.getBoundValue(); Object value = bv.getValue(); String type = bv.isNull() ? bv.getNullType().getSimpleName() + " (NULL)" : bv.getValue().getClass().getSimpleName(); logger.info(" -> param['{}'] = {} ({})" , key, value, type); }); } } } }
代码注册 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public class R2dbcConfig extends AbstractR2dbcConfiguration { private final ConnectionFactory connectionFactory; public R2dbcConfig (ConnectionFactory connectionFactory) { this .connectionFactory = connectionFactory; } @Override public ConnectionFactory connectionFactory () { return ProxyConnectionFactory.builder(this .connectionFactory).listener(new R2dbcProxyListener ()).build(); } @Override public DatabaseClient databaseClient () { return DatabaseClient.builder().connectionFactory(connectionFactory()).build(); }
日志配置

logging:
  level:
    com.swzxsyh: info
    org.springframework: warn
    org.springframework.r2dbc: warn
    org.redisson.connection: warn
    io.r2dbc.proxy: warn
    com.swzxsyh.config.listener.R2dbcProxyListener: info
    io.r2dbc.proxy.listener.DefaultQueryExecutionListener: warn
    io.r2dbc.proxy.observation.ObservationProxyExecutionListener: warn
    io.asyncer.r2dbc.mysql.client.ReactorNettyClient: warn
使用 repository 查询并调用 .one() 时,
生成的 SQL 末尾反而是 LIMIT 2 (而不是 LIMIT 1)
出现这种SQL 1 SELECT spider_config.* FROM spider_config WHERE spider_config.status = ? AND spider_config.domain = ? LIMIT 2
详见 https://github.com/spring-projects/spring-data-r2dbc/issues/758 。
.one() 会多取一行, 用来检测结果是否唯一 (多于一行时抛出异常), 所以是 LIMIT 2;
如果只需要取第一条记录, 请使用 .first() 语法。
配置了重载转换器, 仍然提示没有转换器的错误 错误日志 1 2 org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.lang.String] to type [com.swzxsyh.model.platform.PlatformScore] at org.springframework.core.convert.support.GenericConversionService.handleConverterNotFound(GenericConversionService.java:294)
读写转换器代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @ReadingConverter public class PlatformScoreReadConverter implements Converter <String, PlatformScore> { @Override public PlatformScore convert (String source) { if (StringUtils.isBlank(source)) { return null ; } return JsonUtil.toClass(source, PlatformScore.class); } } @WritingConverter public class PlatformScoreWriteConverter implements Converter <PlatformScore, String> { @Override public String convert (PlatformScore source) { return JsonUtil.toJson(source); } }
不生效的配置类代码 1 2 3 4 5 6 7 8 9 10 11 @Configuration public class R2dbcConvert { @Bean public R2dbcCustomConversions r2dbcCustomConversions () { List<Converter<?, ?>> converters = new ArrayList <>(); converters.add(new PlatformScoreWriteConverter ()); converters.add(new PlatformScoreReadConverter ()); return new R2dbcCustomConversions (getStoreConversions(), converters); } }
修正方法: 继承 AbstractR2dbcConfiguration, 并通过 @Override 重写 getCustomConverters 方法来注册转换器.
该方法是 protected 的, 因此必须继承 AbstractR2dbcConfiguration 才能重写它
为什么之前的注册不生效: 它是直接调用this方法, 没有去找@Bean, 所以除非重载, 否则不会生效 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Configuration public class R2dbcConfig extends AbstractR2dbcConfiguration { private final ConnectionFactory connectionFactory; public R2dbcConfig (ConnectionFactory connectionFactory) { this .connectionFactory = connectionFactory; } @Override public ConnectionFactory connectionFactory () { return ProxyConnectionFactory.builder(this .connectionFactory).listener(new R2dbcProxyListener ()).build(); } @Override public DatabaseClient databaseClient () { return DatabaseClient.builder().connectionFactory(connectionFactory()).build(); } @Override public R2dbcEntityTemplate r2dbcEntityTemplate (DatabaseClient databaseClient, ReactiveDataAccessStrategy dataAccessStrategy) { return new R2dbcEntityTemplate (databaseClient, dataAccessStrategy); } @Override protected List<Object> getCustomConverters () { return Arrays.asList( new PlatformScoreWriteConverter (), new PlatformScoreReadConverter () ); } }