因此,我认为我自己陷入了困惑,因为我知道SpringCloudStreams有两种不同的Kafka活页夹:
我正在寻找正确的YAML设置,以便在spring cloud streams的常规kafka活页夹中定义序列化器和反序列化器:
我可以使用以下逻辑调整默认值:
spring:
main:
web-application-type: NONE
application:
name: tbfm-translator
kafka:
consumer:
group-id: ${consumer_id}
bootstrap-servers: ${kafka_servers}
cloud:
schemaRegistryClient:
endpoint: ${schema_registry}
stream:
# default:
# producer.useNativeEncoding: true
# consumer.useNativeEncoding: true
defaultBinder: kafka
kafka:
binder:
auto-add-partitions: true # I wonder if its cause this is set
auto-create-topics: true # Disabling this seem to override the server setings and will auto create
producer-properties:
# For additional properties you can check here:
# https://docs.confluent.io/current/installation/configuration/producer-configs.html
schema.registry.url: ${schema_registry}
# Disable for auto schema registration
auto.register.schemas: false
# Use only the latest schema version
use.latest.version: true
# This will use reflection to generate schemas from classes - used to validate current data set
# against the scheam registry for valid production
schema.reflection: true
# To use an avro key enable the following line
#key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
#This will use a string based key - aka not in the registry - dont need a name strategy with string serializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# This will control the Serializer Setup
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
即:
spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer
我想我应该能够在每个主题的基础上做到这一点:
spring:
cloud:
stream:
bindings:
my-topic:
destination: a-topic
xxxxxxxx??
我遇到了设置:
producer:
use-native-encoding: false
keySerde: <CLASS>
但这似乎不起作用。我可以设置一个简单的属性来在每个主题的基础上执行此操作吗?我认为keySerde
用于Kafka-stream实现,而不是普通的kafka绑定器。
stream:
bindings: # Define output topics here and then again in the kafka.bindings section
test:
destination: multi-output
producer:
useNativeDecoding: true
kafka:
bindings:
test:
destination: multi-output
producer:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
这似乎工作-但很烦人我必须在两个地方复制绑定定义
让我们想回避YAML风格的定义
使用本机编码必须为true才能使用您自己的序列化程序。
spring.cloud.stream.kafka.bindings.my-topic.producer.configuration.value.serializer:...
有关Kafka特定的制作人属性,请参阅文档。
配置
使用包含通用Kafka生产者属性的键/值对进行映射。
默认值:空地图。
问题内容: 每当我尝试序列化文件时,都会收到错误消息:FileNotFound。不知道为什么。这是我的FileHelper代码: 问题答案:
我试图弄清楚如何在Spring
我正在使用Spring云流和Kafka绑定器使用SASL连接到Kafka集群。SASL配置如下所示: 我想以编程方式/在运行时更新用户名和密码,如何在Spring Cloud Stream中使用Spring Kafka binders做到这一点? 旁注:使用BinderFactory,我可以在其hashmap中看到这些配置,但我想知道如何在运行时更新配置,以便这些更改也反映在连接中?
我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff