当前位置: 首页 > 面试题库 >

具有索赔的Redis队列已过期

南门欣怡
2023-03-14
问题内容

我有一个队列接口,我想在Redis中实现。诀窍在于,每个工人可以在假定该工人坠毁后需要N秒钟的时间索取一件物品,并且该物品需要再次索取。完成后移走物品是工人的责任。您将如何在Redis中做到这一点?我正在使用phpredis,但这是无关紧要的。


问题答案:

为了在redis中实现一个简单的队列,该队列可用于重新提交崩溃的作业,我会尝试如下操作:

  • 1个清单“ up_for_grabs”
  • 1个清单“ being_worked_on”
  • 自动过期锁

试图找工作的工人会做这样的事情:

timeout = 3600
#wrap this in a transaction so our cleanup wont kill the task
#Move the job away from the queue so nobody else tries to claim it
job = RPOPLPUSH(up_for_grabs, being_worked_on)
#Set a lock and expire it, the value tells us when that job will time out. This can be arbitrary though
SETEX('lock:' + job, Time.now + timeout, timeout)
#our application logic
do_work(job)

#Remove the finished item from the queue.
LREM being_worked_on -1 job
#Delete the item's lock. If it crashes here, the expire will take care of it
DEL('lock:' + job)

而且不时地,我们只需获取清单并检查其中的所有作业实际上是否都已锁定。如果我们发现没有锁的任何作业,则意味着该作业已过期,我们的工作人员很可能崩溃了。在这种情况下,我们将重新提交。

这将是伪代码

loop do
    items = LRANGE(being_worked_on, 0, -1)
    items.each do |job| 
        if !(EXISTS("lock:" + job))
            puts "We found a job that didn't have a lock, resubmitting"
            LREM being_worked_on -1 job
            LPUSH(up_for_grabs, job)
        end
    end
    sleep 60
end


 类似资料:
  • 问题内容: 我发现以下代码通过ZeroMQ和Node.js实现了异步消息队列(实际上没有队列,只有文件) 代码是从这里开始的。 函数“ WriteFile”和“ DeleteFile”在代码的后面定义,但是那里没有什么特别的。 函数“ client.send”也定义在另一个文件中,在该文件中定义了回调。显然,ZeroMQ提供了在消息传输成功时进行回调的功能。 现在,为了简化起见,我想使用Redis

  • 主要内容:什么是Stream?,常用命令汇总,基本命令应用,创建消息ID,创建消费组,消费消息Redis Stream 是 Redis 5.0 版本引入的一种新数据类型,同时它也是 Redis 中最为复杂的数据结构,本节主要对 Stream 做相关介绍。 什么是Stream? Stream 实际上是一个具有消息发布/订阅功能的组件,也就常说的消息队列。其实这种类似于 broker/consumer(生产者/消费者)的数据结构很常见,比如 RabbitMQ 消息中间件、Celery 消息中间

  • 异步队列区别于 RabbitMQ Kafka 等消息队列,它只提供一种 异步处理 和 异步延时处理 的能力,并 不能 严格地保证消息的持久化和 不支持 完备的 ACK 应答机制。 安装 composer require hyperf/async-queue 配置 配置文件位于 config/autoload/async_queue.php,如文件不存在可自行创建。 暂时只支持 Redis Dri

  • 问题内容: 我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求- 我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。 我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。 问题答案: 我希望有一个回调机制,当达到队列大

  • 问题内容: 我正在使用的类创建用于运行Web服务器的请求处理程序的固定线程池: 并且说明是: 创建一个线程池,该线程池重用在共享的 无边界 队列上运行的一组固定线程。 但是,我正在寻找实现与缓冲池完全相同的线程池实现,除了使用有 界 队列。有这样的实现吗?还是我需要为固定线程池实现自己的包装器? 问题答案: 您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExec

  • 问题内容: 我的总体问题是: 使用Redis for PubSub,当发布者将消息推送到频道中的速度比订阅者能够阅读它们的速度快时,消息会如何处理? 例如,假设我有: 一个简单的发布者以2 msg / sec的速度发布消息。 一个简单的订户以1 msg / sec的速率读取消息。 我天真的假设是订户只会看到发布到Redis上的消息的50%。为了验证这一理论,我编写了两个脚本: pub.py 子py