我从RabbitMq服务器收到此错误
服务器关闭通道:406(前提条件-失败),并显示消息“前提条件_失败-未知交付标签80”
发生这种情况是因为在消费者任务期间连接丢失,最后,当消息被确认/nack时,我收到此错误,因为我无法在与我接收它的通道不同的通道上确认消息。
这是 RabbitMq 连接的代码
async connect({ prefetch = 1, queueName }) {
this.queueName = queueName;
console.log(`[AMQP][${this.queueName}] | connecting`);
return queue
.connect(this.config.rabbitmq.connstring)
.then(conn => {
conn.once('error', err => {
this.channel = null;
if (err.message !== 'Connection closing') {
console.error(
`[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
);
}
});
conn.once('close', () => {
this.channel = null;
console.error(
`[AMQP][${this.queueName}] (evt:close) | reconnecting`,
);
this.connect({ prefetch, queueName: this.queueName });
});
return conn.createChannel();
})
.then(ch => {
console.log(`[AMQP-channel][${this.queueName}] created`);
ch.on('error', err => {
console.error(
`[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
);
});
ch.on('close', () => {
console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
});
this.channel = ch;
return this.channel;
})
.then(ch => {
return this.channel.prefetch(prefetch);
})
.then(ch => {
return this.channel.assertQueue(this.queueName);
})
.then(async ch => {
while (this.buffer.length > 0) {
const request = this.buffer.pop();
await request();
}
return this.channel;
})
.catch(error => {
console.error(error);
console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
return this._delay(1000).then(() =>
this.connect({ prefetch, queueName: this.queueName }),
);
});
}
async ack(msg) {
try {
if (this.channel) {
console.log(`[AMQP][${this.queueName}] ack`);
await this.channel.ack(msg);
} else {
console.log(`[AMQP][${this.queueName}] ack (buffer)`);
this.buffer.push(() => {
this.ack(msg);
});
}
} catch (e) {
console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
}
}
如您所见,连接建立后会创建一个通道,在我收到连接问题后,通道设置为NULL,1秒钟后连接重试,重新创建一个新通道。
为了管理离线时间,我使用了一个缓冲区,该缓冲区收集通道为NULL时发送的所有ack消息,并在连接重新稳定后卸载缓冲区。
所以基本上,我必须找到一种方法来发送一个确认字符后,连接丢失或通道关闭犹豫的原因。
谢谢你的帮助
如果连接由于某种原因被丢弃或断开,则无法发送 ACK,因为连接发生在套接字级别,一旦关闭,就无法使用相同的套接字重新创建它。
当连接断开时,消息将保持非 ACK,因此另一个侦听器可以处理它,或者当它再次连接时,断开连接的侦听器将再次处理它。
在我看来,你试图解决的问题不是RabbitMQ给出的,而是由基础的套接字实现给出的。
您可以通过避免管理消息缓冲区并利用RabbitMQ的特性来解决这一问题,只要侦听器再次连接,Rabbit MQ就会重新显示最后一条未处理的消息。
一旦通道关闭,您就无法确认消息(无论是什么原因)。代理将自动将相同的消息重新传递给另一个消费者。
这在RabbitMQ消息确认部分中有很好的记录。
当消费者失败或失去连接时:自动请求
使用手动确认时,当发生传递的通道(或连接)关闭时,任何未确认的传递(消息)都会自动重新排队。这包括客户端的 TCP 连接丢失、使用者应用程序(进程)故障和通道级协议异常(如下所述)。
...
由于这种行为,消费者必须准备好处理重新交付,否则将在考虑幂等性的情况下实现。重新交付将有一个特殊的布尔属性重新交付,由RabbitMQ设置为true。对于首次交付,它将设置为false。请注意,消费者可以接收之前传递给另一个消费者的消息。
正如文档所建议的,您需要通过实现消息等幂设计模式在消费者端处理此类问题。换句话说,您的体系结构应该准备好处理由于错误导致的消息重新传递。
或者,您可以禁用消息确认并获取“一次传递”类型的模式。这意味着在发生错误时,您将不得不处理消息丢失。
事项的进一步读数:https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/
Kafka引入了新的语义学:https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/
问题内容: 我可以轻松进行“作业”,但是我发现输入流的关闭存在一些问题。简单地说,我必须使用Java创建一个联系人“列表”应用程序,才能以正确的方式使用多态。所以我有一个Contact类和一个Private类(contact)。在这两个类中,都有一个Modify方法来更改变量的值。 这是不会产生问题的Contact方法 相反,这是Private中方法的替代。首先,我创建一个Private对象,然后
问题内容: 我的警报消息有问题。它正常显示,当用户按下(关闭)时,我可以将其关闭,但是当用户尝试再次显示它(例如,单击按钮事件)时,它不会显示。(此外,如果我将此警报消息打印到控制台,则等于。)我的代码在这里: 事件: PS! 仅在发生某些事件(例如,单击按钮)后才需要显示警报消息。还是我做错了什么? 问题答案: 数据删除会完全删除该元素。请改用jQuery的.hide()方法。 快速修复方法:
使用以下代码时: 以下用户行为创建了一种不一致的通知方法: null null
我有一个问题,就是在一次单击一个按钮的过程中处理两个JFrames。我有2个JFrames(SUC_Forms和add_suc),当SUC_Forms中的按钮“Add Records”被点击时,将显示另一个新的JFrame,它是add_suc JFrame,包含注册/归档的文本框。当我按ok验证并插入数据库中的所有数据时,将弹出一个JDialogbox进行确认。当我点击“OK”后,两个JFrame
新窗口打开链接 该方法可以新起webview打开页面,由于客户端窗口过多会占用较多内存,请谨慎使用 Tida.pushWindow("http://www.tmall.com/go/chn/common/u-award.php?disableptf=1"); 关闭窗口 关闭当前webview窗口 Tida.popWindow();
在RabbitMQ总线上使用带重载的spring amqp,我们有时会从org获取日志。springframework。amqp。兔子联系CachingConnectionFactory说:通道关闭:清洁通道关闭;协议方法:#方法 你能解释一下这个日志吗?为什么它处于错误级别?我们有什么调整吗?提前谢谢你的回答。