当前位置: 首页 > 知识库问答 >
问题:

有没有办法在死信发布恢复中找到消费者群体?

柳宪
2023-03-14

我有一个项目,使用了DLQ方法,在< code>@KafkaListener的任何异常中,错误将被发送到一个具有< code>error-结构的主题

这样,我们可以重试特定使用者组的消息。消息处理程序从主主题和重试主题中读取。

由于项目中有多个侦听器都使用相同的容器,因此我们将主题(主侦听器、错误侦听器和重试)放在应用程序属性中,并配置了 DeadLetter发布恢复程序以查找相应的 DLQ(注意,这是 Kotlin):

val recoverer = DeadLetterPublishingRecoverer(template) { record, _ ->
    val dlq = kafkaProperties.topics.values.firstOrNull { it.main == record.topic() || it.retry == record.topic() }!!.dlq

    return@DeadLetterPublishingRecoverer TopicPartition(dlq, -1)
}

这一切都很好。但是,现在我想添加另一个@KafkaListener,该@KafkaListener与另一个听众听同一主题。因此,我需要为这个侦听器提供一个单独的使用者组。

但是,如果我使用与上面相同的逻辑来查找相应的DLQ主题,那么两个侦听器中的一个将使用另一个的DLQ,因为它在这里没有查看消费者组。

所以我的问题是:在Spring Kafka中,是否有任何方法可以找到错误发生在哪个消费者组(即<code>groupId</code>)中?我看过文档和源代码,但我自己找不到。感谢任何帮助。

共有1个答案

颛孙国源
2023-03-14

使用者的< code>group.id可通过调用< code > kafkautils . get groupid()获得(当容器线程启动时,它存储在< code>ThreadLocal中)。

 类似资料:
  • 我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?

  • 我正在尝试使用Kafka-Python编写一个消费者,以确保精确的once语义。分区中的消息是使用事务感知生成器生成的。我从Kafka文档中了解到,我应该将指定为,这样它将只读取提交的消息。问题是,我在Python客户机的文档中没有看到关于如何指定的任何地方。关于如何让我的消费者只阅读提交的消息,有什么想法吗? 预期结果:只需获取transation committed消息实际结果:使用者甚至读取

  • 我想从Kafka的主题消费事件后,他们到达的时间。我希望使用事件的时间在消息的有效负载中。在Kafka那里有可能实现那样的事情吗?它的缺点是什么? 实际示例:一条消息M在12:10产生,在12:11到达我的Kafka主题,我希望消费者在12:41(到达后30分钟)轮询它

  • 例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。

  • 现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是 在不使用消费者的情况下启动API 关于如何手动管理消费过程的任何建议? 技术细节: 用于创建侦听器

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了