当前位置: 首页 > 知识库问答 >
问题:

如何发送basicAck到入站适配器后,发布者确认从出站适配器

庞阳波
2023-03-14

我们有一个入站通道适配器,用于接收事件通知。消费者标准的复杂性限制了我们使用简单路由键来分发消息的能力,因此应用程序使用分离器通过直接交换将消息发送到感兴趣的订户队列。

我们希望在出站通道适配器上使用publisher confirms,以确保传递到客户端队列。我们希望等待发布者确认原始消息,如果未收到发布者确认,或者如果ack==false,我们希望nack来自入站通道适配器的原始消息。

我假设这将在兔子模板的确认回调中完成,但我不确定如何完成。(或者如果这是可能的)

<rabbit:connection-factory id="rabbitConnectionFactory" 
        host="${amqpHost}"
        username="${amqpUsername}"
        password="${amqpPassword}"
        virtual-host="${amqpVirtualHost}" 
        publisher-confirms="true" />

<rabbit:template id="rabbitTemplate" 
        connection-factory="rabbitConnectionFactory"
        confirm-callback="PublisherConfirms"    />

<int-amqp:inbound-channel-adapter channel="notificationsFromRabbit" 
        queue-names="#{'${productNotificationQueue}' + '${queueSuffix}'}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        message-converter="productNotificationMessageConverter"  />

<int:chain input-channel="notificationsFromRabbit" output-channel="notificationsToClients">
        <int:service-activator ref="NotificationRouter" 
                      method="addRecipientsHeaders" />
        <int:splitter ref="NotificationRouter" 
                      method="groupMessages" />
        <int:object-to-json-transformer />
</int:chain>

<int-amqp:outbound-channel-adapter channel="notificationsToClients"
        amqp-template="rabbitTemplate"
        exchange-name="${servicesClientsExchange}"
        routing-key=""
        mapped-request-headers="*"  />

目前,我们正在通过将通道和传递标签作为参数传递给groupMessages方法来确认中的消息。但是,如果代理从不发送返回或返回带有ack=false的消息,那么从入站通道适配器发送nack消息就太迟了。

我需要一个保存地图的bean吗?

在我收到发布者确认时,入站通道适配器的通道是否将关闭?


共有1个答案

阳念
2023-03-14

只要在收到所有ACK/NACK之前暂停使用者线程,您就可以做您想做的事情。

如果您使通知从兔子发布订阅通道,您可以添加另一个订阅者(服务激活器),在那里您挂起线程;等待所有ack/nack并采取您想要的操作。

编辑:

您还可以使用springintegration为您管理ack,它将从出站适配器(而不是自己使用回调)将它们作为消息发出。

编辑2:

然后,您可以在相关数据中使用拆分器的序列大小/序列号头,以便在接收到所有ack时释放使用者。

编辑3:

像这样的东西应该有用...

在出站适配器上,设置confirm correlation expression=“#this”(整个出站消息)。

用两种方法类

private final Map<String, BlockingQueue<Boolean> suspenders;

public void suspend(Message<?> original) {
    BlockingQueue<Boolean> bq = new LinkedBlockingQueue();
    String key = someKeyFromOriginal(original);
    suspenders.put(key, bq);
    Boolean result = bq.poll(// some timeout);
    try {
        if (result == null) {
            // timed out
        } 
        else if (!result) {
            // throw some exception to nack the message
        }
    }
    finally {
        suspenders.remove(key);
    }
}

public void ackNack(Message<Message<?>> ackNak) {
    Message<?> theOutbound = ackNak.payload;
    BlockingQueue<Boolean> bq = suspenders.get(someKeyFromOriginal(theOutbound));
    if (bq == null) // late ack/nack; ignore
    else {
        // check the ack/nack header
        // if nack, bq.put(false)
        // else, use another map field, to 
        // keep track of ack count Vs sequenceSize header in 
        // theOutbound; when all acks received, bq.put(true);
    }
}

在第一种方法中挂起使用者线程;将ACK/NACK从出站适配器路由到第二个方法。

警告:这不是测试,只是在我的脑海中;但它应该很接近。

 类似资料:
  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我在实现某些功能时遇到了一些问题,当我删除文件时,我注意到了一些不一致。 1)当我删除多个文件时,有时并不是所有文件都被转移到正确的目录。