我有一个将数据推送到kafka的endpoint。现在,我想分别在kafka写入成功或失败的情况下使用近似的状态代码2xx或5xx响应调用。代码片段是
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(Message.of(price)
.withAck(() -> {
// Called when the message is acked
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
return CompletableFuture.completedFuture(null);
}));
// return appropriate response
return Response
}
}
现在的问题是,在执行ack或nack回调之前,endpoint正在使用状态代码进行响应。还尝试了MutinyEmitter
的sendAndAwait
方法,但该方法返回void。因此,无法知道该消息是已确认还是未确认。
这里最好的方法是链接异步操作,如下所示:
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
return Uni.createFrom().completionStage(priceEmitter.send(price))
.onItem().transform(ignored -> Response.ok().entity("foo").build())
.onFailure().recoverWithItem(Response.serverError().build());
}
如果您想使用同步代码(我不推荐):
@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
try {
Uni.createFrom().completionStage(priceEmitter.send(price))
.await().indefinitely();
return Response.ok().entity("foo").build();
} catch (Exception e) {
return Response.serverError().build();
}
}
这个等待()。如果Uni发出故障,infinally()将抛出异常。
您还可以选择直接使用发射器返回的CompletionStage,而无需将其转换为Uni,但请记住Quarkus选择Mutiny作为其默认反应框架。
确认登入状态 排列于联系人名单中,联系人的目前登入状态可依下述判别。 在线 已登入 Skype 。 脱机 目前未登入 Skype ,或刻意将登入状态设定为[隐形]或[脱机]。 Skype Me™ 已登入 Skype ,且接受未承认或陌生 Skype 联系人的电话。 暂时离开 虽已登入 Skype 但暂时未使用 Skype 。 离开 虽已登入 Skype 但已长时间未使用 Skype 。
我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将
我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。 我这样做的方式是: 在邮件上添加一些标题 拥有一个包含2个订阅者的发布-订阅频道 订户1)创建一个阻塞队列,使其准备就绪,并通过amqp发送消息 订户2)开始在该队列上拉动
我正在使用Twilio WhatsApp API。我有沙盒账户,我可以从Twilio向WhatsApp号码发送消息,但是,我面临一个问题,当消息不发送给收件人时,我从API获得的状态与我获得的成功相同。 如果需要,这里是我的代码。
我正在尝试使用Drools编写一个基于规则的引擎。规则如下: 规则“警报”:如果状态为“警报”,请立即发送通知。 规则“警告”:如果状态为“警告”,则将设备ID保存在内存中并等待5分钟。如果在5分钟内收到另一条具有相同设备ID和状态“已解决”的消息,则取消此规则。否则,将状态升级为Alarm并触发规则“Alarm”。 规则“已解决”:如果状态为“已解决”并且设备ID已在内存中,则清除该设备ID的“
我一直在考虑使用Apache Kafka作为事件源配置中的事件存储。发布的事件将与特定的资源相关联,传递到与资源类型相关联的主题,并按资源ID分片到分区中。因此,例如,创建类型为Folder和id 1的资源将产生一个FolderCreate事件,该事件将通过在主题中的分区总数中对id 1进行分片来传递到给定分区中的“Folders”主题。即使我不知道如何处理使日志不一致的并发事件。 最简单的场景是