当前位置: 首页 > 知识库问答 >
问题:

Quakus/Smallrye反应性kafka-来自消息的endpoint成功/失败响应

牛景同
2023-03-14

我希望使用动态接受主题作为查询参数的成功/失败响应来响应RESTendpoint。在带有小型反应式消息传递的Quakus中,代码看起来就像下面用OutgoingKafkaRecordMetadata包装有效负载一样

即https://myendpoint/PublishToKafka?主题=myDynamicTopic

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){

    kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    
}

从Quarkus doco“如果endpoint没有返回CompletionStage,HTTP响应可能会在消息发送到Kafka之前写入,因此不会向用户报告失败。”这里的示例描述了当您直接发送有效负载(即发射器)时的这个过程。send(有效负载)返回CompletionStage但不返回发射器。send(message)返回void),但这需要提前配置主题。是否可以使用消息指定元数据,并且仍然使用成功/失败响应来响应调用客户端?(我不介意是使用Emitter和CompletionStage,还是使用CommunityEmitter和Uni)。

如有任何建议,将不胜感激。

共有1个答案

仲绍晖
2023-03-14

因为你使用了一条消息(因为你需要指定主题),所以你需要一些更复杂的东西:

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
    CompletableFuture<Void> future = new CompletableFuture<>();
    Message<byte[]> message = Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata. 
           <String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    message = message.withAck(() -> {
         future.complete(null));
         return CompleteableFuture.completedFuture(null);
    }
     .withNack(t -> {
       future.completeExceptionnaly(t));
       return CompleteableFuture.completedFuture(null);
    });
    kafkaEmitter.send(message);
    return future;    
}

在这个片段中,我还附加了在消息被确认(被代理接受)或被拒绝(发生了错误)时调用的ack和nack处理程序。

这些回调将报告给future,这是在方法中创建的一个完整的future。这是要返回的对象,因为它将执行您想要的操作:指示结果。

我知道回调有点复杂。这主要是由于规范:我们必须返回CompleteableFuture.completedFuture(...);以确认nack进程成功。如果我们返回未来;而不是(我们已将其设置为future.completeExceptionnaly(t));),这将被解释为nack进程期间的失败。这基本上相当于命令式世界中catch-块中的抛出

幸运的是,一个更简单的版本很快就会出现(别担心,我们不会崩溃)。

 类似资料:
  • 我正在使用SmallRye与Kafka的反应式消息传递,以及Confluent Registry和AVRO。正如在本博客中所解释的,它运行良好https://quarkus.io/blog/kafka-avro/但在与博客相关的源代码中,它在本机编译中似乎不起作用:https://github.com/cescoffier/quarkus-kafka-and-avro 我的环境(Avro 1.10

  • 我的处理来自的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题: 如何重试失败消息?我知道一些方法,但我不确定它们是否正确。 1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。 2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这

  • 我使用Firebase云消息向我的Android客户端应用发送通知,每个通知都应该根据其注册令牌发送到单个设备。 每次我通过https://fcm.googleapis.com/fcm/send,我收到一个JSON响应,如下所示: 我发现和是多余的——它们不是同一个意思吗?我应该检查两个以确保一切正常吗??

  • 我想使用这个扩展:[Quarkus Smallrye响应消息Kafka] 但是在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户那里收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向Kafka发送消息- 或者这些配置应该在运行时以编程方式设置 因为我不认识路,所以我用了本地的Kafka驱动程序

  • 我遵循以下指南https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html在没有kafka代理的情况下进行测试。我已设置了以下QuarKustestResource:

  • 问题内容: 我可以使用以下两种方式之一编写断言消息。说明成功: 或说明被破坏的条件: JUnit中是否专门为此提供标准?如果没有,双方的论点是什么? PS:我在网上看到的文章都在没有说明的情况下进行了演示,因此仅说“搜索Google”是无法解决的! [更新] 每个人都对我用过的事实感到困惑,因此该消息可能毫无用处。但这当然只是因为我想简单地说明这个问题。 因此,想象一下它是: 消息有用的地方。 问