当前位置: 首页 > 知识库问答 >
问题:

Redis—在使用BRPOPLPUSH时清理处理队列(可靠)的更好方法

南门烈
2023-03-14

环境再贴现2.8。17

我们已经使用与redis文档中描述的模式类似的模式在RPOPLPPUSH下实现了可靠队列

然而,考虑到其阻塞性质,我们使用BRPOPLPUSH,而LPUSH用于确保FIFO顺序。

生产者:使用LPUSH推送项目的多个线程(来自多个服务器)。

消费者:使用BRPOPLPUSH处理项目的多个线程(来自多个服务器)。

BRPOPLPUSH q processing-q

如文件所述,redis从队列“q”中弹出项目,同时将它们添加到“processing-q”中。

由于应用程序的多线程(异步)特性,我们无法控制使用者何时完成处理。

因此,如果我们使用LREM(根据文档)从processing-q中删除已处理元素,这将只删除processing-q的顶部元素。因为它不能保证是否已删除由相应消费者处理的实际元素。

因此,如果我们不做任何事情,处理q会继续增长(吞噬内存),恕我直言,这是非常糟糕的。

有什么建议或想法吗?

共有3个答案

寿鸣
2023-03-14

在一个类似的项目中,我使用备份队列的工作人员的主机名和进程ID。每个辅助角色都有自己的备份队列,如果辅助角色死亡,该项目不会丢失。

查看自述文件和实现以了解更多详细信息。

聂永怡
2023-03-14

我将采用的方法是使用每个消费者的processing-q(例如processing-q:消费者id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您也保留每个消费者最后一次弹出任务的时间,并定期检查超时情况。如果使用者已达到超时,请将其任务移回主队列并删除其队列。

柴禄
2023-03-14

您只需在对LREM的呼叫中包含要删除的作业。

LREM的形式如下:

LREM queue count "object"

它将从队列中删除等于“对象”的计数项。因此,要删除您的消费者线程正在处理的特定作业,您可以执行这样的操作。

LREM processing-q 1 "job_identifier"

有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem

然后,为了处理崩溃的消费者和放弃的作业,您可以使用SETEX创建带过期的锁,并定期检查没有锁的作业。

所以整个过程是这样的:

制作人

  1. RPUSH q"job_identifier"

消费者

  1. SETEX-lock:processing-q:job\u标识符60(首先设置锁以避免竞争条件)
  2. BRPOPLPUSH q处理队列
  3. 处理作业
  4. LREM处理队列“作业标识符”

已过期工作监控

  1. 作业=LRANGE处理队列0-1

@NotAUser发布了一个开源java实现,如下所示:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

 类似资料:
  • 编辑:Redis+Sidekiq完成该工作。在这里,Redis作为一个消息队列工作,Sidekiq在后台处理这些消息。我很想知道,选择一个显式代理(如RabbitMQ、SQS、Redis PubSub)而不是Redis+SideKiQ有什么用例和好处?

  • 我试图实现事件处理函数,以避免每次组件和时创建新函数。 场景1: 如果我像下面那样在构造函数中绑定函数,并且在中没有参数,那么它只会在bundle文件中创建一个新函数一次 场景2: 但是,当我想将以及传递给函数时,我相信每当组件和 所以 如何更好地编写场景2,使新函数在bundle文件中只创建一次,而不是每次组件渲染和重新渲染时都创建一次?可能吗? 编辑: param1和param2是我自己的自定

  • 问题内容: 在我的JMS应用程序上,我们在生产者上使用临时队列,以便能够接收来自消费者应用程序的回复。 我在这个线程上遇到了与我完全相同的问题:http : //activemq.2283324.n4.nabble.com/jira- Created-AMQ-3336-Temporary-Destination-errors-on-HA-failover-in -broker- network-w

  • 我的发现 根据我的研究,每个队列用于存储发送到每个客户端订阅的主题的消息。例如,当同一个主题(例如)被3个不同的客户端(例如、、&)订阅时,将有3个订阅队列(即、&)。因此,当消息发送到主题时,Artemis将把这些消息放在订阅队列中、、。 实际问题 我们的broker.xml

  • 问题内容: Sidekiq可以阻止哪些可能的原因来处理队列中的作业?队列已满。日志文件表明完全没有活动。因此,队列已满,但日志为空,Sidekiq似乎未处理项目。似乎没有工人在处理工作。重新启动Redis或用FLUSHALL或FLUSHDB冲洗均无效。Sidekiq已开始于 捆绑执行程序sidekiq -L log / sidekiq.log 并生成以下日志文​​件: 您如何找出问题所在?是否有隐

  • 问题内容: 我正在尝试将CS​​V文件读入(字符串的)列表列表,将其传递以从数据库中获取一些数据,构建新数据列表的新列表,然后传递该列表列表,以便写入新的CSV文件。我到处都看了,似乎找不到如何做的例子。 我宁愿不使用简单的数组,因为文件的大小会有所不同,而且我也不知道该如何使用数组的尺寸。我没有处理文件的问题。我只是不确定如何处理列表列表。 我发现的大多数示例都将创建多维数组或在从文件中读取数据