当前位置: 首页 > 知识库问答 >
问题:

通过在每次应用程序重新启动时创建新的使用者组来重置主题偏移量

呼延高超
2023-03-14

我有一个Kafka主题和一个消费者,在Spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从一开始就开始读取所有接收到的消息。这应该是通过resetOffsets属性实现的,但从这个问题可以清楚地看出,它目前不起作用。

我在kafka消费者api中发现了这个变通方法,它建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。在Spring Cloud Stream中是否可能/推荐?如何为消费者组定义动态名称?

共有3个答案

毛正浩
2023-03-14

在SpringKafka中,如果您正在配置ConsumerVía配置文件,例如应用程序。yaml(而不是以编程方式)可以通过SpEL(spring表达语言)实现这一点,从而在每个执行偏移量上提供基于UUID的使用者组

# consume-all-configuration
auto-offset-reset: earliest
group: consumer-local-#{ T(java.util.UUID).randomUUID().toString() }
袁奇玮
2023-03-14

如果每次都要求应用程序从头开始重新启动,则有几个选项:

>

  • 在使用kafka-consumer-groups.sh工具重新启动应用程序之前,您可以将提交的偏移重置为最早的kafka.admin.消费者组ommand.scala

    重新启动后,应用程序可以查找到起始位置并手动提交偏移量0。如果您有auto。抵消重置设置为最早即使0不是有效的偏移量,它也会从头开始重新启动。

    您可以使用不同的消费者组。每次的id值。在消费者配置bean中,在属性对象中插入如下内容:

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    

    最后,您是否使用委员会补偿?如果没有,只需禁用enable。汽车提交则应用程序将始终遵循自动。抵消重置设置。

    选项1和2通常是首选选项,因为它们保持一致的组。id允许轻松将使用者实例添加到组并监视组。

  • 史商震
    2023-03-14

    是的,它也适用于SCSt,但是,正如您所说,设置随机组id有点棘手,尽管您可以在启动SpringApplication之前将其设置为System.property

    如果您直接使用spring kafka,那么很容易,只需实现ConsumerSekAware,在分配分区时,您就可以seektobegining

    但是,使用SCSt,您无法直接访问侦听器。

    一种解决方法是,在启动SpringApplication之前,通过创建具有相同组id的消费者手动执行搜索。不过,如果你有多个应用实例,这会有点棘手,因为你每次可能会得到不同的分区。

    我们将再次考虑解决这个问题(我只是对此发表了评论)。

     类似资料:
    • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350

    • 我的要求是,当应用程序无法处理从kafka主题的当前偏移量读取的消息时,重置kafka话题的偏移量,该偏移量是通过Spring Bootjava应用程序使用的。手动重置偏移量或发送否定确认后,需要通过spring boot java应用程序的kafka消费者从未提交的偏移量值再次轮询消息。这能实现吗?

    • 我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理: 该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是: 首先,它显示还原使用者客户端的创建。自动偏移复位无: > 配置了两个消费者的原因是什么? 为什么第二个函数具有,而我没有显式配置它,而且Kafka的默认值是最新的? 我已

    • 我尝试将auto.offset.reset设置为最早和最晚,但这不会更改行为。 我在消费者配置中遗漏了什么吗?

    • 我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这

    • 我正在尝试发布我的第一个Android应用程序,以发送给一些测试人员。但是,我遇到了一个问题。当您退出应用程序,然后通过其图标启动它重新进入它时,它会重新启动整个应用程序,而不是返回到其以前的位置。即使您在退出后立即重新进入,也会发生这种情况。但是,如果我按住主屏幕按钮并通过最近的应用程序列表启动它,则不会发生这种情况。 我在网上搜索过其他有这个问题的人,只有少数人,但没有人能确切回答为什么会发生