当前位置: 首页 > 知识库问答 >
问题:

Quarkus-基于Kafka写入确认和nack的状态响应

劳星晖
2023-03-14

我有一个将数据推送到kafka的endpoint。现在,我想分别在kafka写入成功或失败的情况下使用近似的状态代码2xx或5xx响应调用。代码片段是

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
         // return appropriate response
          return Response
    }
}

现在的问题是,在执行ack或nack回调之前,endpoint正在使用状态代码进行响应。还尝试了MutinyEmittersendAndAwait方法,但该方法返回void。因此,无法知道该消息是已确认还是未确认。

共有1个答案

潘灵均
2023-03-14

这里最好的方法是链接异步操作,如下所示:

@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
    return Uni.createFrom().completionStage(priceEmitter.send(price))
            .onItem().transform(ignored -> Response.ok().entity("foo").build())
            .onFailure().recoverWithItem(Response.serverError().build());
}

如果您想使用同步代码(我不推荐):

@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
    try {
        Uni.createFrom().completionStage(priceEmitter.send(price))
                .await().indefinitely();

        return Response.ok().entity("foo").build();
    } catch (Exception e) {
        return Response.serverError().build();
    }
}

这个等待()。如果Uni发出故障,infinally()将抛出异常。

您还可以选择直接使用发射器返回的CompletionStage,而无需将其转换为Uni,但请记住Quarkus选择Mutiny作为其默认反应框架。

 类似资料:
  • 确认登入状态     排列于联系人名单中,联系人的目前登入状态可依下述判别。 在线 已登入 Skype 。 脱机 目前未登入 Skype ,或刻意将登入状态设定为[隐形]或[脱机]。 Skype Me™ 已登入 Skype ,且接受未承认或陌生 Skype 联系人的电话。 暂时离开 虽已登入 Skype 但暂时未使用  Skype 。 离开 虽已登入 Skype 但已长时间未使用  Skype 。

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 我有一个web服务,它接收对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都是在一个线程上执行的,我正在尝试实现publisher confirms,我正在努力解决如何设置它。我有它的工作,但我不喜欢我这样做。 我这样做的方式是: 在邮件上添加一些标题 拥有一个包含2个订阅者的发布-订阅频道 订户1)创建一个阻塞队列,使其准备就绪,并通过amqp发送消息 订户2)开始在该队列上拉动

  • 我正在使用Twilio WhatsApp API。我有沙盒账户,我可以从Twilio向WhatsApp号码发送消息,但是,我面临一个问题,当消息不发送给收件人时,我从API获得的状态与我获得的成功相同。 如果需要,这里是我的代码。

  • 我正在尝试使用Drools编写一个基于规则的引擎。规则如下: 规则“警报”:如果状态为“警报”,请立即发送通知。 规则“警告”:如果状态为“警告”,则将设备ID保存在内存中并等待5分钟。如果在5分钟内收到另一条具有相同设备ID和状态“已解决”的消息,则取消此规则。否则,将状态升级为Alarm并触发规则“Alarm”。 规则“已解决”:如果状态为“已解决”并且设备ID已在内存中,则清除该设备ID的“

  • 我一直在考虑使用Apache Kafka作为事件源配置中的事件存储。发布的事件将与特定的资源相关联,传递到与资源类型相关联的主题,并按资源ID分片到分区中。因此,例如,创建类型为Folder和id 1的资源将产生一个FolderCreate事件,该事件将通过在主题中的分区总数中对id 1进行分片来传递到给定分区中的“Folders”主题。即使我不知道如何处理使日志不一致的并发事件。 最简单的场景是