当前位置: 首页 > 知识库问答 >
问题:

Rabbitmq-php amqp破裂的管道错误

施永宁
2023-03-14

我正在处理一个巨大的xml文档(包含大约一百万个条目),然后使用rabbitmq将一个格式化版本导入数据库。每次发布大约200000个条目后,我都会收到一个断管错误,rabbitmq无法从中恢复。

通知错误:f写():发送2651字节失败,errno=11资源暂时不可用在[/var/www/ribbon/app/控制台/命令/lib/php_amqplib/amqp.inc,第439行]

注意错误:fwrite():发送33字节失败,错误号为104,对等方在[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,第439行]中重置连接

通知错误:f写():发送19个字节失败,errno=32在[/var/www/ribbon/app/控制台/命令/lib/php_amqplib/amqp.inc,第439行]

这随后会导致节点关闭错误,需要手动杀死进程才能从中恢复。

以下是我的课堂方法:-

public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    }
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
        );
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
    }
}

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    }

    if(isset($this->ch)) {
        $this->ch->close();
    }

    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    );
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching

    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    );

    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    );

    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']
    );
}

任何帮助都将不胜感激。

共有3个答案

杨凌
2023-03-14

这个问题发生在我与RabbitMQ的连接断开时(原因无关紧要,在我的情况下,我故意停止RabbitMQ服务进行一些失败测试),我试图通过关闭旧连接并初始化新连接来重新连接RabbitMQ,但我收到了管道断裂或连接关闭的错误消息。我解决此问题的方法是在我的连接上使用重新连接()方法:

$channel->reconnect();

周楷
2023-03-14

实际上,当你的消息中有大量内容时,这个问题就会发生,而你的消费者花费了太多时间来处理一条消息,这就是响应兔子的“确认字符”并尝试消费另一条消息的问题。

例如,当我遇到这个问题时,我会尝试“适应”我的消息,因为它是一个产品工人,每条消息都有一些类似1k的产品id,所以我改为100个产品,效果非常好。

你可以在这里阅读更多关于用心跳检测死TCP连接的信息

东方化
2023-03-14

确保已在Rabbit MQ上为用户添加virtualhost访问权限。我创建了新用户,但忘记了为默认使用的“/”主机设置访问权限。

您可以通过管理面板(您的主机:15672)完成此操作

P. S.我假设您的RabbitMQ服务正在运行,用户存在,密码正确。

 类似资料:
  • 我有一个非常奇怪的问题,我希望你的眼睛能帮助解决它。 我定义了一个函数,它通过BASH连接到Oracle SQL数据库。连接后,我使用一个herdeoc传入一个简单的select语句,该语句查找最大订阅id并递增它,具体取决于函数被调用的次数。代码如下: 在命令行调用函数时,上述代码非常有效: 但是,当函数通过管道传输到AWK语句时,函数不再抖动,这毫无意义!请参阅下面的输出: 我不明白。哈哈,我

  • 我正在对一个由zuul代理服务支持的简单Spring云应用程序进行一些压力测试。由于Zuul和服务之间的旧连接,我们偶尔会出现管道异常。我使用不同的配置选项配置Zuul,但没有成功: 例外情况如下: 通用域名格式。netflix。祖尔。例外ZuulException:组织转发错误。springframework。云netflix。祖尔。过滤器。路线RibbonRoutingFilter。转发(Ri

  • 我们将我们的集群迁移到Cassandra 1.2(从1.1.7),并尝试在迁移后运行修复(我们按照建议定期运行它们)。 由于SSTable版本不同,修复失败,因此我们使用和并重试。 但是,修复失败了,但有这个例外: 我们还注意到,在死亡之前,< code>nodetool netstats显示了奇怪的统计数据(表传输为756845%...). 编辑 这似乎是压缩SSTables传输的问题,我将尝试

  • 我正在实现与Android应用程序通信的服务器端应用程序。Android应用程序在最初与C服务器通信之前就已经实现了。现在我想用java代码替换C服务器。Android应用程序与服务器进行通信,通过读卡器中的卡对此人进行身份验证。 身份验证协议包含应用程序和服务器之间要成功完成的几个通信步骤。 应用程序和服务器之间的消息格式如下: 首先,应用程序发送一个类型1的请求来建立与读卡器中sim卡的连接。

  • 问题内容: 我在gradle中运行测试时遇到问题。我知道我的gradle配置可以在其他机器上运行,但是不幸的是我的不行。我们有junit test和testNG,当尝试执行时,它们都产生如下的stacktrace: 它将连续产生此错误并带有不同的编号(Gradle Worker 2,Gradle Worker 3等)。有没有人遇到过类似的问题? 我正在使用gradle 1.6。 编辑:我忘了告诉我

  • 我使用ApacheCommons http客户端调用url,使用post方法发布参数,它很少抛出以下错误。 有人能建议是什么导致了这个异常以及如何调试它吗?