Question:

Spring Cloud Stream Confluent KStream Avro Consumption

汲丰茂
2023-03-14

I am trying to consume Confluent Avro messages from a Kafka topic as a KStream in Spring Boot 2.0.

I am able to consume the messages as a MessageChannel, but not as a KStream.
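
For comparison, the channel-based version that does work looks roughly like this (a minimal sketch; in that variant the binding is a plain SubscribableChannel rather than a KStream, and the binding name is illustrative). The KStream version after it is the one that fails:

// Channel-based listener that works in my setup (illustrative sketch;
// ORGANIZATION_INPUT here is bound as a SubscribableChannel, not a KStream).
@StreamListener(KstreamBinding.ORGANIZATION_INPUT)
public void handleOrganization(Organization organization) {
    log.info("Organization Received:" + organization);
}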

@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();


@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

Exception:

Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    ...
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    ...
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more

Based on this error, I think I have not configured schema.registry.url for Confluent. I had a quick look at the samples here, but I am somewhat lost on how to do the same with a StreamListener in Spring Cloud Stream.

Does this need to be configured separately? Or is there a way to configure in application.yml the schema.registry.url that Confluent is looking for?

Here is the code repo: https://github.com/naveenpop/springboot-kstream-confluent

Organization.avsc

{
   "namespace":"com.test.demo.avro",
   "type":"record",
   "name":"Organization",
   "fields":[
      {
         "name":"orgId",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgName",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgType",
         "type":"string",
         "default":"null"
      },
      {
         "name":"parentOrgId",
         "type":"string",
         "default":"null"
      }
   ]
}

DemokstreamApplication.java

@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class OrganizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));

                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception: " + e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
}

KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

}

KafkaConsumer.java

@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

   @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}
KafkaProducer.java

@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {

        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();

            kstreamBinding.organizationOutputMessageChannel()
                            .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());

        } catch (Exception e){
            log.error("Failed to produce Organization Message: " + e);
        }
    }
}
KstreamBinding.java

public interface KstreamBinding {

    String ORGANIZATION_INPUT = "organizationInput";
    String ORGANIZATION_OUTPUT = "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}

I have committed changes to the GitHub sample so that the messages are produced from Spring Boot itself, and I am still receiving the messages as null values.

Thank you for taking the time to look at this issue.

1 Answer

墨承泽
2023-03-14

The following implementation will not do what you intend:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

That log statement is only invoked once, at bootstrap. For it to take effect, you need to invoke an operation on the received KStream and provide your logic there. For example, the following works, where I provide a lambda expression on the foreach method call:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}
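
Any terminal operation would do here; the @StreamListener method itself runs only once to build the Kafka Streams topology, and the lambda passed to foreach is then executed for every record once the stream threads start polling.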

There is also a problem in your configuration: the Avro serde is wrongly assigned to the key, while the key is actually a String. Change it like this:

default:
  key:
    serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value:
    serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

The complete application.yml:

spring:
  application:
    name: kstream
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/Organization.avsc
      bindings:
        organizationInput:
          destination: organization-updates
          group: demokstream.org
          consumer:
            useNativeDecoding: true
        organizationOutput:
          destination: organization-updates
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          organizationOutput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            brokers: localhost
            configuration:
              schema.registry.url: http://localhost:8081
              commit:
                interval:
                  ms: 1000
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
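
Note that schema.registry.url is set in two places on purpose: with useNativeEncoding the channel-based producer delegates to KafkaAvroSerializer, which reads the URL from the producer configuration, while the Streams binder passes its own configuration block to the SpecificAvroSerde used for native decoding.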

You can also remove the KafkaConfig class, as well as the EnableSchemaRegistryClient annotation, from the main application class.
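
After those removals, the main class reduces to something like this (a minimal sketch; the scheduled producer runner from the sample is omitted for brevity):

// Simplified main class once KafkaConfig and @EnableSchemaRegistryClient are removed;
// both binders now read schema.registry.url directly from application.yml.
@SpringBootApplication
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }
}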
