Vertx之响应式mysql

阎慈
2023-12-01

介绍

Vertx响应式mysql客户端,具有简单的API,关注可伸缩性和低开销, 特性:

  • 事件驱动
  • 轻量级
  • 内置数据库连接池
  • 预制的查询缓存
  • 游标支撑
  • 查询行stream操作
  • RxJava api支持
  • 0拷贝对象转化
  • 完备的数据类型支持
  • 存储过程支持
  • TLS/SSL支持
  • MySQL实用程序命令支持
  • MySQL和MariaDB支持
  • 丰富的字符集支持
  • Unix domain socket支持

1. maven项目依赖

<dependencies>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-web</artifactId>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-config-yaml</artifactId>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-mysql-client</artifactId>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>8.0.27</version>
	</dependency>
	<dependency>
		<groupId>com.fasterxml.jackson.core</groupId>
		<artifactId>jackson-databind</artifactId>
	</dependency>
	<dependency>
		<groupId>com.lance.common</groupId>
		<artifactId>vertx-common-core</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</dependency>
</dependencies>

2.YAML文件配置

server:
  port: 8000

mysql:
  host: 127.0.0.1
  port: 3306
  database: v_example
  username: root
  password: li123456
  charset: utf8
  collation: utf8_general_ci
  maxSize: 30
  reconnectAttempts: 3
  reconnectInterval: 1000
  poolName: v-pool

3.启动加载配置文件, 并放入到config中去

public class MysqlApplication {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    ConfigRetriever retriever = readYaml(vertx);

    retriever.getConfig(json -> {
      try {
        JsonObject object = json.result();
        DbHelper dbHelper = new DbHelper(object.getJsonObject("mysql"), vertx);
        dbHelper.afterPropertiesSet();

        DeploymentOptions options = new DeploymentOptions().setConfig(object);
        vertx.deployVerticle(MainApp.class.getName(), options);
      } catch (Exception ex) {
        log.error("===> Vertx start fail: ", ex);
      }
    });
  }

  /**
   * read yaml config
   *
   * @param vertx vertx
   * @return ConfigRetriever
   */
  private static ConfigRetriever readYaml(Vertx vertx) {
    ConfigStoreOptions store = new ConfigStoreOptions()
        .setType("file")
        .setFormat("yaml")
        .setOptional(true)
        .setConfig(new JsonObject().put("path", "application.yaml"));

    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(store));
  }
}

4.Mysql连接池配置

public class DbHelper {
  private final JsonObject object;
  private final Vertx vertx;
  private static MySQLPool mySqlPool;

  /**
   * 获取客户端
   *
   * @return MySQLPool
   */
  public static MySQLPool client() {
    return mySqlPool;
  }

  /**
   * 初始化mysql连接
   */
  public void afterPropertiesSet() {
    ConfigProperties.MysqlProperties properties = object.mapTo(ConfigProperties.MysqlProperties.class);
    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
        .setPort(properties.getPort())
        .setHost(properties.getHost())
        .setDatabase(properties.getDatabase())
        .setUser(properties.getUsername())
        .setPassword(properties.getPassword())
        .setReconnectAttempts(properties.getReconnectAttempts())
        .setReconnectInterval(properties.getReconnectInterval())
        .setCharset(properties.getCharset())
        .setCollation(properties.getCollation());

    PoolOptions poolOptions = new PoolOptions()
        .setMaxSize(properties.getMaxSize())
        .setName(properties.getPoolName())
        .setShared(true);

    mySqlPool = MySQLPool.pool(vertx, connectOptions, poolOptions);
  }
}

5.连接池数据库sql执行

public class UserService {

  /**
   * find list
   */
  public void list(RoutingContext ctx) {
    MySQLPool pool = DbHelper.client();
    pool.query("select *From t_user").mapping(UserInfo.row2User()).execute(rs -> {
      if (rs.succeeded()) {
        RowSet<UserInfo> result = rs.result();
        List<UserInfo> list = new ArrayList<>();
        result.forEach(list::add);
        ctx.json(R.data(list));
      } else {
        log.warn("Failure: ", rs.cause());
      }
    });
  }

  /**
   * find one
   */
  public void detail(RoutingContext ctx) {
    String userId = ctx.pathParam("userId");
    MySQLPool pool = DbHelper.client();
    pool.preparedQuery("select *From t_user where user_id=?").mapping(UserInfo.row2User()).execute(Tuple.of(userId), rs -> {
      if (rs.succeeded()) {
        RowSet<UserInfo> result = rs.result();
        if (result.size() > 0) {
          ctx.json(R.data(result.iterator().next()));
          return;
        }
        ctx.json(R.data(null));
      } else {
        log.warn("Failure: ", rs.cause());
      }
    });
  }

  /**
   * add user info
   */
  public void add(RoutingContext ctx) {
    UserInfo user = ctx.getBodyAsJson().mapTo(UserInfo.class);
    if (user == null) {
      ctx.json(R.fail("参数为空"));
      return;
    }

    MySQLPool pool = DbHelper.client();
    pool.preparedQuery("insert into t_user(username,password,age,status,create_time,update_time)value(?,?,?,1,now(),now())")
        .execute(Tuple.of(user.getUsername(), user.getPassword(), user.getAge()), rs -> {
          if (rs.succeeded()) {
            ctx.json(R.success("success"));
          } else {
            log.warn("Failure: ", rs.cause());
            ctx.json(R.fail("fail"));
          }
        });
  }

  /**
   * update user
   */
  public void update(RoutingContext ctx) {
    UserInfo user = ctx.getBodyAsJson().mapTo(UserInfo.class);
    if (user == null) {
      ctx.json(R.fail("参数为空"));
      return;
    }

    MySQLPool pool = DbHelper.client();
    pool.preparedQuery("update t_user set username=?,password=?,age=?,status=?,update_time=now() where user_id=?")
        .execute(Tuple.of(user.getUsername(), user.getPassword(), user.getAge(), user.getStatus(), user.getUserId()), rs -> {
          if (rs.succeeded()) {
            ctx.json(R.success("success"));
          } else {
            log.warn("Failure: ", rs.cause());
            ctx.json(R.fail("fail"));
          }
        });
  }

  /**
   * delete one
   */
  public void delete(RoutingContext ctx) {
    String userId = ctx.pathParam("userId");
    MySQLPool pool = DbHelper.client();
    pool.preparedQuery("delete From t_user where user_id=?").execute(Tuple.of(userId), rs -> {
      if (rs.succeeded()) {
        ctx.json(R.data("success"));
      } else {
        log.warn("Failure: ", rs.cause());
        ctx.json(R.fail("fail"));
      }
    });
  }
}

6.项目完整地址

Vertx之Mysql响应式客户端 Github 地址

Vertx之Mysql响应式客户端 Gitee 地址

 类似资料: