当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ: Ack/Nack关闭并重新打开的通道上的消息

商业
2023-03-14

我从RabbitMq服务器收到此错误

服务器关闭通道:406(前提条件-失败),并显示消息“前提条件_失败-未知交付标签80”

发生这种情况是因为在消费者任务期间连接丢失,最后,当消息被确认/nack时,我收到此错误,因为我无法在与我接收它的通道不同的通道上确认消息。

这是 RabbitMq 连接的代码

async connect({ prefetch = 1, queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error', err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
                    );
                }
            });

            conn.once('close', () => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,
                );
                this.connect({ prefetch, queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error', err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
                );
            });
            ch.on('close', () => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch, queueName: this.queueName }),
            );
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
    }
}

如您所见,连接建立后会创建一个通道,在我收到连接问题后,通道设置为NULL,1秒钟后连接重试,重新创建一个新通道。

为了管理离线时间,我使用了一个缓冲区,该缓冲区收集通道为NULL时发送的所有ack消息,并在连接重新稳定后卸载缓冲区。

所以基本上,我必须找到一种方法来发送一个确认字符后,连接丢失或通道关闭犹豫的原因。

谢谢你的帮助

共有2个答案

史骏祥
2023-03-14

如果连接由于某种原因被丢弃或断开,则无法发送 ACK,因为连接发生在套接字级别,一旦关闭,就无法使用相同的套接字重新创建它。

当连接断开时,消息将保持非 ACK,因此另一个侦听器可以处理它,或者当它再次连接时,断开连接的侦听器将再次处理它。

在我看来,你试图解决的问题不是RabbitMQ给出的,而是由基础的套接字实现给出的。

您可以通过避免管理消息缓冲区并利用RabbitMQ的特性来解决这一问题,只要侦听器再次连接,Rabbit MQ就会重新显示最后一条未处理的消息。

薛朝
2023-03-14

一旦通道关闭,您就无法确认消息(无论是什么原因)。代理将自动将相同的消息重新传递给另一个消费者。

这在RabbitMQ消息确认部分中有很好的记录。

当消费者失败或失去连接时:自动请求

使用手动确认时,当发生传递的通道(或连接)关闭时,任何未确认的传递(消息)都会自动重新排队。这包括客户端的 TCP 连接丢失、使用者应用程序(进程)故障和通道级协议异常(如下所述)。

...

由于这种行为,消费者必须准备好处理重新交付,否则将在考虑幂等性的情况下实现。重新交付将有一个特殊的布尔属性重新交付,由RabbitMQ设置为true。对于首次交付,它将设置为false。请注意,消费者可以接收之前传递给另一个消费者的消息。

正如文档所建议的,您需要通过实现消息等幂设计模式在消费者端处理此类问题。换句话说,您的体系结构应该准备好处理由于错误导致的消息重新传递。

或者,您可以禁用消息确认并获取“一次传递”类型的模式。这意味着在发生错误时,您将不得不处理消息丢失。

事项的进一步读数:https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

Kafka引入了新的语义学:https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/

 类似资料:
  • 问题内容: 我可以轻松进行“作业”,但是我发现输入流的关闭存在一些问题。简单地说,我必须使用Java创建一个联系人“列表”应用程序,才能以正确的方式使用多态。所以我有一个Contact类和一个Private类(contact)。在这两个类中,都有一个Modify方法来更改变量的值。 这是不会产生问题的Contact方法 相反,这是Private中方法的替代。首先,我创建一个Private对象,然后

  • 问题内容: 我的警报消息有问题。它正常显示,当用户按下(关闭)时,我可以将其关闭,但是当用户尝试再次显示它(例如,单击按钮事件)时,它不会显示。(此外,如果我将此警报消息打印到控制台,则等于。)我的代码在这里: 事件: PS! 仅在发生某些事件(例如,单击按钮)后才需要显示警报消息。还是我做错了什么? 问题答案: 数据删除会完全删除该元素。请改用jQuery的.hide()方法。 快速修复方法:

  • 使用以下代码时: 以下用户行为创建了一种不一致的通知方法: null null

  • 我有一个问题,就是在一次单击一个按钮的过程中处理两个JFrames。我有2个JFrames(SUC_Forms和add_suc),当SUC_Forms中的按钮“Add Records”被点击时,将显示另一个新的JFrame,它是add_suc JFrame,包含注册/归档的文本框。当我按ok验证并插入数据库中的所有数据时,将弹出一个JDialogbox进行确认。当我点击“OK”后,两个JFrame

  • 新窗口打开链接 该方法可以新起webview打开页面,由于客户端窗口过多会占用较多内存,请谨慎使用 Tida.pushWindow("http://www.tmall.com/go/chn/common/u-award.php?disableptf=1"); 关闭窗口 关闭当前webview窗口 Tida.popWindow();

  • 在RabbitMQ总线上使用带重载的spring amqp,我们有时会从org获取日志。springframework。amqp。兔子联系CachingConnectionFactory说:通道关闭:清洁通道关闭;协议方法:#方法 你能解释一下这个日志吗?为什么它处于错误级别?我们有什么调整吗?提前谢谢你的回答。