当前位置: 首页 > 知识库问答 >
问题:

使用反应性生菜流水线Redis命令

庄星汉
2023-03-14

我使用spring boot webflux以非阻塞方式连接和查询Redis。我已经用LettuceConnectionFactory配置了reactivedistemplate。spring文档指出,将管道与reactivedistemplate一起使用的唯一方法是使用execute(

所以我的问题是,在使用Spring ReactiveRedisTemplate时,是否可以对命令进行管道连接?

我还注意到使用reactivedistemplate。使用具有多个Redis命令的RedisCallback执行,执行速度比单独调用命令慢。

带有ReactiveRedisTemboard的管道示例代码:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key -> 
                                connection.hashCommands()
                                .hGetAll(ByteBuffer.wrap(key.getBytes()))))
                    .map(Map.Entry::getValue)
                    .map(ByteUtils::getBytes)
                    .map(b -> {
                        try {
                        return mapper.readValue(b, Value.class);
                        } catch (IOException e1) {
                        return null;
                        }
                    })
                    .collectList();

无管道代码:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
            .map(Map.Entry::getValue)
            .cast(Value.class)
            .collectList();

谢谢

共有1个答案

慕弘伟
2023-03-14

我认为不可能使用RedisReactiveTemplate或莴苣的反应api。实际上,当你构建你的反应链时,它的某些部分将被延迟评估。

getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()

例如,在此示例中,仅当getAsyncValue返回值时才会触发。

现在,如果我们获取您的RedisCallback示例并假设我们在连接对象中有一个flushAll方法。你在哪里/什么时候调用它?

tpl.execute(connection -> {
                    Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
                            connection.hashCommands()
                                    .hGetAll(ByteBuffer.wrap(key.getBytes())));
                    connection.fluxAll();
                    return results;
                })

像这样,命令将被刷新到服务器,因为不会触发hashCommands。

现在让我们看看我们拥有的所有信号回调:

  • doOnNext
  • 错误
  • 邓肯塞尔
  • doFirst
  • doOnSubscribe
  • doOnRequest
  • doOnTerminate
  • DOAFTERMINATE
  • 最后,Do
  • doOnComplete公司

doOnError或doOnCancel帮不了我们。但我们可以考虑使用doFinally、doOnTerminate、doafterminate:

tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
                                        .hGetAll(ByteBuffer.wrap(key.getBytes())))
                                        .doFinally(s -> connection.flushAll()))

但htGetAll在命令刷新到服务器之前不会完成,所以doFinnaly不会被调用,所以我们无法刷新。。。。

我能想到的唯一解决方法是直接使用莴苣的异步api。文档中有一个关于如何执行此操作的示例。

您的代码可能看起来像(未测试):

// client is a RedisClient from lettuce

StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
                .collectList()
                .doOnNext(f -> command.flushCommands())
                .flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))
 类似资料:
  • 问题内容: 我是redis的新手,应该使用流水线操作时还是有些困惑,或者应该在发送多个命令时始终使用它? 例如,如果我想一次向Redis服务器发送10条SET命令,我是否应该简单地一个接一个地运行这10条命令,还是应该对它们进行流水线处理? 用管道传输10条SET命令而不是一一发送它们有什么缺点吗? 非常感谢。 问题答案: 当我应该使用流水线 当需要向Redis发送许多命令时,管道用于减少RTT,

  • 问题内容: 我正在使用rub redis宝石。想知道我是否例如: 这样的执行顺序得到保证吗? 问题答案: 当然可以保证顺序,否则流水线将毫无用处。您可以随时查看代码。例如,此测试明确假定命令是按顺序执行的:https : //github.com/redis/redis- rb/blob/master/test/pipelining_commands_test.rb#L32

  • 问题内容: 当我们在Redis中使用事务时,它基本上流水线化了事务中的所有命令。当EXEC被触发时,所有命令将一起执行,从而始终保持多个命令的原子性。 这与流水线不一样吗? 流水线和事务有何不同?另外,为什么Redis的单线程性质不足以满足要求?为什么我们明确需要流水线/事务? 问题答案: 流水线主要是网络优化。从本质上讲,这意味着客户端可以缓冲一堆命令并将它们一次性发送到服务器。不能保证在事务中

  • 流水线功能提供给用户统一的视图来管理应用自动发布规则,您可通过此功能来自动发布自有主机或DCE混合云环境下的应用 设置自动发布规则 进入构建项目页,选择流水线 点击右侧 development 面板中的添加一个新应用, 输入应用的名字,我们可以搜索到使用该项目的应用 点击应用的“设置发布规则”来设置自动部署规则。 根据此规则,当源码的 master 分支更新时会自动触发CI与构建,并自动发布新版本

  • 主要内容:实例,实例,实例,实例,实例,实例关键词:流水线,乘法器 硬件描述语言的一个突出优点就是指令执行的并行性。多条语句能够在相同时钟周期内并行处理多个信号数据。 但是当数据串行输入时,指令执行的并行性并不能体现出其优势。而且很多时候有些计算并不能在一个或两个时钟周期内执行完毕,如果每次输入的串行数据都需要等待上一次计算执行完毕后才能开启下一次的计算,那效率是相当低的。流水线就是解决多周期下串行数据计算效率低的问题。 流水线 流水线的基

  • 问题内容: 我必须用Java实现HTTP客户端,并且出于我的需要,似乎最有效的方法是实现HTTP管道(按照RFC2616)。 顺便说一句,我想管道POST。(我也不在谈论多路复用。我在谈论流水线,即在接收到任何HTTP请求的响应之前,通过一个连接发送许多请求) 我找不到明确声明其支持流水线的第三方库。但是我可以使用例如Apache HTTPCore 来构建这样的客户端,或者如果需要的话,可以自己构