R2DBC 响应式数据库管理

简介

在 Reactive 响应式编程里,几乎所有的思想都被重写,从Web到IO,再到服务器,及数据库连接组件,都需要满足响应式编程思维,本例为用于 Reactive 响应式编程所使用的 数据库连接组件,与 JDBC 同级,负责响应式的数据库连接工作。

官方网站级初始使用:https://r2dbc.io/

目前 Reactive 响应式编程暂没有 MyBatis 或 MyBatis Plus 的支持,因此查询数据库目前依然需要使用原生的 R2DBC 来执行。

 

引入依赖

具体的支持数据库的依赖可以参考官方网站:https://r2dbc.io/drivers/

本次使用 mysql 的驱动。

r2dbc-spi

r2dbc-spi 是一种连接数据库的接口规范,我们知道,JDBC 本身也是一系列的接口规范,各大数据库厂商通过实现该规范来支持对应的数据库系统,而相应的,r2dbc-spi 则是 一种统一接口规范。

 

r2dbc-mysql

r2dbc-mysql 是 Mysql 对于 r2dbc-spi 接口规范的一个实现,用以支持 Mysql 的连接和查询。

以下为引入r2dbc-mysql依赖

<dependency>
  <groupId>io.asyncer</groupId>
  <artifactId>r2dbc-mysql</artifactId>
  <version>1.4.0</version>
</dependency>

 

 

初始使用

我们针对数据库连接无非就那几个步骤

  • 1.填写连接配置文件
    • 比如主机地址,用户名,密码,端口,连接数据库等
  • 2.创建一个连接工厂
    • 传入配置对象
  • 3.创建一个 statement 的SQL 连接
    • 编写一个查询的SQL查询对象
  • 4.查询并获得查询结果
    • 通过 result 对象取出数据并封装成对象
  • 5.对DAO对象进行其它使用

这是手动连接数据库时需要执行的步骤。

1.官方建议:填写连接配置文件

参考官方的填写连接配置文件,使用 MySqlConnectionConfiguration 配置类来载入连接数据库需要的配置:

MySqlConnectionConfiguration 提供一个 builder 方法以快速创建配置对象(因为使用了 private 构造函数,所以不能 new)

        MySqlConnectionConfiguration mysqlConfigBuild = MySqlConnectionConfiguration.builder()
                .host("127.0.0.1")
                .port(3306)
                .username("root")
                .password("root")
                .database("reactive")
                .build();

 

 

2.官方建议:创建一个连接工厂

R2DBC 提供一个用于创建连接工厂的对象 MySqlConnectionFactory

使用 MySqlConnectionFactory.from 来引入 MySqlConnectionConfiguration  配置类对象获得连接配置

MySqlConnectionFactory 对象是一个 Mono 流,里面包含了一个 MySqlConnection 对象的连接对象。

Mono<? extends MySqlConnection> mono = MySqlConnectionFactory.from(mysqlConfigBuild).create();

 

 

3.官方建议:创建一个 statement 的SQL 连接

由 2. 我们可知,MySqlConnectionFactory  对象是一个 Mono 流,流中带有一个 MySqlConnection  对象,

Reactive 响应式编程中,可以使用 map 来使一个流中的数据转为经过加工的数据并加入到新流中。

而使用 MySqlConnection 对象进行查询数据库后,有可能会得出 多个 行数据,这时如果使用 map 就不行

因为 Mono 流的 map 后依然是 Mono 流,不符合存多个数据的条件。(flatMap 也是会返回 Mono 流)

所以要使用 flatMapMany 来生产一个 Mono 流经过加工后能存放多个数据的 Flux 流。

使用 createStatement 来创建一个 SQL 查询对象:

 // 使用 flatMapMany 来让一个 Mono 经过加工后能返回一个 Flux,result 留在 4. 步中使用
 Flux<? extends MySqlResult> result = sqlConnectionFactory.flatMapMany(confactory->
            // 取出 Mono 流里面的 MySqlConnection 对象进行创建 Statement
            confactory.createStatement("SELECT * from users where username=?")
                    .bind(0,"tzming")
                    .execute());
  •  在 Statement 中,我们可以使用两种方式来绑定查询变量
    • 第一种,使用 ? 来占位
      • 在绑定数据时,使用占位的排序数来定位,如上面代码 .bind(0,"tzming") 就是绑定第一个 ? 的数据
    • 第二种,使用 ?name 来占位(也叫具名绑定)
      • 在绑定数据时,可以直接使用名字来绑定数据,如
        • confactory.createStatement("SELECT * from users where username=?name")
        • .bind("name", "tzming")
  • 使用 execute() 来执行查询,会返回一个 MySQLResult 对象的 Flux 流。

 

 

4.官方建议:查询并获得查询结果

当我们取到查询的结果的流后,我们就能通过它来进行获取查询数据及DAO封装。

        Flux<User> userFlux = result.flatMap(r -> r.map(data -> {
                    Long id = data.get("id", Long.class);
                    String username = data.get("username", String.class);
                    CharSequence password = data.get("password", CharSequence.class);
                    String email = data.get("email", String.class);
                    return new User(id, username, password, email);
                })
        );

这里使用 flatMap 是因为,flatMap 是用于把一个数据的流,打散成多个数据的流,如果使用 map 的话,就会变成一个Flux流中存在一个数据,这个数据是一个存在多个User的 Flux 流,即 Flux<Flux<User>> 包装了。我们要的是 Flux<User> 即可。

 

整合与自动配置 Spring Data R2DBC

1.引入依赖

把 R2DBC 进 SpringBoot 项目中,需要使用 spring-boot-starter-data-r2dbc 依赖进去

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>

 

 

2.编写配置

spring:
  application:
    name: WebFlux
  r2dbc:
    url: r2dbc:mysql://localhost:3306/reactive
    username: root
    password: root

以前的 jdbc 框架使用的是 jdbc:mysql:// 的格式,而r2dbc 使用的就是 r2dbc:mysql:// 格式了

 

 

3.获取数据库操作客户端

当我们的Spring程序运行并连接数据库之后,我们就可以拿到由 Spring-Data 提供的 Template ,也就是我们常说的用于操作的客户端对象。

 

R2dbcEntityTemplate 对象

  • SpringData 中提供了一个用于操作数据库的对象 R2dbcEntityTemplate
    • 它使用了与 MyBatis 类似的思想
    • 即【条件】使用 对象来封装,不使用 SQL 写死的方法
      • 1.创建【查询条件】的类为 Criteria 类
      • 2.通过 Query 等类,与 Criteria 类 对象封装成一个查询
      • 3.通过 Query 类的查询方法中提交的 Dao 数据类 Class 来取得查询出来的数据要封装成什么类型的DAO
      • 4.通过 DAO 中加入 @Table 来识别要查询的表名

示例代码如下:

   @Resource
    private R2dbcEntityTemplate r2dbcEntityTemplate;

    @GetMapping("/r2dbc/{id}")
    public Flux<User> dataFormR2dbc(@PathVariable("id") Long id){

        /**
         * 使用 Criteria 类来创建查询条件
         * 此处代表,“id” 的值是需要等于 接收回来的值
         */
        Criteria criteria = Criteria.empty()
                .and("id").is(id);

        /**
         * 使用 Query 类来引入查询条件及一些查询的数据字段等参数,比如排序,限数查询等
         * 查询5条数据
         */
        Query columns = Query.query(criteria)
                .columns("id", "username", "password", "email")
                .limit(5);

        /**
         * User 类中 加入了 @Table() 注解,以声明 DAO 所属的数据表名称
         * 查询出来之后是一个发布者对象,R2DBC 会自动把数据封装成 DAO 对象
         */
        Flux<User> select = r2dbcEntityTemplate.select(columns, User.class);

        return select;
    }

R2dbcEntityTemplate 的缺点是如果需要联表查询会非常麻烦,通常只用于单表查询。

 

 

 

DatabaseClient 对象

DatabaseClient 对象是比上面的 R2dbcEntityTemplate 更底层,因此它通常是用于直接提交SQL代码进行查询的

这样的好处是比较灵活,联表查询等都通过SQL来查。

缺点是不会帮我们自动封装DAO对象

  • 使用 databaseClient.sql 来直接写SQL
  • 使用 bind() 来绑定查询条件
  • 使用 fetch() 来抓取查询结果
    • fetch() 方法返回一个 FetchSpec<Map<String,Object>> 的类型
      • FetchSpec 实现了 RowsFetchSpec 接口,
      • RowsFetchSpec 接口提供了如下:
        • Mono<T> one() : 取一条数据,返回 Mono 流
        • Mono<T> first() : 取第一条数据,返回 Mono 流
        • Flux<T> all() : 取所有行数据,返回 Flux 流
    • 我们可以通过 RowsFetchSpec 的 all() 来获取所有查询的行数据
    • 但是 databaseClient 不会对我们的数据类型进行封装或转换
  • fetch() 方法采用 Map<String, Object> 来保存数据,也就是说,表字段名即为 key,查询的列数据即为 value

代码示例:

@Resource
    private DatabaseClient databaseClient;

    @GetMapping("/r2dbc/{id}")
    public Flux<User> dataFormR2dbc(@PathVariable("id") Long id) {
 
        // 通过 DatabaseClient  对象使用 SQL 代码直接查询
        FetchSpec<Map<String, Object>> result = databaseClient.sql("SELECT * from users where id=?id")
                .bind("id", id) // 绑定占位
                .fetch();  // 使用 抓取,会拿到一个 FetchSpec 流

        Flux<User> usermap = result.all()  // 流中获取所有数据
                .map(v -> {  // 进行 map 加工
                    Long uid = id;//Long.parseLong((String) v.get("id"));
                    String username = (String) v.get("username");
                    CharSequence password = (CharSequence) v.get("password");
                    String email = (String) v.get("email");
                    return new User(uid, username, password, email);
                });

        return usermap;
    }

 

SpringData 的 CRUD 接口组件(重点推荐)

通过上面使用 R2dbcEntityTemplate 和 DatabaseClient 来执行SQL,是比较底层且繁琐的操作,其实SpringData 提供了一种类似于 MyBatisPlus 的预留接口,我们只需要把我们创建的 表的数据库操作接口 继承给R2dbc提供的 ReactiveCrudRepository 接口(非DAO,而是称为 Repository 类),即可完成基本的数据查询方法。

具体操作如下:

  • 1.创建一个表对应的 Repository 接口,比如 UserRepository(和MyBatisPlus一样的,继承 IService 差不多)
  • 2.Repository 接口继承 ReactiveCrudRepository,官方更推荐使用子接口 R2dbcRepository
    • R2dbcRepository 接收两个泛型,R2dbcRepository<DAO, IDType>,第一个泛型是指DAO的类型,如 User,第二个泛型是指表的主键使用的类型,如Long
  • 3.Repository 接口 打上 @Repository 标注
  • 4.Repository 接口就能直接用一些常用的表操作了

代码如下:

@Repository
//第一个泛型是指DAO的类型,第二个泛型是指表的主键使用的类型,DAO的类型使用 @Table定义表名
public interface UserRepository extends R2dbcRepository<User, Long> { 
}

基础的继承方法:

 

 

强大的方法名生成复杂SQL技术

R2dbcRepository 有一个非常厉害的技术,是通过方法名来预测将会使用的SQL查询,只要通过方法名,就能生成出来SQL代码

比如:

SQL 代码 SELECT * from users where username=tzming and id in (1,2,3) or password like "%123%"

从上面的SQL 代码中,我们知道查询的条件有几样

  • 包含 username 为多少
  • 包含 And id 在 1,2,3 之内
  • 包含 Or password 包含 "123" 的密码

分析:

  • 包含 username 为多少
    • 可以名为 UsernameIs
  • 包含 And id 在 1,2,3 之内
    • 可以名为 AndIdIn
  • 包含 Or password 包含 "123" 的密码
    • 可以名为 OrPasswordLike

然后我们只需要方法名开头为 "findALLBy",再拼合上面的名字组合

变成 Flux findALLByUsernameIsAndIdInOrPasswordLike()

系统就会自动识别我们需要三个条件,并自动计算出必要的传入参数

Flux<User> findAllByUsernameIsAndIdInOrPasswordLike(String username, Collection<Long> id, CharSequence password);

注意:使用这个方法时,要注意传入参数的类型需要符合查询条件需要的,比如使用 In 时,就需要传 Collection<> 数组类型,否则会报错不能启动服务器

这个方法不需要任何注解扩展,也不需要再写SQL,SpringData 会自动识别并执行!

注意:使用本方法只能单表复杂查询。

 

@Query 查询注解

对于一些想要定义使用SQL代码查询的接口方法,我们可以在接口方法上加上 @Query 注解,并写上 SQL 代码,就可以让接口具有查询功能

    @Query("SELECT * from User where id = :userId")
    Flux<User> getUserById(@Param("userId") Long userid);

说明:

  • @Param 是用于接收 @Query 中的查询条件参数,使用 :xxx 来占位
  • 条件参数除了使用 :xxx 来占位外,还可以使用 $1 $2 来表示第一个和第二个。。。的参数占位,推荐使用 :xxx 来表示

 

@Query 除了可以作为查询外,也可以作为增删改使用,不过需要增一个 @Modifying 注解

    @Modifying
    @Query("DELETE from users where id = :userId")
    Flux<Integer> deleteUserById(@Param("userId") Long userId);

 

一对一联表查询(Dao转换器)

针对一对一联表查询,我们可以在@Query 上写,也可以使用 DatabaseClient 中使用SQL来执行获取数据,也可以通过两次以上的查询来对数据进行封装。

例如:作者与书本

  • 作者表 Author包含【id, username】等数据
  • @Table("author")
    @Data
    @AllArgsConstructor
    public class Author {
        @Id
        private Long id;
        private String username;
    
    }
  • 书本表 Boot包含【id, title, author_id, publisher, Author】等数据
    • Book 表中包含了 Author 对象
    • @Table("book")
      @Data
      @AllArgsConstructor
      public class Book {
      
          @Id
          private Long id;
          private String title;
          private Long authorId;
          /**
           * 在 R2DBC 中,时间请不要使用 DateTime
           * 要使用 Instant 或 LocalDateTime
           */
          private Instant publisher;
          // 自定义的 DAO
          private Author author;
      }
  • 通过联表查询书本,并查询出 Author 数据,即为一对一联表查询

具体SQL代码:

Select b.*,a.username as name from Book b left join Author a on b.author_id = a.id where b.title = ""

说明:通过查询Book表,并通过Book表中的author_id来查询Author表中的数据,并封装到BookDAO中

 

对于联表的DAO包含问题,SpringData 提供了一种叫“自定义数据封装”接口,它本身就包含了众多转换器,这些转换器都用于把 Row(行数据) 转为 String 等Java类型,所以我们可以创建一个用于把 Row 转换为 BootDAO 类型的转换器

  •  第一步:创建转换器,实现 Converter 接口,接口要求提供两个泛型,第一个泛型为 Row ,即数据源,第二个泛型为要转换到的DAO类型
    • Row 是查询数据后一代表一行数据的接口,我们可以在Row 对象中取一行数据
    • public class BookConverter implements Converter<Row, Book>
    • 实现Converter接口的方法
    • public Book convert(Row source)
    • 即当一条数据到达时,要如何把这个数据转为你想要封装的类型
    • public class BookConverter implements Converter<Row, Book> {
          @Override
          public Book convert(Row source) {
      
              // 取得一行数据中的BookID
              Long bookId = source.get("id", Long.class);
              // 取得Book中的title
              String title = source.get("title", String.class);
              // 取得在Book 中的 Author 的 ID
              Long authorId = source.get("author_id", Long.class);
              // 取得Book的发布时间
              Instant publisher = source.get("publisher", Instant.class);
              // 取得Author的名字 username as name
              String name = source.get("name", String.class);
      
              Book book = new Book();
              book.setId(bookId);
              book.setTitle(title);
              book.setAuthorId(authorId);
              book.setPublisher(publisher);
              book.setAuthor(new Author(authorId, name));
      
              return book;
          }
      }
    • 完成后,需要把这个 Converter 转换器注册到 Bean 中。
    • 需要注册一个返回 R2dbcCustomConversions 类型的Bean
    •     @Bean
          public R2dbcCustomConversions conversions(){
              // 把我们自己的Converter 加载到 R2dbcCustomConversions 中。
              return R2dbcCustomConversions.of(MySqlDialect.INSTANCE,new BookConverter());
          }
    • MySqlDialect 是一个用于解决数据库SQL方言之间的差异化对象,转换器需要知道,查询的数据库是什么数据库,因为我们使用的是 MySQL数据库,所以我们需要提供 MySqlDialect 的方言抽象对象,如果是其它数据库,就要使用对应数据库的方言抽象对象,如SqlServerDialect或者OracleDialect

 

一对多查询

相对比一本书只有一个作者,则为一对一查询。而查询一个作者有多本书,则为一对多查询。

例如:作者与书本

  • 作者表 Author包含【id, username】等数据,其中作者表中,包含了 Book 类型数组,用于保存作者下拥有的 Book 书本对象
    • 在 Author 表中,可以设定一个成员变量 private List<Book> books;
    • 但是我不希望这个成员变量被识别为DAO和表字段映射关系
    • 可以使用 @Transient (译:临时字段) 注解标记这个字段不参与字段映射,这和 MyBatis 的 @TableField(exist = false) 同理
    • @Table("author")
      @Data
      @AllArgsConstructor
      public class Author {
      
          @Id
          private Long id;
          private String username;
      
      
          @Transient
          private List<Book> books;
      }
  • 书本表 Boot包含【id, title, author_id, publisher】等数据
  • @Table("book")
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Book {
    
        @Id
        private Long id;
        private String title;
        private Long authorId;
        /**
         * 在 R2DBC 中,时间请不要使用 DateTime
         * 要使用 Instant 或 LocalDateTime
         */
        private Instant publisher;
        // 自定义的 DAO
        @Transient
        private Author author;
    }
  • 通过查询作者,联表查询出多本书,并处理成数组存放于 Author 表中的 List<Book> books; 成员中

具体SQL代码:

SELECT b.*,a.username as username From book b left join author a on b.author_id = a.id 
where a.username = "张三" or a.username = "李四"  
GROUP BY b.author_id, b.id

至少我们需要在SQL中查询进行分组,每一个作者的书本信息需要放在一个地方不能零散摆放。

通过上面的查询结果,我们得出如下结论

  • 张三有两本书,书本的 author_id 都为1
  • 李四有一本书,书本的 author_id 为2
  • 我们需要把张三的两本书信息,转为一个 List,并存到 张三的对象中,如何使多个张三的书本对象,存到一个张三的作者对象中?

这时我们可以使用 bufferUntilChanged 来进行判断性分组,分组基于 author_id 作为分组依据

@Resource
    private AuthorBookRepository authorBookRepository;

    @GetMapping("/takeBookFromAuthor")
    public Flux<Author> takeBookFromAuthor() {
        // 通过 AuthorBookRepository 取出数据,(也可添加AuthorBookConverter转换器转换)
        return authorBookRepository.takeBoosByAuthorName()
                // 使用 bufferUntilChanged 判断性分组,以 AuthorId 为基础判断是否该合一组
                .bufferUntilChanged(authorBook -> {
                            // 第一个函数方法是加工每一个原始数据的,以用于第二个函数方法的后续操作
                            return authorBook;
                        },
                        // 第二个函数方法,给出的数据由第一个函数方法加工而成,给出上一个数据和下一个数据进行对比
                        (last, current) ->
                                // 如果它们两个AuthorId都一样,则返回 true 作为一个组
                                Objects.equals(last.getAuthorId(), current.getAuthorId()))
                // 分组后的 List 需要转为 Book 的List 用以加入到 Author 对象中
                .map(authorBooks -> {
                    // 重新构建为 Book 数组
                    List<Book> bookList = new ArrayList<>();
                    for (AuthorBook authorBook : authorBooks) {
                        Book book = new Book();
                        book.setAuthorId(authorBook.getAuthorId());
                        book.setPublisher(authorBook.getPublisher());
                        book.setTitle(authorBook.getTitle());
                        book.setId(authorBook.getId());
                        bookList.add(book);
                    }
                    // 拿其中一个数据的Author 字段数据来填充作者信息
                    Author author = new Author();
                    author.setId(authorBooks.get(0).getAuthorId());
                    author.setUsername(authorBooks.get(0).getUsername());
                    // 把加工好的 Book 数组加入到 Author 对象中
                    author.setBooks(bookList);
                    return author;
                });

    }

 

另外,上面代码中用到的 AuthorBookRepository 代码如下:

public interface AuthorBookRepository extends R2dbcRepository<AuthorBook, Long> {

    @Query("SELECT a.id as aid,a.username as username,b.* FROM author a LEFT JOIN book b ON a.id = b.author_id  ORDER BY author_id")
    Flux<AuthorBook> takeBoosByAuthorName();
}

 

用于保存 AuthorBook 关系对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class AuthorBook {

    private Long aid;
    private String username;
    private Long id;
    private String title;
    private Long authorId;
    private Instant publisher;

}

 

如果您喜欢本站,点击这儿不花一分钱捐赠本站

这些信息可能会帮助到你: 下载帮助 | 报毒说明 | 进站必看

修改版本安卓软件,加群提示为修改者自留,非本站信息,注意鉴别

THE END
分享
二维码
打赏
海报
R2DBC 响应式数据库管理
简介 在 Reactive 响应式编程里,几乎所有的思想都被重写,从Web到IO,再到服务器,及数据库连接组件,都需要满足响应式编程思维,本例为用于 Reactive 响应式编程所使用的 数据库连接组件,……
<<上一篇
下一篇>>