当前位置: 首页 > 知识库问答 >
问题:

我们如何为Spring Cloud Stream中的模式配置value.subject.name.Strategy Kafka生产者、消费者和KStreams?

西门正平
2023-03-14

我想定制Spring云流生产者、消费者和KStreams中Avro模式主题的命名策略。

这将在Kafka中使用key.subject.name.strategyvalue.subject.name.strategy->https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy

在一个土生土长的Kafka制作人中,这是这样的:


private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

显然,Spring Cloud使用自己的主题命名策略,接口org.springframework.Cloud.stream.schema.avro.SubjectNamingStrategy,只有一个子类:DefaultSubjectNamingStrategy

是否有声明性的方法来配置value.subject.name.strategy,或者我们需要提供自己的org.springframework.cloud.stream.schema.avro.subject-naming-strategy实现和属性spring.cloud.stream.schema.avro.subject-naming-strategy

共有1个答案

吕鸿文
2023-03-14

正如另一个答案中指出的,有一个专用属性spring.cloud.stream.schema.avro.SubjectNamingStrategy,它允许为Kafka制作者设置不同的命名策略。

我提供了org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它提供了开箱即用的功能。

对于Kafka流和本机序列化/反序列化(Spring Cloud Streams 3.0.0+的默认行为),必须使用Confluent的实现(io.confluent.Kafka.serializers.subject.recordnameStrategy)和本机属性:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      ...
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              ...
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy 
 类似资料:
  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 用例如下。我在Java代码中的许多对象实例上传递生产者或消费者引用。在其中一些地方,我想对Kafka的配置进行一些检查。这意味着我想回去,Kafka生产者/消费者(包括默认值)中存储了什么样的有效配置。我在java文档中没有看到显式的anthing: Kafka制作人 那么,如何找回Kafka制作人和消费者的配置呢?

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 我正在努力为Kafka-Streams正确配置Spring Cloud Stream,以便使用带有信任存储和密钥存储的SSL。 在我的应用程序中,我有多个流正在运行,所有流的SSL配置应该是相同的。 stream2:Topic2>Topic4 Topic3 stream3:Topic4>Topic5 我使用最新的Spring-Cloud Stream框架和Kafka-Streams以及Avro模型