环境再贴现2.8。17
我们已经使用与redis文档中描述的模式类似的模式在RPOPLPPUSH下实现了可靠队列
然而,考虑到其阻塞性质,我们使用BRPOPLPUSH,而LPUSH用于确保FIFO顺序。
生产者:使用LPUSH推送项目的多个线程(来自多个服务器)。
消费者:使用BRPOPLPUSH处理项目的多个线程(来自多个服务器)。
BRPOPLPUSH q processing-q
如文件所述,redis从队列“q”中弹出项目,同时将它们添加到“processing-q”中。
由于应用程序的多线程(异步)特性,我们无法控制使用者何时完成处理。
因此,如果我们使用LREM(根据文档)从processing-q中删除已处理元素,这将只删除processing-q的顶部元素。因为它不能保证是否已删除由相应消费者处理的实际元素。
因此,如果我们不做任何事情,处理q会继续增长(吞噬内存),恕我直言,这是非常糟糕的。
有什么建议或想法吗?
在一个类似的项目中,我使用备份队列的工作人员的主机名和进程ID。每个辅助角色都有自己的备份队列,如果辅助角色死亡,该项目不会丢失。
查看自述文件和实现以了解更多详细信息。
我将采用的方法是使用每个消费者的processing-q(例如processing-q:消费者id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您也保留每个消费者最后一次弹出任务的时间,并定期检查超时情况。如果使用者已达到超时,请将其任务移回主队列并删除其队列。
您只需在对LREM的呼叫中包含要删除的作业。
LREM的形式如下:
LREM queue count "object"
它将从队列中删除等于“对象”的计数项。因此,要删除您的消费者线程正在处理的特定作业,您可以执行这样的操作。
LREM processing-q 1 "job_identifier"
有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem
然后,为了处理崩溃的消费者和放弃的作业,您可以使用SETEX创建带过期的锁,并定期检查没有锁的作业。
所以整个过程是这样的:
制作人
RPUSH q"job_identifier"
消费者
SETEX-lock:processing-q:job\u标识符60
(首先设置锁以避免竞争条件)BRPOPLPUSH q处理队列
LREM处理队列“作业标识符”
已过期工作监控
LRANGE处理队列0-1
@NotAUser发布了一个开源java实现,如下所示:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq
编辑:Redis+Sidekiq完成该工作。在这里,Redis作为一个消息队列工作,Sidekiq在后台处理这些消息。我很想知道,选择一个显式代理(如RabbitMQ、SQS、Redis PubSub)而不是Redis+SideKiQ有什么用例和好处?
我试图实现事件处理函数,以避免每次组件和时创建新函数。 场景1: 如果我像下面那样在构造函数中绑定函数,并且在中没有参数,那么它只会在bundle文件中创建一个新函数一次 场景2: 但是,当我想将以及传递给函数时,我相信每当组件和 所以 如何更好地编写场景2,使新函数在bundle文件中只创建一次,而不是每次组件渲染和重新渲染时都创建一次?可能吗? 编辑: param1和param2是我自己的自定
问题内容: 在我的JMS应用程序上,我们在生产者上使用临时队列,以便能够接收来自消费者应用程序的回复。 我在这个线程上遇到了与我完全相同的问题:http : //activemq.2283324.n4.nabble.com/jira- Created-AMQ-3336-Temporary-Destination-errors-on-HA-failover-in -broker- network-w
我的发现 根据我的研究,每个队列用于存储发送到每个客户端订阅的主题的消息。例如,当同一个主题(例如)被3个不同的客户端(例如、、&)订阅时,将有3个订阅队列(即、&)。因此,当消息发送到主题时,Artemis将把这些消息放在订阅队列中、、。 实际问题 我们的broker.xml
问题内容: Sidekiq可以阻止哪些可能的原因来处理队列中的作业?队列已满。日志文件表明完全没有活动。因此,队列已满,但日志为空,Sidekiq似乎未处理项目。似乎没有工人在处理工作。重新启动Redis或用FLUSHALL或FLUSHDB冲洗均无效。Sidekiq已开始于 捆绑执行程序sidekiq -L log / sidekiq.log 并生成以下日志文件: 您如何找出问题所在?是否有隐
问题内容: 我正在尝试将CSV文件读入(字符串的)列表列表,将其传递以从数据库中获取一些数据,构建新数据列表的新列表,然后传递该列表列表,以便写入新的CSV文件。我到处都看了,似乎找不到如何做的例子。 我宁愿不使用简单的数组,因为文件的大小会有所不同,而且我也不知道该如何使用数组的尺寸。我没有处理文件的问题。我只是不确定如何处理列表列表。 我发现的大多数示例都将创建多维数组或在从文件中读取数据