我的流处理器中有一个关于多个分区的主题,我只是想从一个分区进行流处理,但是不知道如何配置
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processor
spring.cloud.stream.bindings.input.destination=uinput
spring.cloud.stream.bindings.input.group=r-processor
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.consumer.partitioned=true
@StreamListener(target = "input")
// @SendTo(value = { "uoutput" })
public void process(KStream<UUID, AModel> ustream) {
我只希望此处理器处理一个分区数据,其他分区将有其他处理器
到目前为止,我的发现与https://Kafka . Apache . org/20/javadoc/org/Apache/Kafka/streams/streams CONFIG . html # PARTITION _ GROUPER _ CLASS _ CONFIG有关,但无法找到如何在spring application.properties中设置该属性
Kafka Streams不允许读取单个分区。如果您订阅了一个主题,所有分区都会被使用并分布在可用的实例上。因此,您无法提前知道哪个分区被分配给了哪个实例,并且所有实例都执行相同的代码。
但链接到处理器的每个分区都有不同类型的数据,因此需要不同的处理器应用程序
对于这种情况,处理器(或转换器)必须能够处理所有分区的数据。Kafka Streams通过< code>ProcessorContext对象公开分区号,该对象通过< code>init()方法传递给处理器:https://Kafka . Apache . org/20/javadoc/org/Apache/Kafka/Streams/kstream/transformer . html # init-org . Apache . Kafka . Streams . processor context-
因此,您需要在变压器内“分支”以根据分区应用不同的处理逻辑:
ustream.transform(() -> new MyTransformer());
class MyTransformer implement Transformer {
// other methods omitted
R transform(K key, V value) {
switch(context.partition()) { // get context from `init()`
case 0:
// your processing logic
break;
case 1:
// your processing logic
break;
// ...
}
}
我认为分区分组器是将分区与单个处理器内的任务分组。如果要确保处理器仅处理单个分区,则需要提供至少与主题分区相同数量的处理器实例。例如,如果你的主题有 4 个分区,那么你需要有 4 个流应用程序的实例,以确保每个实例只处理一个分区。
我有一个流监听器,作为 其中@input(“requesti”)配置如下所示; 发送记录ProducerRecord(TOPIC=Request,Partition=null,headers=recordheaders(headers=[recordheader(key=Key_TypeId,值=[106,97,118,97,46,117,116,105,108,85,85,85,73,68]),
稍微介绍一下背景 24届的 强目标院校 软件工程专业 在学校比较摆烂 考研没考上 无实习 春招没面 7月初毕业 在boss上投了odC加加岗 刷了半个月左右力扣 8月2号星期五 做的笔试 满分过了 笔试题都比较简单记不清题目了 都没怎么看给的牛客题资料 输入处理去特意看了 考的题一些简单的数据结构就解决了 没考dp和树啥的 8月5号星期一 交了点双证啥的材料 做了下性格测试 性格测试按照boss上
我试图使用来自kafka主题的合流avro消息作为spring Boot2.0的Kstream。 我能够以的形式使用消息,但不能以的形式使用消息。 例外情况: 线程“pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3”org.apache.kafka.streams.errors.StreamsException:流线程[pcs-7bb7
我有多个微服务和前面的API,喜欢使用相同的主题为事件每个域事件在单独的分区,我能够配置spring kafka绑定器发送到不同的分区使用 实现 我的问题是,我能否将Kstream绑定器配置为只能为@input和@output使用分区。
更新: 根据答案,下面的配置在绑定器级别工作。
我正在使用Spring Cloud Stream(Edgware.SR5)和Spring Boot(1.5.10.RELEASE)。我的@StreamListener正在处理收到的每条消息两次。 该示例的思想是在队列中发布消息并对其进行处理。 服务: 绑定: application.properties: 配置(用于在测试中注入代理服务): 测试: 我得到了以下输出: 我不知道我的配置有什么问题,