案例目标
简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。
案例背景
应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;
然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现:
一次数据推送会对 redis 产生近30次读写操作!
在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;
优化过程 主要针对业务代码做的优化,其中redis 操作经过大量合并,最终降低到原来的1/5,而系统吞吐量也提升明显。
其中,redis pipeline(管道机制) 的应用是一个关键手段。
pipeline的解释
Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。
管道技术使用广泛,例如许多POP3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。 Redis很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作Redis)
普通请求模型
[图-pipeline1]
Pipeline请求模型
[图-pipeline2]
从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次IO操作等待;
而Pipeline 化之后所有的请求合并为一次IO,除了时延可以降低之外,还能大幅度提升系统吞吐量。
代码实例
说明
本地开启50个线程,每个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。
相关常量
// 并发任务 private static final int taskCount = 50; // pipeline大小 private static final int batchSize = 10; // 每个任务处理命令数 private static final int cmdCount = 1000; private static final boolean usePipeline = true;
初始化连接
JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxActive(200); poolConfig.setMaxIdle(100); poolConfig.setMaxWait(2000); poolConfig.setTestOnBorrow(false); poolConfig.setTestOnReturn(false); jedisPool = new JedisPool(poolConfig, host, port);
并发启动任务,统计执行时间
public static void main(String[] args) throws InterruptedException { init(); flushDB(); long t1 = System.currentTimeMillis(); ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { executor.submit(new DemoTask(i, latch)); } latch.await(); executor.shutdownNow(); long t2 = System.currentTimeMillis(); System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0); }
DemoTask 封装了执行key写入的细节,区分不同场景
public void run() { logger.info("Task[{}] start.", id); try { if (usePipeline) { runWithPipeline(); } else { runWithNonPipeline(); } } finally { latch.countDown(); } logger.info("Task[{}] end.", id); }
不使用Pipeline的场景比较简单,循环执行set操作
for (int i = 0; i < cmdCount; i++) { Jedis jedis = get(); try { jedis.set(key(i), UUID.randomUUID().toString()); } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } if (i % batchSize == 0) { logger.info("Task[{}] process -- {}", id, i); } }
使用Pipeline,需要处理分段,如10个作为一批命令执行
for (int i = 0; i < cmdCount;) { Jedis jedis = get(); try { Pipeline pipeline = jedis.pipelined(); int j; for (j = 0; j < batchSize; j++) { if (i + j < cmdCount) { pipeline.set(key(i + j), UUID.randomUUID().toString()); } else { break; } } pipeline.sync(); logger.info("Task[{}] pipeline -- {}", id, i + j); i += j; } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } }
运行结果
不使用Pipeline,整体执行26s;而使用Pipeline优化后的代码,执行时间仅需要3s!
NoPipeline-stat
[图-nopipeline]
Pipeline-stat
[图-pipeline]
注意事项
pipeline机制可以优化吞吐量,但无法提供原子性/事务保障,而这个可以通过Redis-Multi等命令实现。
参考这里
部分读写操作存在相关依赖,无法使用pipeline实现,可利用Script机制,但需要在可维护性方面做好取舍。
扩展阅读
官方文档-Redis-Pipelining
官方文档-Redis-Transaction
以上这篇redis通过pipeline提升吞吐量的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持小牛知识库。
我需要从很多客户端通过网络套接字连接到java服务器来提取数据。 有很多web套接字实现,我选择了vert。x、 我做了一个简单的演示,在那里我听json的文本帧,用jackson解析它们,然后返回响应。Json解析器对吞吐量没有显著影响。 我的总速度是每秒2.5公里,有2到10个客户。 然后我尝试使用缓冲,客户端不会等待每个响应,而是在服务器确认后发送一批消息(30k-90k),速度提高到每秒8
来自AWS Lambda常见问题解答: Q: 我一次可以执行的AWS Lambda函数的数量是否有限制? 不需要。AWS Lambda旨在并行运行多个函数实例。然而,AWS Lambda的默认安全限制为每个区域每个帐户100次并发执行。如果您希望提交请求以增加100次并发执行的限制,您可以访问我们的支持中心,单击“打开新案例”,然后提交服务限制增加请求。 Q: 如果我的帐户超过并发执行的默认限制,
在大数据存储中,IOPS和吞吐量之间的关键区别是什么
问题内容: 我知道这是一个非常笼统的问题。但是,我想了解使Redis(或诸如MemCached,Cassandra之类的缓存)在惊人的性能极限下工作的主要架构决策是什么。 如何维护连接? 连接是TCP还是HTTP? 我知道它完全用C编写。如何管理内存? 尽管存在竞争的读/写,但用于实现高吞吐量的同步技术有哪些? 基本上,具有内存高速缓存的计算机和可以响应命令的服务器的普通香草实现和Redis框之间
吞吐量值ex。400RU/s适用于每个分区,而不是集合?
无法通过运行Jmeter脚本来实现预期的吞吐量,因为预期的吞吐量更大,但得到的却非常少。 以每秒1000个请求(业务SLA)为目标运行Jmeter脚本,因此使用了“恒定吞吐量定时器”或“吞吐量整形定时器”,如下面的查询所建议的。 恒定吞吐量计时器:目标-60,000/分钟(60秒)-所有活动线程,线程(用户)-200上升-1秒,持续时间:1小时。或用户-2000或尝试使用10,000个用户。 结果