当前位置: 首页 > 面试题库 >

使用Redis实施消息队列时出错,使用BLPOP时出错

黄景胜
2023-03-14
问题内容

我正在尝试使用Redis构建消息队列。每当客户端发送新数据时,它们就会被添加到列表中。

这是它的代码

$client->lPush("my_queue", $data);

现在有一个单独的脚本 slave.php ,它将弹出新到达的数据并进行处理。slave.php的代码

while (true) {
   list($queue, $message)  = $client->brPop(["my_queue"], 0);

    /*
    Logic to process the data
    */
}

我已经修改了apache启动脚本,以便slave.php应该以apache开始和停止。它运作良好。但是在等待几分钟后,brPop停止监听并显示以下错误消息:

Uncaught exception 'Predis\Connection\ConnectionException' with message 'Error while reading line from the server [tcp://127.0.0.1:6379]' in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php:139
Stack trace:
#0 /var/www/api/lib/predis-0.8/lib/Predis/Connection/StreamConnection.php(205): Predis\Connection\AbstractConnection->onConnectionError('Error while rea...')
#1 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(128): Predis\Connection\StreamConnection->read()
#2 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(120): Predis\Connection\AbstractConnection->readResponse(Object(Predis\Command\ListPopLastBlocking))
#3 /var/www/api/lib/predis-0.8/lib/Predis/Client.php(227): Predis\Connection\AbstractConnection->executeCommand(Object(Predis\Command\ListPopLastBlocking))
#4 /var/www/api/lib/slave.php(7): Predis\Client->__call('brPop', Array)
#5 /var/www/api/lib/slave.php(7): Predis\Client->brPop(Array, 0)
#6 {main}
 thrown in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php on line 139

根据文档,如果list为空,则BLPOP /
BRPOP会阻止连接,直到另一个客户端对其中一个键执行LPUSH或RPUSH操作。但这对我而言并没有发生。就我而言,一旦brpop阻止了连接,即使新数据到达列表中,它也不会再次侦听。

我应该进行哪些更改才能使其正常工作?


问题答案:

它现在对我有用,但是我不确定这是否是正确的方法。现在,我正在捕获错误并在连接失败的情况下递归调用该函数。我的新slave.php看起来像这样:

function process_data()
{
    try {
        $client = new \Predis\Client();

        require_once("logger.php");

        while (true) {
            list($queue, $message) = $client->brPop(["bookmark_queue"], 0);
            // logic
        }
    } catch (Exception $ex) {
        $error = $ex->getMessage();
        log_error($error, "slave.php");
        process_data(); // call the function recursively if connection fails
    }
}
process_data(); // call the function


 类似资料:
  • ActiveMQ:5.10.2在ServiceMix的Karaf OSGi中 卡哈布坚持。 默认代理设置。连接中的默认设置(TCP://x.x.x.x.x:61616) 一切正常,但是:如果我将消费者的数量减少到1(或者2或3个,我不知道阈值在哪里),那么来自1个队列的消息将被消耗,来自另一个队列的消息将被存储。过了一段时间,我看到了这张照片: 1用户停止接收消息。他认为没有更多消息了。 从act

  • 主要内容:什么是Stream?,常用命令汇总,基本命令应用,创建消息ID,创建消费组,消费消息Redis Stream 是 Redis 5.0 版本引入的一种新数据类型,同时它也是 Redis 中最为复杂的数据结构,本节主要对 Stream 做相关介绍。 什么是Stream? Stream 实际上是一个具有消息发布/订阅功能的组件,也就常说的消息队列。其实这种类似于 broker/consumer(生产者/消费者)的数据结构很常见,比如 RabbitMQ 消息中间件、Celery 消息中间

  • 我试图列出屏幕上剩余的应用编程接口的信息。我有这个方法: 这里我得到的错误: _类型错误(类型“\u InternalLinkedHashMap 我也尝试过这种做法: 但这里我得到了一个错误: _类型错误(类型“\u InternalLinkedHashMap 你们可以帮我解决这个错误。我感谢您的评论! 更新我也用过这种方法,但没有用! 我的模型:

  • 我试图将Firebase云消息集成到我的应用程序中,以便向用户发送通知。当试图启动应用程序,我得到以下错误: 这在这里发生: 在FIRAuthm. 这是我的大多数应用委托的样子: 发生了什么,我如何修复这个错误? 我试着按照这里的指示去做。。。

  • 我尝试将我的自定义类型的< code>ProducerRecord发送到Kafka,但我收到错误消息: 我在schema:GET中设置了schema 回应: 得到 回答 } 这是我的Scala类: Kafka制作人的部分: 我错过了什么?我看到我可以为我的类实现SpecificRecord,但在我阅读的书/教程中,我没有看到这一点。谢谢 编辑:固定类名