当前位置: 首页 > 知识库问答 >
问题:

在Spring Cloud Stream中获得底层Kafka消费者和生产者

莫河
2023-03-14

我有一个用例,希望在Spring云流应用程序中获得底层的Kafka生产者(KafkaTemplate)。在浏览代码时,我偶然发现了KafkaProducerMessageHandler,它有一个getKafkaTemplate方法。然而,它无法自动接线。

此外,如果我直接自动连接KafkaTemplate,模板将使用默认属性初始化,它将忽略SCSt配置的绑定器中的代理

如何在Spring Cloud Stream应用程序中访问底层的KafkaTemplate或生产者/消费者?

编辑:实际上,我的SCSt应用程序有多个Kafka活页夹,我想获得每个活页夹对应的KafkaTemplate或Kafka制作人。这有可能吗?


共有1个答案

段干麒
2023-03-14

不完全清楚您为什么需要这样做,但是您可以通过向应用程序上下文添加producerMessageHandlerCustomzer@Bean来捕获KafkaTem板

 类似资料:
  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • Quarkus的Kafka-Streams扩展提供了一种便捷的管道启动方式。必须在包含项目的< code > application . properties 文件中插入流应用程序的必要配置选项,例如< code > quar kus . Kafka-streams . bootstrap-servers = localhost:9092 。 Quarkus还为更精细的配置提供了通过选项。文件指出

  • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要