当前位置: 首页 > 知识库问答 >
问题:

如何使用spring integration kafka确认消费者阅读的kafka消息

颜昕
2023-03-14

我们使用的是spring集成kafka版本3.1.2。RELEASE and int kafka:消息驱动的通道适配器,用于使用来自远程kafka主题的消息。生产者发送加密消息,我们使用反序列化器解密实际消息。我们可以使用主题中发布的所有消息。我们将自动提交用作false。我们想知道在成功处理消息后如何从我们的服务提交或确认消息。有人能帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?

当我们将自动提交设置为true时,我们假设它将在提交间隔之后提交消息,但我们希望在我们的服务中处理它。我遇到了下面的示例,但我们在反序列化后收到了一个自定义对象,而不是spring集成消息。因此,我们想知道如何在转换器中实现类似的确认,以便在转换过程中出现任何错误时不会提交消息。成功转换后提交消息。

  Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
  if(acknowledgment != null) { System.out.println("Acknowledgment provided");
  acknowledgment.acknowledge(); }
   }



<int-kafka:message-driven-channel-adapter
    id="kafkaMessageListener"
    listener-container="kafkaMessageContainer" auto-startup="true"
    phase="100" send-timeout="5000" mode="record"
    message-converter="messageConverter"
    recovery-callback="recoveryCallback" error-message-strategy="ems"
    channel="inputFromKafkaChannel" error-channel="errorChannel" />

<int:transformer id="transformerid"
    ref="transformerBean"
    input-channel="inputFromKafkaChannel" method="transform"
    output-channel="messageTransformer" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="group.id" value="${spring.kafka.consumer.group-id}" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="com.test.CustomDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="${spring.kafka.topics}" />
        </bean>
    </constructor-arg>
</bean>

共有1个答案

孟胤
2023-03-14

auto.commit.offset=true表示kafka客户端库提交偏移量。

如果为false(对于Apache Kafka,首选Spring),默认情况下,侦听器容器在轮询()收到每个批之后提交偏移量,但机制由容器的AckMode属性控制。

请参见提交偏移。

如果您将容器的AckMode设置为MANUALMANUAL_IMMEDIATE,那么您的应用程序必须使用确认对象执行提交。

当使用Spring集成时,确认对象在KafkaHeaders中可用。ACKNOWLEDGENT标头。

在大多数情况下,AckMode。BATCH(默认)或AckMode。应该使用RECORD,并且您的应用程序不需要担心提交偏移量。

 类似资料:
  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我使用以下代码来读取主题的数据,即“sha-test2”,但它正在读取完全替代的代码行,即 10 行中的 20 行。但是当我运行控制台时,它显示所有 20 行。即.bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --主题 sha-test2 --从头 我做错了什么?非常感谢您的帮助。

  • 我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM

  • 我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我实现了一个Java使用者,它使用来自Kafka主题的消息,然后将这些消息与POST请求一起发送到REST API。 假设一个消息已经被使用,但是Java类未能到达REST API。消息将永远不会被传递,但它将被标记为已消耗。处理这类案件最好的办法是什么?当且仅当来自REST API的响应成功时,我能以某种方式确认消息吗?