当前位置: 首页 > 知识库问答 >
问题:

通过代码/api重置Kafka流应用程序

锺离边浩
2023-03-14

我想知道用Kafka Streams执行这种操作的最佳方法是什么。

我有一个 Kafka 流和一个 KGlobal 表,让我们说产品 (1.000.000 消息) 和类别逻辑表 (10 msg)。每当新消息到达主题类别LogicBlobTable时,我需要重新处理所有将新到达的消息应用于产品的产品,并且输出将转到第三个主题。

我在考虑使用Kafka . tools . streams reset逻辑,并以某种方式挂钩我的代码,以停止kafkaStream,运行reset并再次启动流。

第二种选择是没有kafka流,而只有两个消费者和一个生产者。通过这种方式,我可以使用方法消费者.seekToBegining(集合.emptyList());

共有1个答案

那开济
2023-03-14

在这种情况下,重置KafkaStreams应用程序将导致大量重复输出。假设流中有10条记录,表中有5条记录,在处理时会生成3条输出记录。现在,将第6条记录添加到表中,并重新读取整个流。因此,您将把前3条输出记录重新发送到输出主题,如果某些记录也连接到新添加的第6条表记录,则可能还会发送其他输出记录。这似乎不是你想要的。

我猜你需要手动使用KafkaConsumer/KafkaProducer。

 类似资料:
  • 最近,在一次采访中,我被问到一个关于Kafka流的问题,更具体地说,面试官想知道为什么/什么时候您会使用Kafka流DSL而不是普通的Kafka消费者API来读取和处理消息流?我不能给出一个令人信服的答案,我想知道使用这两种流处理风格的其他人是否可以分享他们的想法/意见。多谢了。

  • 我有一个活动应用程序设置为使用自我管理的连接服务,因为我们正在使用音频和视频,并且希望能够利用系统。然而,关于我们关闭连接或可能更改音频流的方式的一些事情正在引起一个问题,我将尽我所能在这里描述。 当我开始我们的应用程序的通话时,一切都按照我们想要的方式工作,它在免提电话中启动,但对免提电话按钮的按钮按下反应良好,音频效果很好!然而,当通话结束时,我的手机陷入了一种模式,任何通知都不会通过扬声器播

  • 在服务器端,我正在使用一个HTTP API,它以页面形式返回结果。如中所示,响应包含x个结果,如果有超过0的结果,我可以再次调用它以获得下一个x个结果。x可以任意选择直到API的最大页面大小。 现在,我想要在WebSocket上高效地流式传输全套结果,而不会使它不堪重负(施加背压)。最初,我构建了整个resultset,然后从中创建了一个源代码: 这样可以工作,WebSocket客户机以其最大速度

  • 同样以 blog应用为例 1.在api目录下创建blog目录 blog结构: ├─api 应用目录 │ ├─blog 应用目录 │ │ ├─controller 控制器目录 │ │ ├─lang 多语言包(可选) │ │ ├─logic 逻辑层目录(可选) │ │

  • 其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?

  • 或者可以在controller创建之后,使用configure API来配置controller,具体使用方式如下所示: controller.configure(configs); configure API接受的参数和通过构造函数配置配置一样,可以查看 配置参数表 了解各个配置参数的详细解释。