当前位置: 首页 > 知识库问答 >
问题:

春云Kafka活页夹查询

秦博达
2023-03-14

我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件

  1. 使用者 - 使用消息
  2. 扩充 - 扩充使用的消息
  3. 制作人 - 已发布 向其他主题发送的丰富消息

我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOffSet设置为false。下面是我们正在做的方法

@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
    String inputMessage = message.getPayload.toString();
    String enrichMessage = // Enrichment on inputMessage
    return enrichMessage;
}

我们观察到如果ack.acknowledge()由于某些问题而失败,消息仍然发送到出站通道。我们如何将整个消费者/生产者作为一个事务的一部分处理,以便如果确认失败消息将不会发送到主题。我也在下面设置了事务属性

    < Li > spring . cloud . stream . Kafka . binder . transaction . transactionid prefix = TX- < Li > spring . cloud . stream . Kafka . binder . transaction . producer . configuration . ack = all < Li > spring . cloud . stream . Kafka . binder . transaction . producer . configuration . retries = 1 < Li > spring . cloud . stream . Kafka . bindings . input . consumer . autocommit offset = true < Li > spring . cloud . stream . Kafka . bindings . input . consumer . enabled LQ = true < Li > spring . cloud . stream . Kafka . bindings . input . consumer . dlq name = error . topic < Li > spring . cloud . stream . Kafka . bindings . input . consumer . auto commit error = true

如果有任何可用的示例,那将是非常有帮助的。

干杯

共有1个答案

朱阳曜
2023-03-14

你得把活页夹做成交易的看看留档

https://docs . spring . io/spring-cloud-stream-binder-Kafka/docs/3 . 1 . 4/reference/html/spring-cloud-stream-binder-Kafka . html # _ Kafka _ binder _ properties

spring.cloud.stream.kafka.binder.transaction.transationId前缀

启用绑定程序中的事务。请参阅 Kafka 文档中的 transaction.id 和 spring-kafka 文档中的事务。启用事务后,将忽略单个生产者属性,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认null(无事务)

请注意,必须使用< code > isolation . level = read _ committed 配置输出主题上的使用者,以避免接收回滚的记录。

 类似资料:
  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 版本:Spring Boot: 1.4.2 .发布春云Deps:布里克斯顿。SR7 这是我的申请。处理器应用程序的属性。 当我启动此应用程序时,将按预期创建事件交换,并将其绑定到名为:events exchange的队列。eventconsumers组(也可以)。但routingKey始终为“#”。我已经尝试了从各种文档中找到的所有选项。我在这里遗漏了什么吗? 我希望这个应用程序只订阅某些消息(我

  • 我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml: