当前位置: 首页 > 知识库问答 >
问题:

Spring集成-Kafka消息驱动通道-自动确认

相俊迈
2023-03-14

我使用了spring io文档中列出的示例配置,它运行良好。

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

然而,当我用下游应用程序测试它时,我从Kafka那里消费并将其发布到下游。如果下游关闭,则消息仍在使用中,不会重播。

或者说,在使用kafka主题后,如果我在service activator中发现一些异常,我还想抛出一些异常,这些异常应该回滚事务,以便可以重播kafka消息。

简而言之,如果消费应用程序有一些问题,那么我想回滚事务,这样消息就不会被自动确认,除非成功处理,否则会被一次又一次地重播。

共有1个答案

邹祺
2023-03-14

这不是Apache Kafka的工作方式。有类似于JMS的TX语义学。Kafka主题中的偏移与回弹或重新交付无关。

我建议您从他们的官方资源中更仔细地研究Apache Kafka。

Spring Kafka没有给常规Apache Kafka协议带来任何好处,但是您可以考虑使用Spring Kafka中的重试功能在本地重新传递相同的记录:http://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#_retrying_deliveries

是的,ack模式必须是MANUAL,不要在消耗后自动将偏移量提交到Kafka中。

 类似资料:
  • 试图调用不存在的方法。尝试是从以下位置进行的: 以下方法不存在: 3.如果您使用kafka版本2.5及以上版本,但却被2021-03-22 13:56:05.102-0400 org{local_sparta}WARN[data-pipeline,,,][DP-ACCOUNT][DPA][]annotationConfigServletWebServerApplicationContext:上下文

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 我使用的是< code>SpringXD,我的配置如下: Spring集成kafka 2.1.0.释放 kafka客户端0.10.0.1 Kafka0.10.x.x Spring-xd-1.3.1.释放 我的 xml 文件中有以下配置: 这是我用来启动/停止频道的Java类: 然后我创建了一个基本流来检查我发送到主题的一些消息是否通过 我检查了创建的文件,它包含我发送到 Kafka 主题的所有消息

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 我试图将从Quickfix读取消息(读取修复消息)配置到spring集成中。我知道我可以使用入站通道适配器从外部源(如QuickFix)读取数据。您能提供如何编写事件驱动入站通道适配器的示例吗?我有以下配置不起作用

  • 我的问题是,如果编写一个简单的REST API,就像文章描述的那样,执行由MongoDB支持的CRUD操作,并使用spring-boot-starter-data-mongodb-reactive,我可以调用消息驱动的API服务吗?我还可以添加一个Webclient来与一些下游服务进行对话。 在REST API上下文中驱动消息有意义吗?