当前位置: 首页 > 知识库问答 >
问题:

在处理来自 Kafka 的消息时避免数据丢失

阴宏爽
2023-03-14

寻找设计我的Kafka消费者的最佳方法。基本上,我想看看什么是避免数据丢失的最佳方法,以防在处理消息期间出现任何异常/错误。

我的用例如下。

a)我使用SERVICE来处理消息的原因是 - 将来我计划编写一个ERROR处理器应用程序,该应用程序将在一天结束时运行,它将尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。

b)我想确保没有消息丢失,所以我会将消息保存到一个文件中,以防在将消息保存到DB时出现任何问题。

c)在生产环境中,可能有多个消费者和服务实例在运行,因此多个应用程序尝试写入同一个文件的可能性很高。

Q-1)写入文件是避免数据丢失的唯一选择吗?

Q-2)如果这是唯一的选择,如何确保多个应用程序写入同一文件并同时读取?请考虑将来一旦构建错误处理器,它可能正在读取同一文件中的消息,而另一个应用程序正在尝试写入该文件。

ERROR PROCESSOR-我们的源遵循事件驱动的机制,并且很有可能有时依赖事件(例如,某物的父实体)可能会延迟几天。因此,在这种情况下,我希望我的ERROR PROCESSOR能够多次处理相同的消息。

共有2个答案

别峻
2023-03-14

如果在写入数据库之前不提交消耗的消息,那么在Kafka保留消息时不会丢失任何内容。这样做的折衷是,如果消费者确实提交了数据库,但Kafka偏移提交失败或超时,您将再次消费记录,并且可能在您的服务中处理重复数据

即使你写了一个文件,你也不能保证排序,除非你打开每个分区的文件,并确保所有消费者只在一台机器上运行(因为你在那里保留状态,这不是容错的)。重复数据删除也需要处理。

此外,您可以查看Kafka Connect框架,而不是将自己的消费者写入数据库。为了验证消息,您可以类似地部署Kafka Streams应用程序,将输入主题中的不良消息过滤到发送到数据库的主题中

邹弘
2023-03-14

我以前遇到过类似的事情。所以,直接进入你的问题:

> < li>

不一定,您也许可以在一个新的主题(比如- error-topic)中将这些消息发送回Kafka。因此,当您的错误处理器准备就绪时,它可以监听this error-topic并在这些消息到来时使用它们。

我认为这个问题已经在对第一个问题的答复中得到了解决。因此,Kafka可能是一个更好的选择,因为它是为解决这类问题而设计的,而不是使用一个文件来同时读写和打开多个文件句柄。

注意:以下几点只是基于我对你的问题领域的有限理解,值得思考的。所以,你可以选择安全地忽略这一点。

关于< code>service组件的设计,还有一点值得考虑——您还可以考虑通过将所有错误消息发送回Kafka来合并第4点和第5点。这将使您能够以一致的方式处理所有错误消息,而不是将一些消息放在error DB中,一些放在Kafka中。

编辑:基于关于ERROR PROCESSOR要求的附加信息,这里是解决方案设计的图解表示。

我故意将ERROR PROCESSOR的输出保持为抽象,只是为了保持它的通用性。

希望这有帮助!

 类似资料:
  • 本文向大家介绍RabbitMQ 怎么避免消息丢失?相关面试题,主要包含被问及RabbitMQ 怎么避免消息丢失?时的应答技巧和注意事项,需要的朋友参考一下 把消息持久化磁盘,保证服务器重启消息不丢失。 每个集群中至少有一个物理磁盘,保证消息落入磁盘。

  • 所以我和我的Kafka消费者之间有了一些恼人的矛盾。我使用“Kafka节点”为我的项目。我创造了一个话题。在一个使用者组中通过2台服务器创建了2个使用者。自动提交设置为false。对于我的消费者获得的每一个mesaage,他们会启动一个异步进程,该进程可能需要1~20秒,当进程完成时,消费者会提交偏移量。我的问题是:在一个senarios中,消费者1得到一个消息,需要20秒来处理。在过程中间,他得

  • 如果Spring集成通道是用任务执行器定义的,那么线程池将用于处理传入的消息。如果service activator或transformerendpoint组件从该内部通道接收消息,是否会实例化一个endpoint组件池,每个线程一个?如果这不是默认行为,那么需要什么配置来实现这一点? 这一点很重要,原因有二: > 以确保endpoint组件在内部通道使用的同一线程中处理消息,因此它们是同一事务的

  • 在这两种情况下,当应用程序处于前台时,消息是在扩展FirebaseMessagingService的服务中接收的,在onMessageReceived中,我们可以通过包中的自定义参数过滤请求,但当应用程序处于后台,消息是从控制台发送的时,接收器不会被调用,推送消息会以某种方式添加。 是否可以处理此请求?

  • 我正在开发一个windows应用程序,它以600Hz的频率从传感器接收数据。在五分之二的情况下,我的IO线程成功地从传感器读取4字节的数据,并将其传递给GUI线程。 问题是五次中有三次,QSerialPort有无法解释的超时,其中QSerialPort的waitForReadyRead()返回false和serial。errorString()有超时错误。在这种情况下,它将永远不会读取数据。如果我

  • 鉴于以下情况: 我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart 然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,produ