我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。
我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。
到目前为止,我有两种解决方案,但都不是特别令人满意的。如有任何想法或想法,将不胜感激:
这似乎应该是一个常见的模式,所以我想知道其他人是如何解决这个问题的。
一种优雅的方法是在后端生产者中使用分区函数,并使用手动分区分配分配
为前端消费者只监听感兴趣的分区。
更详细的信息:
在前端生产者中,在将“请求”消息生成为“请求”主题之前,请将消息键设置为前端客户端id(它需要是唯一的)。
在后端使用者中,不需要手动分配分区,只需使用subscribe
订阅request
主题即可。但值得注意的是,当您收到“请求”消息并对其进行处理时,请不要丢失消息密钥,请保留它。因为它确定了请求的来源。
在后端producer中,当您完成请求过程后,您将生成一条要回复的响应消息,并将响应消息密钥设置为上面保留的前端客户端id。您还需要定义分区函数(散列函数,将客户机id映射到分区号)。使用分区函数执行send()
。
在前端消费者中,您需要使用assgin()
方法来监听特定的分区。但是如何知道应该听哪个分区呢?只需使用它的client-id(在同一个客户端将是相同的)和上面定义的相同哈希函数来计算您应该监听的分区号。
有时,如果可以通过将Kafka响应消息发送到Kafka连接器上,以便通过WebHooks、WebSocket、电子邮件或SMS文本消息直接向原始用户发送,则响应不必返回到原始请求应用程序...
如果您只是想做SOAP或REST风格的RPC,那么只需使用HTTP而不是Kafka,因为这是一种经过验证的模式。
请不要使用手动分配分区的消费者。它会变得非常混乱,很难扩展。
您可以使用每个前端使用者的主题,而不是分区。每个前端服务生成一条包含前端服务id的消息,发送到请求
主题。然后,后端使用该消息,并基于该id生成对特定唯一前端服务响应
主题的响应消息。如果您有固定数量的前端服务,它可能是一个很好的解决方案。可能的缺点是每次要添加新的前端服务时都会创建一个新主题。然而,它比手动分区分配更容易维护。
另一个可能的解决办法是使用不同的工具。如果Kafka不是强制性的,请重新考虑您的要求并进行研究。也许有一种工具比Kafka更适合你的需求。
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢