RedisCluster如何高效率地批量插入数据

吉俊德
2023-12-01

Redis环境

RedisCluster:

三主三从,每个节点在单独的服务器上

Spring的redisTemplate自带的Lettuce(6.2.1版本,至2022.10.22是最新版):

spring:
  redis:
    password: xxx  #密码
    lettuce:  #lettuce连接池配置
      pool:
        max-active: -1
        max-idle: -1
        min-idle: 0
        max-wait: -1
      shutdown-timeout: 100
    cluster:  #集群配置
      nodes:
        - 1x2.xx.5.xx:6379
        - 1x2.xx.5.xx:6379
        - 1x2.xx.5.xx:6379
        - 1x2.xx.5.xx:6379
        - 1x2.xx.5.xx:6379
        - 1x2.xx.5.xx:6379
      max-redirects: 3

入库方式:

使用ThreadPoolExecutor,本次测试结果为10线程,每线程插入10w条

入库方式

① for循环执行set
② Pipline管道
③ multiSet()

方式②是客户端提供的一种批处理技术,用于一次处理多个 Redis 命令,从而提高整个交互的性能,解决了多个命令集中请求时造成网络资源浪费的问题,加快了 Redis 的响应速度,让 Redis 拥有更高的运行速度,其优势在于客户端与服务端的网络延迟越大,性能体能越明显,支持设置过期时间。

方式③是RedisTemplate的方法,速度比Pipline要快很多很多,但是不支持设置过期时间。

方式①是最原始的方式,效率很低,不推荐,这里也不进行测试了,大家可以自行测试。

测试代码

redis工具类

package com.nwd.pressuretestutil.util;

import com.topscomm.pressuretestutil.config.RedisConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class BatchRunRedisUtil {
    @Autowired
    RedisTemplate stringRedisTemplate;

    @Autowired
    RedisConfig redisConfig;

    //批量添加
    public void batchSet(Map<String, String> map) {
        stringRedisTemplate.opsForValue().multiSet(map);
    }

    //批量添加 并且设置失效时间
    public void batchSetOrExpire(Map<String, String> map, Long seconds) {
        RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
        stringRedisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                map.forEach((key, value) -> {
                    connection.set(serializer.serialize(key), serializer.serialize(value), Expiration.seconds(seconds), RedisStringCommands.SetOption.UPSERT);
                });
                return null;
            }
        }, serializer);
    }

    //批量获取
    public List<Object> batchGet(List<String> list) {
        List<Object> objectList = stringRedisTemplate.opsForValue().multiGet(list);
        return objectList;
    }

    // Redis批量Delete
    public void batchDelete(List<String> list) {
        stringRedisTemplate.delete(list);
    }

}

测试接口

调用url:

http://localhost:7001/redisOperation/redisBatchInsertByThread?insNum=100000&saveType=2&threadNum=10


/**
  * 多线程入redis
   * insNum:每个线程的存入数量
   * saveType:插入方式,1代表multi,2代表pipline
   * */
  CountDownLatch latch = null;
  @GetMapping("/redisBatchInsertByThread")
  public String redisBatchInsertByThread(@RequestParam("insNum") Integer insNum,@RequestParam("saveType") Integer saveType,@RequestParam("threadNum") Integer threadNum){
      try {
          //定义线程池
          ThreadFactory tf = new CustomizableThreadFactory("Thread-");
          ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                  threadNum, threadNum, 60, TimeUnit.SECONDS,
                  new LinkedBlockingDeque<Runnable>(100000),
                  tf);
          //线程同步计数器
          latch =  new CountDownLatch(threadNum);
          long startTime = System.currentTimeMillis();
          for (int i=0;i<threadNum;i++){
              threadPool.execute(() -> {
                  Integer threadCount = Integer.parseInt(Thread.currentThread().getName().split("-")[1])-1;
                  String saveT = saveType==1?"multi":"pipline";
                  //multiSet()批量操作
                  long startTime1 = System.currentTimeMillis();
                  Map<String,String> map = new HashMap(insNum);
                  for (int j = 0; j < insNum; j++) {
                      map.put(saveType+"_multi"+threadCount+":" + j, "b");
                  }
                  if(saveType==1){
                      batchRunRedisUtil.batchSet(map);
                  }else {
                      batchRunRedisUtil.batchSetOrExpire(map, 60L);
                  }
                  long endTime1 = System.currentTimeMillis();
                  System.out.println(saveT+"批量set消耗" + (endTime1 - startTime1) + "毫秒");
                  latch.countDown();
              });
          }
          long endTime = System.currentTimeMillis();
          latch.await();
          System.out.println("共用时:"+ (endTime-startTime));
          return "ok";
      }catch (Exception e){
          e.printStackTrace();
          return "false:"+e.getMessage();
      }
  }

测试结果

pipline:10线程;每条线程插入10w:

pipline_Thread-5消耗14167毫秒
pipline_Thread-6消耗14173毫秒
pipline_Thread-9消耗14302毫秒
pipline_Thread-8消耗14384毫秒
pipline_Thread-7消耗14436毫秒
pipline_Thread-10消耗14480毫秒
pipline_Thread-3消耗14493毫秒
pipline_Thread-1消耗14498毫秒
pipline_Thread-2消耗14501毫秒
pipline_Thread-4消耗14502毫秒

multi:10线程;每条线程插入10w:

multi_Thread-1消耗5689毫秒
multi_Thread-9消耗5918毫秒
multi_Thread-2消耗5996毫秒
multi_Thread-3消耗6038毫秒
multi_Thread-6消耗6407毫秒
multi_Thread-10消耗6899毫秒
multi_Thread-5消耗6936毫秒
multi_Thread-8消耗6936毫秒
multi_Thread-7消耗6939毫秒
multi_Thread-4消耗6967毫秒

由此可见,multi的效率比pipline快得多
需要注意的是pipline在配置文件参数设置不是最优的情况下还会报连接异常
如果业务场景无需设置超时时间,推荐使用multiSet
如果必须要用pipline,压力也很大的话,可以考虑拆分成多个小pipline来执行。

 类似资料: