我想定制Spring云流生产者、消费者和KStreams中Avro模式主题的命名策略。
这将在Kafka中使用key.subject.name.strategy
和value.subject.name.strategy
->https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
在一个土生土长的Kafka制作人中,这是这样的:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
显然,Spring Cloud使用自己的主题命名策略,接口org.springframework.Cloud.stream.schema.avro.SubjectNamingStrategy
,只有一个子类:DefaultSubjectNamingStrategy
。
是否有声明性的方法来配置value.subject.name.strategy
,或者我们需要提供自己的org.springframework.cloud.stream.schema.avro.subject-naming-strategy
实现和属性spring.cloud.stream.schema.avro.subject-naming-strategy
?
正如另一个答案中指出的,有一个专用属性spring.cloud.stream.schema.avro.SubjectNamingStrategy
,它允许为Kafka制作者设置不同的命名策略。
我提供了org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它提供了开箱即用的功能。
对于Kafka流和本机序列化/反序列化(Spring Cloud Streams 3.0.0+的默认行为),必须使用Confluent的实现(io.confluent.Kafka.serializers.subject.recordnameStrategy
)和本机属性:
spring:
application:
name: shipping-service
cloud:
stream:
...
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
...
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
用例如下。我在Java代码中的许多对象实例上传递生产者或消费者引用。在其中一些地方,我想对Kafka的配置进行一些检查。这意味着我想回去,Kafka生产者/消费者(包括默认值)中存储了什么样的有效配置。我在java文档中没有看到显式的anthing: Kafka制作人 那么,如何找回Kafka制作人和消费者的配置呢?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
我正在努力为Kafka-Streams正确配置Spring Cloud Stream,以便使用带有信任存储和密钥存储的SSL。 在我的应用程序中,我有多个流正在运行,所有流的SSL配置应该是相同的。 stream2:Topic2>Topic4 Topic3 stream3:Topic4>Topic5 我使用最新的Spring-Cloud Stream框架和Kafka-Streams以及Avro模型