当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Streams kafka活页夹-主题序列化配置

胥智
2023-03-14

因此,我认为我自己陷入了困惑,因为我知道SpringCloudStreams有两种不同的Kafka活页夹:

  • 《春云流》Kafka活页夹

我正在寻找正确的YAML设置,以便在spring cloud streams的常规kafka活页夹中定义序列化器和反序列化器:

我可以使用以下逻辑调整默认值:

spring:
  main:
    web-application-type: NONE
  application:
    name: tbfm-translator
  kafka:
    consumer:
      group-id: ${consumer_id}
    bootstrap-servers: ${kafka_servers}
  cloud:
    schemaRegistryClient:
      endpoint: ${schema_registry}
    stream:
#      default:
#        producer.useNativeEncoding: true
#        consumer.useNativeEncoding: true
      defaultBinder: kafka
      kafka:
        binder:
          auto-add-partitions: true # I wonder if its cause this is set
          auto-create-topics: true # Disabling this seem to override the server setings and will auto create

          producer-properties:
            # For additional properties you can check here:
            # https://docs.confluent.io/current/installation/configuration/producer-configs.html

            schema.registry.url: ${schema_registry}

            # Disable for auto schema registration
            auto.register.schemas: false

            # Use only the latest schema version
            use.latest.version: true

            # This will use reflection to generate schemas from classes - used to validate current data set
            # against the scheam registry for valid production
            schema.reflection: true

            # To use an avro key enable the following line
            #key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

            #This will use a string based key - aka not in the registry - dont need a name strategy with string serializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer

            # This will control the Serializer Setup
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

即:

spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer

我想我应该能够在每个主题的基础上做到这一点:


spring:
  cloud:
    stream:
      bindings:
        my-topic:
          destination: a-topic
          xxxxxxxx??

我遇到了设置:

          producer:
            use-native-encoding: false
            keySerde: <CLASS>

但这似乎不起作用。我可以设置一个简单的属性来在每个主题的基础上执行此操作吗?我认为keySerde用于Kafka-stream实现,而不是普通的kafka绑定器。

共有2个答案

贺福
2023-03-14


    stream:
      bindings: # Define output topics here and then again in the kafka.bindings section
        test:
          destination: multi-output
          producer:
            useNativeDecoding: true

      kafka:
        bindings:
          test:
            destination: multi-output
            producer:
              configuration:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer

这似乎工作-但很烦人我必须在两个地方复制绑定定义

让我们想回避YAML风格的定义

锺离浩慨
2023-03-14

使用本机编码必须为true才能使用您自己的序列化程序。

spring.cloud.stream.kafka.bindings.my-topic.producer.configuration.value.serializer:...

有关Kafka特定的制作人属性,请参阅文档。

配置

使用包含通用Kafka生产者属性的键/值对进行映射。

默认值:空地图。

 类似资料:
  • 问题内容: 每当我尝试序列化文件时,都会收到错误消息:FileNotFound。不知道为什么。这是我的FileHelper代码: 问题答案:

  • 我试图弄清楚如何在Spring

  • 我正在使用Spring云流和Kafka绑定器使用SASL连接到Kafka集群。SASL配置如下所示: 我想以编程方式/在运行时更新用户名和密码,如何在Spring Cloud Stream中使用Spring Kafka binders做到这一点? 旁注:使用BinderFactory,我可以在其hashmap中看到这些配置,但我想知道如何在运行时更新配置,以便这些更改也反映在连接中?

  • 我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff