当前位置: 首页 > 知识库问答 >
问题:

Spring@StreamListener过程(KStream流)分区

鲁鸿
2023-03-14

我的流处理器中有一个关于多个分区的主题,我只是想从一个分区进行流处理,但是不知道如何配置

spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processor
spring.cloud.stream.bindings.input.destination=uinput
spring.cloud.stream.bindings.input.group=r-processor
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true

spring.cloud.stream.bindings.input.consumer.partitioned=true

@StreamListener(target = "input")
// @SendTo(value = { "uoutput" })
public void process(KStream<UUID, AModel> ustream) {

我只希望此处理器处理一个分区数据,其他分区将有其他处理器

到目前为止,我的发现与https://Kafka . Apache . org/20/javadoc/org/Apache/Kafka/streams/streams CONFIG . html # PARTITION _ GROUPER _ CLASS _ CONFIG有关,但无法找到如何在spring application.properties中设置该属性

共有2个答案

阙辰龙
2023-03-14

Kafka Streams不允许读取单个分区。如果您订阅了一个主题,所有分区都会被使用并分布在可用的实例上。因此,您无法提前知道哪个分区被分配给了哪个实例,并且所有实例都执行相同的代码。

但链接到处理器的每个分区都有不同类型的数据,因此需要不同的处理器应用程序

对于这种情况,处理器(或转换器)必须能够处理所有分区的数据。Kafka Streams通过< code>ProcessorContext对象公开分区号,该对象通过< code>init()方法传递给处理器:https://Kafka . Apache . org/20/javadoc/org/Apache/Kafka/Streams/kstream/transformer . html # init-org . Apache . Kafka . Streams . processor context-

因此,您需要在变压器内“分支”以根据分区应用不同的处理逻辑:

ustream.transform(() -> new MyTransformer());


class MyTransformer implement Transformer {
  // other methods omitted

  R transform(K key, V value) {
    switch(context.partition()) { // get context from `init()`
      case 0:
        // your processing logic
        break;
      case 1:
        // your processing logic
        break;

      // ...
  }
}
赖绪
2023-03-14

我认为分区分组器是将分区与单个处理器内的任务分组。如果要确保处理器仅处理单个分区,则需要提供至少与主题分区相同数量的处理器实例。例如,如果你的主题有 4 个分区,那么你需要有 4 个流应用程序的实例,以确保每个实例只处理一个分区。

 类似资料:
  • 我有一个流监听器,作为 其中@input(“requesti”)配置如下所示; 发送记录ProducerRecord(TOPIC=Request,Partition=null,headers=recordheaders(headers=[recordheader(key=Key_TypeId,值=[106,97,118,97,46,117,116,105,108,85,85,85,73,68]),

  • 我试图使用来自kafka主题的合流avro消息作为spring Boot2.0的Kstream。 我能够以的形式使用消息,但不能以的形式使用消息。 例外情况: 线程“pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3”org.apache.kafka.streams.errors.StreamsException:流线程[pcs-7bb7

  • 我有多个微服务和前面的API,喜欢使用相同的主题为事件每个域事件在单独的分区,我能够配置spring kafka绑定器发送到不同的分区使用 实现 我的问题是,我能否将Kstream绑定器配置为只能为@input和@output使用分区。

  • 更新: 根据答案,下面的配置在绑定器级别工作。

  • 我正在使用Spring Cloud Stream(Edgware.SR5)和Spring Boot(1.5.10.RELEASE)。我的@StreamListener正在处理收到的每条消息两次。 该示例的思想是在队列中发布消息并对其进行处理。 服务: 绑定: application.properties: 配置(用于在测试中注入代理服务): 测试: 我得到了以下输出: 我不知道我的配置有什么问题,

  • spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。