当前位置: 首页 > 知识库问答 >
问题:

如何基于发布者确认提供响应

胥玮
2023-03-14

我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。

我这样做的方式是:

  • 在邮件上添加一些标题
  • 拥有一个包含2个订阅者的发布-订阅频道
  • 订户1)创建一个阻塞队列,使其准备就绪,并通过amqp发送消息
  • 订户2)开始在该队列上拉动5秒钟,直到得到确认
  • amqp:出站通道适配器将其发布者确认发送给服务激活器
  • PublisherConfigReceiver接收确认并将其放入阻塞队列,导致订阅服务器2的拉取完成并返回确认结果

这种技术确实可以正常工作,但我不喜欢假设该链将在来自发布-订阅通道的WaitForPublisherConfig服务激活器之前接收消息。在这种情况下,顺序关系到哪个组件首先接收消息。

如果waitForPublisher确认服务激活器首先接收到消息,它只会阻塞线程5秒钟,然后允许链通过amqp: outbind-Channel-适配器发送消息。

我尝试将waitForPublisherConders放在amqp: outbinding-Channel适配器之后,但是由于outbinding-Channel适配器不会“返回”任何东西,所以服务激活器永远不会在链中被调用。

我觉得应该有更好的方法。我的目标是等待publisher确认(或者在spring的publisher确认中找不到支持的超时),然后再向请求者发送响应。

您是否可以帮助我更好地制定解决方案,或者让我知道是否可以依赖发布-订阅频道的第一个订户始终首先接收消息这一事实。

对不起,这个太长了。

一些配置

<int:header-enricher input-channel="addHeaders" output-channel="metadataIngestNotifications">
    <int:header name="routingKey" ref="routingKeyResolver" method="resolveReoutingKey"/>
    <int:header name="notificationId" expression="payload.id" />
</int:header-enricher>

<int:chain input-channel="metadataIngestNotifications" output-channel="nullChannel" >

    <int:service-activator id="addPublisherConfirmQueue"
        requires-reply="false"
        ref="publisherConfirmService"  
        method="addPublisherConfirmQueue" />

    <int:object-to-json-transformer id="transformObjectToJson" />

    <int-amqp:outbound-channel-adapter id="amqpOutboundChannelAdapter"
        amqp-template="rabbitTemplate"
        exchange-name="${productNotificationExchange}"
        confirm-ack-channel="publisherConfirms"
        confirm-nack-channel="publisherConfirms"
        mapped-request-headers="*"
        routing-key-expression="headers.routingKey"
        confirm-correlation-expression="headers.notificationId" />

</int:chain>

<int:service-activator id="waitForPublisherConfirm"
        input-channel="metadataIngestNotifications"
        output-channel="publisherConfirmed"
        requires-reply="true"
        ref="publisherConfirmService"  
        method="waitForPublisherConfirm"  />


<int:service-activator id="publisherConfirmReceiver" 
                       ref="publisherConfirmService" 
                       method="receivePublisherConfirm" 
                       input-channel="publisherConfirms" 
                       output-channel="nullChannel" />

public class PublisherConfirmService {

    private final Map<String, BlockingQueue<Boolean>> suspenders = new HashMap<>();

    public Message addPublisherConfirmQueue(@Header("notificationId") String id, Message m){
        LogManager.getLogger(this.getClass()).info("Adding publisher confirm queue.");
        BlockingQueue<Boolean> bq = new LinkedBlockingQueue<>();
        suspenders.put(id, bq);
        return m;
    }

    public boolean waitForPublisherConfirm(@Header("notificationId") String id) {
        LogManager.getLogger(this.getClass()).info("Waiting for publisher confirms for Notification: " + id);
        BlockingQueue<Boolean> bq = suspenders.get(id);
        try {
            Boolean result = bq.poll(5, TimeUnit.SECONDS);
            if(result == null){
                LogManager.getLogger(this.getClass()).error("The broker took too long to return a publisher confirm. NotificationId: " + id);
                return false;
            }else if(!result){
                LogManager.getLogger(this.getClass()).error("The publisher confirm indicated that the message was not confirmed. NotificationId: " + id);
                return false;
            }
        } catch (InterruptedException ex) {
            LogManager.getLogger(this.getClass()).error("Something went wrong polling for the publisher confirm for notificationId: " + id, ex);
            return false;
        }finally{
            suspenders.remove(id);
        }
        return true;
    }

    public void receivePublisherConfirm(String id, @Header(AmqpHeaders.PUBLISH_CONFIRM) boolean confirmed){
        LogManager.getLogger(this.getClass()).info("Received publisher confirm for Notification: " + id);
        if (suspenders.containsKey(id)){
            BlockingQueue<Boolean> bq = suspenders.get(id);
            bq.add(confirmed);
        }
    } 
}

共有1个答案

闻人嘉悦
2023-03-14

出于同样的目的,我们来看看聚合器解决方案怎么样?

<代码>

confirm ack channel必须是经过某种转换后将消息带到同一聚合器的内容,例如正确提取correlationKey等。

 类似资料:
  • 有没有办法更改发布者-确认每条消息?我们有一个接收消息并发布到RabbitMQ的Rest层。根据特定的消息属性,我们决定是否需要发布者确认。 有没有一种方法可以在发送消息时覆盖,发布者-确认?

  • 我正在创建一个API来发布和使用来自RabbitMQ的消息。在我当前的设计中,发布者将发出一个POST请求。我的API会将POST请求路由到Exchange。这样,发布者在发布时就不必知道服务器地址、exchange名称等。 现在消费者部分是我不确定如何继续的地方。 开始时不会排队。当一个新的消费者想要订阅一个主题时,那么我将创建一个队列并将其绑定到交换。我需要一些问题的答案- 一旦我为使用者创建

  • 我有一个将数据推送到kafka的endpoint。现在,我想分别在kafka写入成功或失败的情况下使用近似的状态代码2xx或5xx响应调用。代码片段是 现在的问题是,在执行ack或nack回调之前,endpoint正在使用状态代码进行响应。还尝试了的方法,但该方法返回void。因此,无法知道该消息是已确认还是未确认。

  • 提供者(Provider) 是一个连接以太坊网络的抽象,用与查询以太坊网络状态或者发送更改状态的交易。 EtherscanProvider 和 InfuraProvider 提供连接公开的第三方节点服务提供商,无需自己运行任何以太坊节点。 JsonRpcProvider 和 IpcProvider 允许连接到我们控制或可以访问的以太坊节点(包括主网,测试网,权威证明(PoA)节点或Ganache)

  • 我正在尝试让一个基本的节点oidc提供者应用程序作为我的密钥斗篷服务器的oidc提供者。 Keycoat正确链接到我的应用程序的登录页面。输入用户名和密码后,我会被正确地传送回keycoat。 但是,密钥斗篷显示“使用身份提供者进行身份验证时出现意外错误”。 编辑:我调整了keycoat日志级别,现在看到以下错误: 无法进行身份提供程序oauth回调:org.keycloak.broker.pro

  • 我有一个场景,其中我的提供者服务提供了3个功能。在消费者端,我有3个不同的类来为这3个功能编写测试。当生成pact文件时,它包含了具有相同的使用者-提供者组合的所有测试。到目前为止还不错。 示例: ConsumerTestClass1 ConsumerTestClass2 ConsumerTestClass3 所有定义以下协议 @pact(provider=provider,consumer=co