分布式专题——Reactive访问Spring Data R2DBC

楚宇
2023-12-01

1、Spring Data R2DBC

⼀些主要的类

  • ConnectionFactory
  • DatabaseClient
    • execute().sql(SQL)
  • inTransaction(db -> {})
  • R2dbcExceptionTranslator
    • SqlErrorCodeR2dbcExceptionTranslator

1.1、pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zhz</groupId>
    <artifactId>reactive-spring-boot-r2dbc-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactive-spring-boot-r2dbc-demo</name>
    <description>Spring Boot使用reactive集成r2dbc</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- R2DBC依赖 -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-r2dbc</artifactId>
            <version>1.0.0.M1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-h2 -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
            <version>1.0.0.M6</version>
        </dependency>

        <!-- R2DBC依赖 -->

        <dependency>
            <groupId>org.joda</groupId>
            <artifactId>joda-money</artifactId>
            <version>1.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Milestone的依赖不在主仓库里 -->
    <repositories>
        <repository>
            <id>spring-milestone</id>
            <name>Spring Milestones Repository</name>
            <url>https://repo.spring.io/milestone/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2、代码

package com.zhz.reactivespringbootr2dbcdemo.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.money.Money;

import java.util.Date;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Coffee {
    private Long id;
    private String name;
    private Money price;
    private Date createTime;
    private Date updateTime;
}
package com.zhz.reactivespringbootr2dbcdemo.converter;

import org.joda.money.CurrencyUnit;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;

public class MoneyReadConverter implements Converter<Long, Money> {
    @Override
    public Money convert(Long aLong) {
        return Money.ofMinor(CurrencyUnit.of("CNY"), aLong);
    }
}
package com.zhz.reactivespringbootr2dbcdemo.converter;

import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;

public class MoneyWriteConverter implements Converter<Money, Long> {
    @Override
    public Long convert(Money money) {
        return money.getAmountMinorLong();
    }
}

1.3、测试类

package com.zhz.reactivespringbootr2dbcdemo;

import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyReadConverter;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyWriteConverter;
import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.convert.CustomConversions;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.dialect.Dialect;
import org.springframework.data.r2dbc.function.DatabaseClient;
import org.springframework.data.r2dbc.function.convert.R2dbcCustomConversions;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

@SpringBootApplication
@Slf4j
public class ReactiveSpringBootR2dbcDemoApplication extends AbstractR2dbcConfiguration implements ApplicationRunner {

    @Autowired
    private DatabaseClient client;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringBootR2dbcDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        CountDownLatch cdl = new CountDownLatch(2);

        client.execute()
                .sql("select * from t_coffee")
                .as(Coffee.class)
                .fetch()
                .first()
                .doFinally(s -> cdl.countDown())
//				.subscribeOn(Schedulers.elastic())
                .subscribe(c -> log.info("Fetch execute() {}", c));

        client.select()
                .from("t_coffee")
                .orderBy(Sort.by(Sort.Direction.DESC, "id"))
                .page(PageRequest.of(0, 3))
                .as(Coffee.class)
                .fetch()
                .all()
                .doFinally(s -> cdl.countDown())
//				.subscribeOn(Schedulers.elastic())
                .subscribe(c -> log.info("Fetch select() {}", c));

        log.info("After Starting.");
        cdl.await();
    }

    @Override
    public ConnectionFactory connectionFactory() {
        return new H2ConnectionFactory(H2ConnectionConfiguration.builder().inMemory("spring-test").url("jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8").username("root").password("root").build());
    }
    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions() {
        Dialect dialect = getDialect(connectionFactory());
        CustomConversions.StoreConversions storeConversions =
                CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder());
        return new R2dbcCustomConversions(storeConversions,
                Arrays.asList(new MoneyReadConverter(), new MoneyWriteConverter()));
    }
}

2、R2DBC Repository ⽀持

⼀些主要的类

  • @EnableR2dbcRepositories
  • ReactiveCrudRepository<T, ID>
  • @Table / @Id
  • 其中的⽅法返回都是 Mono 或者 Flux
  • ⾃定义查询需要⾃⼰写 @Query

2.1、pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zhz</groupId>
    <artifactId>reactive-spring-boot-r2dbc-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactive-spring-boot-r2dbc-demo</name>
    <description>Spring Boot使用reactive集成r2dbc</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- R2DBC依赖 -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-r2dbc</artifactId>
            <version>1.0.0.M1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-h2 -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
            <version>1.0.0.M6</version>
        </dependency>

        <!-- R2DBC依赖 -->

        <dependency>
            <groupId>org.joda</groupId>
            <artifactId>joda-money</artifactId>
            <version>1.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Milestone的依赖不在主仓库里 -->
    <repositories>
        <repository>
            <id>spring-milestone</id>
            <name>Spring Milestones Repository</name>
            <url>https://repo.spring.io/milestone/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.2、yml

spring:
  datasource:
    url: jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8
    username: root
    password: root
    hikari:
      connection-test-query: SELECT 1
      connection-timeout: 60000
      idle-timeout: 500000
      max-lifetime: 540000
      maximum-pool-size: 12
      minimum-idle: 10
      pool-name: GuliHikariPool
  output:
    ansi:
      enabled: always
management:
  endpoints:
    web:
      exposure:
        include: '*'

2.3、代码

package com.zhz.reactivespringbootr2dbcdemo.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.money.Money;

import java.util.Date;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Coffee {
    private Long id;
    private String name;
    private Money price;
    private Date createTime;
    private Date updateTime;
}
package com.zhz.reactivespringbootr2dbcdemo.converter;

import org.joda.money.CurrencyUnit;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;

public class MoneyReadConverter implements Converter<Long, Money> {
    @Override
    public Money convert(Long aLong) {
        return Money.ofMinor(CurrencyUnit.of("CNY"), aLong);
    }
}
package com.zhz.reactivespringbootr2dbcdemo.converter;

import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;

public class MoneyWriteConverter implements Converter<Money, Long> {
    @Override
    public Long convert(Money money) {
        return money.getAmountMinorLong();
    }
}
package com.zhz.reactivespringbootr2dbcdemo.repository;

import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import org.springframework.data.r2dbc.repository.query.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

public interface CoffeeRepository extends ReactiveCrudRepository<Coffee, Long> {
    @Query("select * from t_coffee where name = $1")
    Flux<Coffee> findByName(String name);
}

2.4、测试类

package com.zhz.reactivespringbootr2dbcdemo;

import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyReadConverter;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyWriteConverter;
import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import com.zhz.reactivespringbootr2dbcdemo.repository.CoffeeRepository;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.convert.CustomConversions;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.dialect.Dialect;
import org.springframework.data.r2dbc.function.DatabaseClient;
import org.springframework.data.r2dbc.function.convert.R2dbcCustomConversions;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

@SpringBootApplication
@Slf4j
public class ReactiveSpringBootR2dbcDemoApplication extends AbstractR2dbcConfiguration implements ApplicationRunner {

    @Autowired
    private CoffeeRepository repository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringBootR2dbcDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        coffeeRepository();
    }

    private void coffeeRepository() throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(2);

        repository.findAllById(Flux.just(1L, 2L))
                .map(c -> c.getName() + "-" + c.getPrice().toString())
                .doFinally(s -> cdl.countDown())
                .subscribe(c -> log.info("Find {}", c));

        repository.findByName("mocha")
                .doFinally(s -> cdl.countDown())
                .subscribe(c -> log.info("Find {}", c));

        cdl.await();
    }

    @Override
    public ConnectionFactory connectionFactory() {
        return new H2ConnectionFactory(H2ConnectionConfiguration.builder().inMemory("spring-test").url("jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8").username("root").password("root").build());
    }
    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions() {
        Dialect dialect = getDialect(connectionFactory());
        CustomConversions.StoreConversions storeConversions =
                CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder());
        return new R2dbcCustomConversions(storeConversions,
                Arrays.asList(new MoneyReadConverter(), new MoneyWriteConverter()));
    }



}
 类似资料: