当前位置: 首页 > 知识库问答 >
问题:

我们可以使用Kafka Streams API对消息进行顺序处理吗?

皇甫飞宇
2023-03-14

我们有一些消息需要保持序列。我们已经决定将所有消息从一个特定的源发送到一个分区,这样就可以维护消息序列(多个源可以产生到同一个分区,但一个源不能产生到多个分区),并且我们将能够用它们的密钥标识每个源。

现在,我们需要使用这些消息并进行一些处理。我们对已消费的消息执行多个独立操作(例如,将它们存储在数据库中,转发它们等)。现在,我一直在考虑是使用Kafka Streams API还是消费者API来实现这一点。

注意:我不能有太多的主题(例如,我不能为每个源创建一个主题,因为源会很多)。虽然我可以按键对消息进行分组以标识源,但对于使用流,我需要的是键的消息顺序。

用例:我希望按顺序将这些消息提交到数据库中,并希望按顺序转发这些消息。

那么,如何使用Streams API按顺序处理消息呢?

共有1个答案

傅树
2023-03-14

我们决定将所有消息从一个特定的源发送到一个分区

Kafka保证在单个分区内的顺序,是根据它们到达代理的顺序,而不是通过任何其他字段或时间。所有Kafka客户端(消费者、流媒体、第三方库)都尊重这一事实。

不过,一般来说,如果将消息时间戳作为数据库插入事件的一部分,则可以按键分组,按时间戳排序。不过,这将取决于数据库。

如果您已经通过已知的源密钥进行分区,为什么不在流应用程序中按此进行筛选呢?否则,您将不得不使用消费者应用编程接口,因为它允许您分配特定的分区(处理器应用编程接口可能会使用,但没有使用)

 类似资料:
  • 但它不止一次地使用消息。有没有人面对过这个问题。此外,使用上述配置,使用者总是在一个批处理中只接收到一个消息。我尝试增加和,但没有任何影响。 在对ConcurrentKafkaListenerContainerFactory进行如下更改后,批处理配置的问题得到了解决: factory.getContainerProperties().SetackMode(org.springFramework.k

  • 我每个websocket接收几十条消息,这些消息可能只差几毫秒就能到达。我需要用操作来处理这些数据,这些操作有时会花费一些时间(例如,在DB中的插入)。为了处理接收到的新消息,必须完成对前一个消息的处理。 我的第一个想法是用Node.js Bull(用Redis)准备一个队列,但恐怕太长了,无法运行。这些消息的处理必须保持快速。 我尝试使用JS迭代器/生成器(直到现在我还从未使用过),我测试了如下

  • 我们可以使用页面对象执行滚动吗? 实际上,我需要根据元素滚动网页(向上/向下)。如何使用页面对象执行。 我使用Selenium web drive来启动浏览器,并使用Page对象来自动化web页面。 有什么建议吗???

  • 我有一个中间件位于两个JMS队列之间。它从一个数据库读取、处理一些数据到数据库中,然后写入另一个数据库。 这里有一个小图来描绘设计: 考虑到这一点,我有一些有趣的逻辑要集成到服务中。 null

  • 问题内容: 我们可以将其用于 (Android开放源代码项目) 开发吗,有没有办法做到这一点。我没有找到一个。有人对此有任何想法吗? 问题答案: 您可以将Android Studio用作AOSP的IDE,因为它只是InteliJ IDE的修改版本。 从AOSP根目录: 然后只需在Android Studio中“打开项目”,然后选择它生成的android.ipr。

  • 我已经在WSO2 ESB 4.7.0中使用transaction和CLIENT_ACKNOWLEDGE配置了Apache ActiveMQ。axis2.xml配置如下: