当前位置: 首页 > 知识库问答 >
问题:

在Kafka中寻求操作

黄聪
2023-03-14

我有一个场景做一些管理任务与Kafka。

现在管理员想,

例1:将app2的偏移量从当前偏移量位置(X)向前移动5个位置。对于此任务,是否应该停止app2并创建一个KafkaConsumer(不是spring),其组id与app2相同,即“Group2”,并提交新的偏移量x+5并启动app2?

案例2:将主题A的所有应用程序的偏移量从它们当前的偏移量位置(X)向前移动5个位置,假设所有应用程序都处于相同的偏移量X。对于这个任务,是否应该停止所有应用程序并创建一个具有空组id“”的KafkaConsumer,提交新的偏移量X+5并启动所有应用程序?或者对每个应用程序单独执行类似于案例-1的操作?

或者还有什么更好的方法来处理Kafka?

共有1个答案

姚麒
2023-03-14

ConsumerSeekAware对您没有帮助吗?

为了查找,您的侦听器必须实现ConsumerSeekAware,它具有以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

若要在运行时任意搜索,请使用RegisterSeekCallback对适当线程的回调引用。

https://docs.spring.io/spring-kafka/docs/2.0.1.release/reference/html/_reference.html#seek

 类似资料:
  • Project V 提供了多种方式进行交流。 Project V 团队支持中文和英文,请选择你所熟悉的语言来提问,以避免一些不必要的误会。管理员会以问题发起者使用的语言来回复;如果提问者使用了其它的语言,则以英文回复。 Github Issue 我们使用几个不同的仓库进行不同类型的讨论。 代码问题 仅用于讨论 V2Ray 的代码问题,比如 bug。 未来计划 常规讨论 Telegram 讨论组 P

  • 校验者: @片刻 翻译者: @X 项目邮件列表 如果您在使用 scikit 的过程中发现错误或者需要在说明文档中澄清的内容,可以随时通过 Mailing List 进行咨询。 机器学习从业者的 Q&A 社区 <colgroup><col class="field-name"> <col class="field-body"></colgroup> | Quora.com: | Quora有一个和机

  • 我想用kafka流实现请求-响应模式,我使用spring boot kafka,其中添加了一些数据作为报头,命名为关联id,但是当kafka流API处理请求消息时,报头数据会丢失,无法发送到响应主题!我该怎么解决,还是用另一种方法??

  • 我很抱歉,如果这是一个重复的帖子,但我一直在寻找答案,到目前为止什么也没有得到。 我需要的是在更改Kafka Streams中的键类型的映射操作期间指定serdes。原始KStream有一个字符串类型的键和avro(通用记录)值,但我需要将其重新映射到avro键和值。大致如下: 我相信我需要指定serde,因为类型正在改变,但是我在地图操作符上找不到方法。当从主题读取、分组或写回主题时,我们通常可

  • 还在纠结递归。我有一个代码,它应该让我得到最少的操作,以便从x到y。只有乘以2或加+1,例如从7到12...它的5次操作,因为你需要+1次。我的代码对我来说没有正确的工作,我无法找出我缺少了什么来保证它是正确的。

  • 本文向大家介绍请你说一说操作系统中的页表寻址相关面试题,主要包含被问及请你说一说操作系统中的页表寻址时的应答技巧和注意事项,需要的朋友参考一下 参考回答: 页式内存管理,内存分成固定长度的一个个页片。操作系统为每一个进程维护了一个从虚拟地址到物理地址的映射关系的数据结构,叫页表,页表的内容就是该进程的虚拟地址到物理地址的一个映射。页表中的每一项都记录了这个页的基地址。通过页表,由逻辑地址的高位部分