我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。
我这样做的方式是:
这种技术确实可以正常工作,但我不喜欢假设该链将在来自发布-订阅通道的WaitForPublisherConfig服务激活器之前接收消息。在这种情况下,顺序关系到哪个组件首先接收消息。
如果waitForPublisher确认服务激活器首先接收到消息,它只会阻塞线程5秒钟,然后允许链通过amqp: outbind-Channel-适配器发送消息。
我尝试将waitForPublisherConders放在amqp: outbinding-Channel适配器之后,但是由于outbinding-Channel适配器不会“返回”任何东西,所以服务激活器永远不会在链中被调用。
我觉得应该有更好的方法。我的目标是等待publisher确认(或者在spring的publisher确认中找不到支持的超时),然后再向请求者发送响应。
您是否可以帮助我更好地制定解决方案,或者让我知道是否可以依赖发布-订阅频道的第一个订户始终首先接收消息这一事实。
对不起,这个太长了。
一些配置
<int:header-enricher input-channel="addHeaders" output-channel="metadataIngestNotifications">
<int:header name="routingKey" ref="routingKeyResolver" method="resolveReoutingKey"/>
<int:header name="notificationId" expression="payload.id" />
</int:header-enricher>
<int:chain input-channel="metadataIngestNotifications" output-channel="nullChannel" >
<int:service-activator id="addPublisherConfirmQueue"
requires-reply="false"
ref="publisherConfirmService"
method="addPublisherConfirmQueue" />
<int:object-to-json-transformer id="transformObjectToJson" />
<int-amqp:outbound-channel-adapter id="amqpOutboundChannelAdapter"
amqp-template="rabbitTemplate"
exchange-name="${productNotificationExchange}"
confirm-ack-channel="publisherConfirms"
confirm-nack-channel="publisherConfirms"
mapped-request-headers="*"
routing-key-expression="headers.routingKey"
confirm-correlation-expression="headers.notificationId" />
</int:chain>
<int:service-activator id="waitForPublisherConfirm"
input-channel="metadataIngestNotifications"
output-channel="publisherConfirmed"
requires-reply="true"
ref="publisherConfirmService"
method="waitForPublisherConfirm" />
<int:service-activator id="publisherConfirmReceiver"
ref="publisherConfirmService"
method="receivePublisherConfirm"
input-channel="publisherConfirms"
output-channel="nullChannel" />
班
public class PublisherConfirmService {
private final Map<String, BlockingQueue<Boolean>> suspenders = new HashMap<>();
public Message addPublisherConfirmQueue(@Header("notificationId") String id, Message m){
LogManager.getLogger(this.getClass()).info("Adding publisher confirm queue.");
BlockingQueue<Boolean> bq = new LinkedBlockingQueue<>();
suspenders.put(id, bq);
return m;
}
public boolean waitForPublisherConfirm(@Header("notificationId") String id) {
LogManager.getLogger(this.getClass()).info("Waiting for publisher confirms for Notification: " + id);
BlockingQueue<Boolean> bq = suspenders.get(id);
try {
Boolean result = bq.poll(5, TimeUnit.SECONDS);
if(result == null){
LogManager.getLogger(this.getClass()).error("The broker took too long to return a publisher confirm. NotificationId: " + id);
return false;
}else if(!result){
LogManager.getLogger(this.getClass()).error("The publisher confirm indicated that the message was not confirmed. NotificationId: " + id);
return false;
}
} catch (InterruptedException ex) {
LogManager.getLogger(this.getClass()).error("Something went wrong polling for the publisher confirm for notificationId: " + id, ex);
return false;
}finally{
suspenders.remove(id);
}
return true;
}
public void receivePublisherConfirm(String id, @Header(AmqpHeaders.PUBLISH_CONFIRM) boolean confirmed){
LogManager.getLogger(this.getClass()).info("Received publisher confirm for Notification: " + id);
if (suspenders.containsKey(id)){
BlockingQueue<Boolean> bq = suspenders.get(id);
bq.add(confirmed);
}
}
}
出于同样的目的,我们来看看聚合器解决方案怎么样?
<代码>
confirm ack channel
必须是经过某种转换后将消息带到同一聚合器的内容,例如正确提取correlationKey
等。
有没有办法更改发布者-确认每条消息?我们有一个接收消息并发布到RabbitMQ的Rest层。根据特定的消息属性,我们决定是否需要发布者确认。 有没有一种方法可以在发送消息时覆盖,发布者-确认?
我正在创建一个API来发布和使用来自RabbitMQ的消息。在我当前的设计中,发布者将发出一个POST请求。我的API会将POST请求路由到Exchange。这样,发布者在发布时就不必知道服务器地址、exchange名称等。 现在消费者部分是我不确定如何继续的地方。 开始时不会排队。当一个新的消费者想要订阅一个主题时,那么我将创建一个队列并将其绑定到交换。我需要一些问题的答案- 一旦我为使用者创建
我有一个将数据推送到kafka的endpoint。现在,我想分别在kafka写入成功或失败的情况下使用近似的状态代码2xx或5xx响应调用。代码片段是 现在的问题是,在执行ack或nack回调之前,endpoint正在使用状态代码进行响应。还尝试了的方法,但该方法返回void。因此,无法知道该消息是已确认还是未确认。
提供者(Provider) 是一个连接以太坊网络的抽象,用与查询以太坊网络状态或者发送更改状态的交易。 EtherscanProvider 和 InfuraProvider 提供连接公开的第三方节点服务提供商,无需自己运行任何以太坊节点。 JsonRpcProvider 和 IpcProvider 允许连接到我们控制或可以访问的以太坊节点(包括主网,测试网,权威证明(PoA)节点或Ganache)
我正在尝试让一个基本的节点oidc提供者应用程序作为我的密钥斗篷服务器的oidc提供者。 Keycoat正确链接到我的应用程序的登录页面。输入用户名和密码后,我会被正确地传送回keycoat。 但是,密钥斗篷显示“使用身份提供者进行身份验证时出现意外错误”。 编辑:我调整了keycoat日志级别,现在看到以下错误: 无法进行身份提供程序oauth回调:org.keycloak.broker.pro
我有一个场景,其中我的提供者服务提供了3个功能。在消费者端,我有3个不同的类来为这3个功能编写测试。当生成pact文件时,它包含了具有相同的使用者-提供者组合的所有测试。到目前为止还不错。 示例: ConsumerTestClass1 ConsumerTestClass2 ConsumerTestClass3 所有定义以下协议 @pact(provider=provider,consumer=co