当前位置: 首页 > 知识库问答 >
问题:

特定收件人使用redis和python使用的故障安全消息广播

符献
2023-03-14

因此,Redis5.0新引入了一个名为Streams的新功能。它们似乎非常适合为进程间通信分发消息:

  • 在可靠性方面,它们超越了发布/订阅事件消息的能力:发布/订阅是一种“火与忘”的方式,无法保证收件人会收到消息
  • redis列表有些低级,但仍然可以使用。但是,流针对性能和上述用例进行了优化

然而,由于这个特性是相当新的,几乎没有任何Python(甚至一般redis)手册,我真的不知道如何使流系统适应我的用例。

我希望有一个发布者程序,将消息推送到流中并包含收件人信息(如收件人:“user1”)。然后我将有几个接收过程,所有这些过程都应该检查新的流消息,并比较它们是否是目标接收者。如果是,他们应该处理消息并将其标记为已处理(已确认)。

然而,我并不真正了解消费者群体、未决状态等等。有人能给我一个真实世界的例子来说明我的小伪代码吗?

发件人。派克

db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})

收件人py(将有许多实例使用唯一的收件人id运行)

recipient_id = "user1" # you get the idea...
db = Redis(...)
while True:
    message = db.the_stream.blocking_read("$") # "$" somehow means: just receive new messages
    if message.recipient == recipient_id:
        perform_task(message.task)
        message.acknowledge() # let the stream know it was processed
    else:
        pass # well, do nothing here since it's not our message. Another recipient instance should do the job.```

共有1个答案

柳飞飙
2023-03-14

通过您给出的示例和伪代码,让我们想象一下:

  • recipient.user1每分钟收到60条信息
  • perform_task()方法需要2秒执行。

这里发生的事情是显而易见的:新消息传入和处理之间的延迟只会随着时间的推移而增加,与“实时处理”的距离越来越远。

系统吞吐量=30条消息/分钟

要解决这个问题,您可能需要为user1创建一个消费者组。在这里,您可以有4个不同的python进程并行运行,所有4个进程都加入到user1的同一组中。现在,当user1收到一条消息时,4个工作人员中的一个将拾取该消息并perform_task()

系统吞吐量=120条消息/分钟

在您的示例中,消息。acknowledge()实际上并不存在,因为流读取器是单独存在的(XREAD命令)。

如果它是一个组,消息的确认变得至关重要,这就是redis如何知道组成员之一实际上处理了该消息,因此它可能会继续前进(它可能会忘记该消息正在等待确认的事实)。当您使用组时,有一点服务器端逻辑可以确保每条消息都传递给一个消费者组工作人员一次(XGROUPREAD命令)。当客户端完成后,它发出对该消息的确认(XACK命令),以便服务器端“消费者组缓冲区”可以删除该消息并继续前进。

想象一下,如果一个工人死了,却从未承认这条信息。使用使用者组,您可以注意这种情况(使用XPENDING命令),并根据这些情况采取行动,例如,尝试在另一个使用者中处理相同的消息。

当您不使用组时,redis服务器不需要“继续前进”,“确认”成为100%客户端/业务逻辑。

 类似资料:
  • 本文向大家介绍Java中故障快速和故障安全之间的区别,包括了Java中故障快速和故障安全之间的区别的使用技巧和注意事项,需要的朋友参考一下 序号 键 不及格 故障安全 1 例外 集合中的任何更改(例如在线程期间添加,删除和更新集合)都是迭代集合,然后使快速抛出并发修改异常失败。  故障安全集合不会引发异常。  2。 集合类型 ArrayList和hashmap集合是快速失败迭代器的示例  Copy

  • 我们正在设计一个解决方案,它将使用JMS使用来自IBMMQ的消息。计划是使用WASLiberty,所以JMS是首选技术。我们将创建Message-Drive bean来侦听MQ队列中的消息。 我们也在考虑WAS自由和开放自由。 这里的诀窍是,我们必须使用故障转移来实现它,这样,如果一台服务器出现故障,另一台服务器将继续自动使用MQ中的消息。比如在主动/被动机制中。 我知道需要安装MQ适配器,因为它

  • YAML模式被定义为一组标签的组合,并包括用于解析非特定标签的机制。 YAML中的故障安全模式以这样的方式创建,即它可以与任何YAML文档一起使用。 它也被视为通用YAML文档的推荐架构。 类型 故障安全模式有两种类型:通用映射和通用序列。 通用映射 它代表一个关联容器。 这里,每个键在关联中是唯一的,并且映射到恰好一个值。 YAML对键定义没有任何限制。 下面给出了表示通用映射的示例 - JSO

  • 问题内容: Java中有两种迭代器:故障安全和故障快速。 这是什么意思,它们之间的区别是什么? 问题答案: 他们之间有什么区别… “故障安全”(在工程中)表示某些故障不会造成或只造成最小的损坏。严格地说,有 没有这样的事情 在Java中的故障安全迭代器。如果迭代器失败(通常为“失败”),则可能会发生损坏。 我怀疑您实际上是在说“弱一致性”迭代器。Javadoc说: “大多数并发Collection

  • 我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。

  • 我们在每个节点上都有3个redis服务器和sentinel(由Linux人员设置):devredis01:6383(主)devredis02:6383(从)devredis03:6383(从)devredis01:26379(sentinel)devredis02:26379(sentinel)devredis03:26379(sentinel) 我能够将StackExchange客户机连接到re