【JavaWeb】spring全家桶回顾——NoSql实践

赵华彩
2023-12-01

浏览博客时,若发现作者有描述错误或不清的地方,请私信或者留言讨论,共同进步

使用 docker

  接下来我们会多次使用到不同的中间件,所以推荐使用 docker 来安装中间件来方便我们进行学习和测试,使用 Mac 的同学可以直接去 docker 官方直接下载 docker-desktop, 而 windows 的同学则推荐通过虚拟机安装 Linux 系统来使用 docker

# 静候安装
yum install -y docker

# 安装后选择适合的国内镜像加速下载
# 镜像
https://registry.docker-cn.com
http://hub-mirror.c.163.com
https://3laho3y3.mirror.aliyuncs.com
http://f1361db2.m.daocloud.io
https://mirror.ccs.tencentyun.com
# 改写配置文件并且重启
sudo nano /etc/docker/daemon.json
{
	// 注意 CTRL+O 是保存改写记录  ctrl+X是退出编辑模式
    "registry-mirrors": ["http://hub-mirror.c.163.com"]
}
# 重启
systemctl restart docker

# 查看是否配置成功
docker info

简单的了解 MongoDB

  当我们准备学习入门一项新技术的时候,推荐先去了解一下这项技术的大致板块,比如这里就可以先去了解一下 MongoDB 是什么?它可以做什么?它经常被用在什么地方?有什么案例推荐?就跟我们上学时做英语阅读理解一下,首先要快速浏览整篇文章,了解大致语义,然后再仔细的去研究就能大大的提高答题的正确率。
  MongoDB 是文档型数据库,一般存储的数据类型都是 json 格式的数据,像是这样的:

{
	name:"小林",    // field:value
	age:23,
	group:["sports"] // 字段的值也可以是其他文档、数组和文档数组。
}

  json 格式类型数据即是对象,对应许多编程语言中的内置数据类型,这样使得 mongodb 拥有强大的查询能力可以直接面向对象查询,并且它覆盖了 sql 的基本所有能力,像是 CRUD、聚合、事务等等,还提供了高性能的持久化能力,为什么说它具备高性能的持久化呢?它的官网给出的解释是:对嵌入式数据模型的支持减少了数据库系统上的I / O操作,这该如何去理解?

  直白的理解就是连表查询的事情少了(因为都存到一个json串里,大部分不需要做多对多或一对多关联),并且支持索引更快的查询,可以包含来自嵌入式文档和数组的键。另外它也支持高可用可拓展,像 mysql 一样支持多种存储引擎。
  MongoDB 比较常见的应用领域如下:

  • 物流场景:使用 MongoDB 存储订单信息,订单状态在运送过程中会不断更新,以MongoDB 内嵌数组的形式来存储,一次查询就能将订单所有的变更读取出来。
  • 社交场景,使用 MongoDB 存储存储用户信息,以及用户发表的朋友圈信息,通过地理位置索引实现附近的人、地点等功能
  • 视频直播,使用 MongoDB 存储用户信息、礼物信息

  如果你还想更多的了解 MongoDB 的相关信息,简易从官网入手:MongoDB官网

安装 MongoDB

  了解完 MongoDB 的一些相关信息后,我们首先要安装它:

# 打开虚拟机,进入终端
docker pull mongo

# 启动
docker run --name mongo -p 27017:27017 -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=admin -d mongo

# 登入到 mongodb 容器中
docker exec -it mongo bash

# 登录
mongo -u admin -p admin

在 Spring 中操作 MongoDB

  spring 的目标就是在不失各个底层数据库特性的基础上来操纵数据。在 Spring Data 中Repository主要用作标记接口,spring 会为你动态代理生成对应的数据操作类。对于如何去使用 spring-data-mongodb 你完全可以将他像 JPA 那样使用,比如了解到它需要用到的注解 @Document、@Id 是做什么的,如何去定义接口内的 CRUD 方法等等。
  若你需要详细的了解 spring-data-mongoDB,建议先到官网了解:官方地址

学习前准备

# 查看当前数据库
show dbs;

# 切换数据库 mongoDB会自动帮你创建数据库
use springbucks;

# 创建操作角色
db.createUser(
	{
		user: "springbucks",
		pwd: "springbucks",
		roles: [
			{ role: "readWrite", db: "springbucks" }
		]
	}
)

  我们先简单学一下直接使用 MongoTemplate 是如何去进行 CRUD 的,准备一个 spring 初始化项目, 此次就只需要勾选 lombok 以及 spring-data-mongodb,此外还需要你加一个 joda-money 用来做额外的操作
  那么首先就是需要你做一下配置文件:

# 格式:用户名:密码@ip地址:端口/dbName
spring.data.mongodb.uri=mongodb://springbucks:springbucks@192.168.2.100:27017/springbucks

  代码实现

// 准备一个操作的实体类对象
@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Coffee {
    @Id
    private String id;
    private String name;
    private Money price;
    private Date createTime;
    private Date updateTime;
}

// 准备一个转换器 不需要去关注实现细节 你只要知道这类似 json 处理且返回了 Money 对象就好
public class MoneyReadConverter implements Converter<Document, Money> {
    @Override
    public Money convert(Document source) {
        Document money = (Document) source.get("money");
        double amount = Double.parseDouble(money.getString("amount"));
        String currency = ((Document) money.get("currency")).getString("code");
        return Money.of(CurrencyUnit.of(currency), amount);
    }
}

// CRUD
@Slf4j
@SpringBootApplication
public class DemoApplication implements ApplicationRunner {
    @Autowired
    private MongoTemplate mongoTemplate;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public MongoCustomConversions mongoCustomConversions() {
        return new MongoCustomConversions(Arrays.asList(new MoneyReadConverter()));
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Coffee espresso = Coffee.builder()
                .name("espresso")
                .price(Money.of(CurrencyUnit.of("CNY"), 20.0))
                .createTime(new Date())
                .updateTime(new Date()).build();
        Coffee saved = mongoTemplate.save(espresso);
        log.info("Coffee {}", saved);

        List<Coffee> list = mongoTemplate.find(
                Query.query(Criteria.where("name").is("espresso")), Coffee.class);
        log.info("Find {} Coffee", list.size());
        list.forEach(c -> log.info("Coffee {}", c));

        Thread.sleep(1000); // 为了看更新时间
        UpdateResult result = mongoTemplate.updateFirst(query(where("name").is("espresso")),
                new Update().set("price", Money.ofMajor(CurrencyUnit.of("CNY"), 30))
                        .currentDate("updateTime"),
                Coffee.class);
        log.info("Update Result: {}", result.getModifiedCount());
        Coffee updateOne = mongoTemplate.findById(saved.getId(), Coffee.class);
        log.info("Update Result: {}", updateOne);

        mongoTemplate.remove(updateOne);
    }
}

  了解完 mongoTemplate 的简单 CRUD 之后,就可以学习一下像是 JPA 那种实现方式的代码,仍然需要一个 Coffee 对象来作为数据操作对象,代码如下:

// 配置文件、操作对象类、转换器都一致
// repository
public interface CoffeeRepository extends MongoRepository<Coffee, String> {
    List<Coffee> findByName(String name);
}

@Slf4j
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    @Autowired
    private CoffeeRepository coffeeRepository;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public MongoCustomConversions mongoCustomConversions() {
        return new MongoCustomConversions(Arrays.asList(new MoneyReadConverter()));
    }

    @Override
    public void run(String... args) throws Exception {
        Coffee espresso = Coffee.builder()
                .name("espresso")
                .price(Money.of(CurrencyUnit.of("CNY"), 20.0))
                .createTime(new Date())
                .updateTime(new Date()).build();
        Coffee latte = Coffee.builder()
                .name("latte")
                .price(Money.of(CurrencyUnit.of("CNY"), 30.0))
                .createTime(new Date())
                .updateTime(new Date()).build();

        List<Coffee> insert = coffeeRepository.insert(Arrays.asList(espresso, latte));
        coffeeRepository.findAll(Sort.by("name"))
                .forEach(c -> log.info("Saved Coffee {}", c));

        Thread.sleep(1000);
        latte.setPrice(Money.of(CurrencyUnit.of("CNY"), 35.0));
        latte.setUpdateTime(new Date());
        coffeeRepository.save(latte);
        coffeeRepository.findByName("latte")
                .forEach(c -> log.info("Coffee {}", c));

        coffeeRepository.deleteAll();
    }
}

Redis 的哨兵和集群模式

   redis 是一款键值对类型的非关系型数据库,应用场景非常广泛,基本可以说是 web 开发人员的必知必会的中间件之一,推荐刚入门的新人同学可以先浏览一下 redis-菜鸟教程 熟悉一下命令即可,以我个人经验来讲,redis的使用场景有但不限于:排行榜、签到以及签到红包、各类抽奖、秒杀活动及秒杀奖品、分布式锁等大部分键值对可以覆盖的场景。
  另外对于 redis 的使用,常见的是有用到 jedis、redistemplate、 redission以及lua脚本,建议到 github 上看一下各位大佬的简单演示,这里堆出代码来就显得有些繁长。

jedis 的哨兵模式是如何实现?

   redis 的哨兵主要是为了解决 redis 集群中主机宕机之后,无法自主替换 master 而出现的一种高可用模式,一般使用我们都会像使用数据库那样来配置一个连接池供我们使用,那么哨兵模式下的 jedis 我们就可以使用 JedisSentinelPool 作为我们的连接池对象。

// 那么看到我们的 JedisSentinelPool 对象的构造函数部分,可以看到最终使用的构造函数是该构造函数
public JedisSentinelPool(String masterName, Set<String> sentinels,
    final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
    final String password, final int database, final String clientName) {
  this.poolConfig = poolConfig;
  this.connectionTimeout = connectionTimeout;
  this.soTimeout = soTimeout;
  this.password = password;
  this.database = database;
  this.clientName = clientName;

  HostAndPort master = initSentinels(sentinels, masterName);
  initPool(master);
}

  在该构造函数方法内重要的方法是 initSentinels,我们点进去去查看对应的代码:

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
	// 省略非关键代码
	// ....
	jedis = new Jedis(hap.getHost(), hap.getPort());
	// 通过名称获取主机地址 实际上就是执行 redis 命令:sentinel get-master-addr-by-name mymaster 
	// 返回的就是 host:port 两部分
	List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

	// 随后做了一个很讨巧的判断 list.size 不是2就直接跳过
	 if (masterAddr == null || masterAddr.size() != 2) {
		log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
		     + ".");
		 continue;
    }
    // 构建对应的HostAndPortUtil来管理和redis server的host和port. 设置完成后就是自然而然的和redis server建立连接.
    master = toHostAndPort(masterAddr);
}

了解一下 redis 的集群模式

  集群的意义是解决高可用的问题,像是假设单机 redis 可支持1W+的并发量,那么我上3台理论上就可以达到3W的程度(但实际你就需要打个八折来计算最稳妥),Redis 集群除了提升读写能力之外,还提供在多个节点间自动拆分数据集的能力,也就是数据分片,并且当节点的自己遇到故障时也可以从其他节点获取到问题节点上的数据。

  • redis 集群的TCP 端口
    每个redis集群节点实际上都是开发两个 TCP 链接的,一个是用于客户端连接提供服务的 TCP 端口,就是我们熟知的6379,还有一个就是集群间通信的接口,一般是10000+6379(如16379),你可以在配置cluster-port覆盖它,这里是提醒你,如果你想集群运行成功,要小心不要误关了集群间通信的 TCP 端口

  • redis 集群数据分片
    redis 集群中有 16384 个哈希槽,基本是按照取模的方式匹配与分配

  • redis 集群的一致性
    redis 集群不保证强一致性,也就是异步复制的情况下(master通知写到slave1/2/3,master不会等待回复,此时2未写入且master失去连接,随后2升级为主机,这时候就是永久失去了),这种情况是需要你配置 redis 的一些配置项,比如写入的环列队列的长度等。

jedis的集群模式又是怎样的?

   这里又到了源码导读环节, jedis 的源码写的简便适合像我这种初级开发来阅读,我们可以借鉴学习它的一些客户端方面操作用于自己公司的一些相关项目开发上。
   jedis 客户端的源码解析基本上都可以从构造函数来入手,这里的代码流程执行大概如下:
  首先是从 JedisCluster 该类开始,可以发现它的构造函数基本都是调用父类 BinaryJedisCluster 的构造函数,那么我们先进入父类,看到:

public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,
    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
    // 省略非关键代码....
	public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,
	final GenericObjectPoolConfig poolConfig) {
		this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
		timeout);
		this.maxAttempts = maxAttempts;
	}
}

  这里 jedis new 出了一个 connectionHandler 实际上是我们整个 jedisCluster 中比较重要的一个事件,我们进入该类的父类可以看到它构建一个 JedisClusterInfoCache

public abstract class JedisClusterConnectionHandler implements Closeable {
	protected final JedisClusterInfoCache cache;
	// 省略非关键代码
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
	final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {
		this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);
		initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);
	}
}

  JedisClusterInfoCache 这个缓存相当于我们连接池的一个基础配置,进去之后你可以看到它分别初始化了连接池配置、连接超时时间、密码、客户端名称等,并且该构造函数之后还执行了 initializeSlotsCache 方法,主要是去发现整个的集群节点还有它的 slots 信息的方法,其实就是通过 jedis 去取了它 cluserSlots 的一个信息,然后做了个分类,就是映射了对应关系

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig,
                                    int connectionTimeout, int soTimeout, String password, String clientName) {
  for (HostAndPort hostAndPort : startNodes) {
    Jedis jedis = null;
    try {
    	// 连接准备
      jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);
      if (password != null) {
        jedis.auth(password);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
      // 分配 slot 注意该方法
      cache.discoverClusterNodesAndSlots(jedis);
      break;
    } catch (JedisConnectionException e) {
      // try next nodes
    } finally {
      if (jedis != null) {
        jedis.close();
      }
    }
  }
}

public void discoverClusterNodesAndSlots(Jedis jedis) {
  w.lock();

  try {
    reset();
    List<Object> slots = jedis.clusterSlots();

    for (Object slotInfoObj : slots) {
      List<Object> slotInfo = (List<Object>) slotInfoObj;

      if (slotInfo.size() <= MASTER_NODE_INDEX) {
        continue;
      }
	  // 获取分配的插槽阵列	 
      List<Integer> slotNums = getAssignedSlotArray(slotInfo);

      // hostInfos
      int size = slotInfo.size();
      for (int i = MASTER_NODE_INDEX; i < size; i++) {
        List<Object> hostInfos = (List<Object>) slotInfo.get(i);
        if (hostInfos.size() <= 0) {
          continue;
        }

        HostAndPort targetNode = generateHostAndPort(hostInfos);
        // 做设置
        setupNodeIfNotExist(targetNode);
        if (i == MASTER_NODE_INDEX) {
          // 分配
          assignSlotsToNode(slotNums, targetNode);
        }
      }
    }
  } finally {
    w.unlock();
  }
}

  这里 jedis 有两种取链接的方式,第一种通过我们 connectionHandler 连接的时候通过 pool 随机 去取, 然后做一下心跳,如果通过就可以直接返回,如果不通过就 close 掉。还有一种是通过特定的 slot 去取,只要我们知道 key 对应的 slot 是那个,就可以在缓存中找到对应的数据。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
	// 省略非关键代码
	  @Override
  public Jedis getConnection() {
    // In antirez's redis-rb-cluster implementation,
    // getRandomConnection always return valid connection (able to
    // ping-pong)
    // or exception if all connections are invalid

    List<JedisPool> pools = cache.getShuffledNodesPool();

    for (JedisPool pool : pools) {
      Jedis jedis = null;
      try {
        jedis = pool.getResource();

        if (jedis == null) {
          continue;
        }

        String result = jedis.ping();

        if (result.equalsIgnoreCase("pong")) return jedis;

        jedis.close();
      } catch (JedisException ex) {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
    throw new JedisNoReachableClusterNodeException("No reachable node in cluster");
  }
}

 @Override
  public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // It can't guaranteed to get valid connection because of node
      // assignment
      return connectionPool.getResource();
    } else {
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }

  需要注意的是 jedis 的集群模式是不支持读写分离操作的,比如写在master、读在slave这种操作,具体可以看 github 上官方给出的解释:官方解释

 类似资料: