当前位置: 首页 > 知识库问答 >
问题:

匹配Kafka消费者和生产者分区

濮阳安澜
2023-03-14

我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。

我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。

到目前为止,我有两种解决方案,但都不是特别令人满意的。如有任何想法或想法,将不胜感激:

  1. 让每个前端决定它将侦听哪个分区,并将该分区与消息一起传递给“请求”主题。当后端上的处理完成时,它将查看消息的分区成员并推送到适当的分区。这里一个紧迫的问题是如何协调前端服务,以便在每个分区上都有一个均匀的分布(随机分配?)
  2. 每个消息都有一个相关ID和GUID,因此对于发送到前端的每个html" target="_blank">请求,我们可以通过将GUID哈希到分区总数来开始侦听分区,然后将消息推送到“请求”主题。然后,后端将查看相关ID以确定要推送到的适当分区。这里的一个问题是,对于传入的每个请求,前端必须在新分区上建立一个新的使用者(这里有开销吗?)并且可能在同一分区上有多个活动使用者,以及跨多个分区的多个活动使用者
  3. 让一个消费者组拥有相同数量的消费者和分区,然后使用与(1)类似的方法,但允许Kafka处理哪个消费者在哪个分区上。但是我们需要弄清楚当重新平衡发生时会发生什么,特别是对于已经在后端传输的消息(因为所有分区都可能发生变化?)

这似乎应该是一个常见的模式,所以我想知道其他人是如何解决这个问题的。

共有3个答案

白文彬
2023-03-14

一种优雅的方法是在后端生产者中使用分区函数,并使用手动分区分配分配为前端消费者只监听感兴趣的分区。

更详细的信息:

在前端生产者中,在将“请求”消息生成为“请求”主题之前,请将消息键设置为前端客户端id(它需要是唯一的)。

在后端使用者中,不需要手动分配分区,只需使用subscribe订阅request主题即可。但值得注意的是,当您收到“请求”消息并对其进行处理时,请不要丢失消息密钥,请保留它。因为它确定了请求的来源。

在后端producer中,当您完成请求过程后,您将生成一条要回复的响应消息,并将响应消息密钥设置为上面保留的前端客户端id。您还需要定义分区函数(散列函数,将客户机id映射到分区号)。使用分区函数执行send()

在前端消费者中,您需要使用assgin()方法来监听特定的分区。但是如何知道应该听哪个分区呢?只需使用它的client-id(在同一个客户端将是相同的)和上面定义的相同哈希函数来计算您应该监听的分区号。

谷梁鸣
2023-03-14

有时,如果可以通过将Kafka响应消息发送到Kafka连接器上,以便通过WebHooks、WebSocket、电子邮件或SMS文本消息直接向原始用户发送,则响应不必返回到原始请求应用程序...

如果您只是想做SOAP或REST风格的RPC,那么只需使用HTTP而不是Kafka,因为这是一种经过验证的模式。

翟俊哲
2023-03-14

请不要使用手动分配分区的消费者。它会变得非常混乱,很难扩展。

您可以使用每个前端使用者的主题,而不是分区。每个前端服务生成一条包含前端服务id的消息,发送到请求主题。然后,后端使用该消息,并基于该id生成对特定唯一前端服务响应主题的响应消息。如果您有固定数量的前端服务,它可能是一个很好的解决方案。可能的缺点是每次要添加新的前端服务时都会创建一个新主题。然而,它比手动分区分配更容易维护。

另一个可能的解决办法是使用不同的工具。如果Kafka不是强制性的,请重新考虑您的要求并进行研究。也许有一种工具比Kafka更适合你的需求。

 类似资料:
  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa

  • 我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢