Quarkus的Kafka-Streams扩展提供了一种便捷的管道启动方式。必须在包含项目的< code > application . properties 文件中插入流应用程序的必要配置选项,例如< code > quar kus . Kafka-streams . bootstrap-servers = localhost:9092 。
Quarkus还为更精细的配置提供了通过选项。文件指出:
kafka-streams名称空间中的所有属性都按原样传递给Kafka Streams引擎。更改它们的值需要重新构建应用程序。
这样,我们就可以传递一个定制的时间戳提取器(或者任何其他与流配置相关的配置属性)
...
kafka-streams.default.timestamp.extractor=my.custom.extractor.class
...
在启动时,Quarkus 扩展会打印出此配置属性未知,但是我们可以看到它已正确传递,因为 my.custom.extractor.class
显示在流应用程序的自动打印配置中。
我们如何将选项传递给底层Kafka生产者和消费者,并验证其使用?例如,我想更改底层生产者的<code>max.request。size和消费者的max.partition.fetch。字节
属性。
本机 Kafka 流和旧的Quarkus方法允许我直接在代码中更改传递的配置对象,例如,
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 6000000);
IMO这在Quarkusland是脏的,我想通过< code > application . properties 文件传递所有选项。
您可以使用消费者和生产者的标准配置传递它,前缀为kafka-stream:
kafka-streams.consumer.$property
kafka-streams.producer.$property
检查-
用例如下。我在Java代码中的许多对象实例上传递生产者或消费者引用。在其中一些地方,我想对Kafka的配置进行一些检查。这意味着我想回去,Kafka生产者/消费者(包括默认值)中存储了什么样的有效配置。我在java文档中没有看到显式的anthing: Kafka制作人 那么,如何找回Kafka制作人和消费者的配置呢?
我有一个用例,希望在Spring云流应用程序中获得底层的Kafka生产者(KafkaTemplate)。在浏览代码时,我偶然发现了,它有一个方法。然而,它无法自动接线。 此外,如果我直接自动连接,模板将使用默认属性初始化,它将忽略SCSt配置的
我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断