我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数:
@Bean
public Function<KStream<String, Data>, KStream<String, Double>> evenNumberSquareProcessor() {
return kStream -> kStream.transform(() -> new CustomProcessor(STORE_NAME), STORE_NAME);
}
spring:
cloud:
stream:
function:
definition: evenNumberSquareProcessor
bindings:
evenNumberSquareProcessor-in-0:
destination: input
content-type: application/*+avro
group: group-1
evenNumberSquareProcessor-out-0:
destination: output
kafka:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
consumer-properties:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
evenNumberSquareProcessor-in-0:
consumer:
destination: input
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
evenNumberSquareProcessor-out-0:
destination: output
我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机:
@EnableSchemaRegistryClient
@SpringBootApplication
public class TransformApplication {
public static void main(String[] args) {
SpringApplication.run(TransformApplication.class, args);
}
}
谢谢你能给我带来的任何帮助。
视CG
在配置
下配置架构注册表,然后所有绑定器都可以使用它。对了。avro序列化程序位于绑定
和特定通道下。如果要使用默认属性default.value.serde:
。你的Serde可能也错了。
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
process-in-0:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
不要使用@enableschemareGistryClient
。在Avro Serde上启用架构注册表。在本例中,我使用定义的beandata
。试着在这里遵循这个例子。
@Service
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static Serde<Data> DataAvro() {
final Serde<Data> dataAvroSerde = new SpecificAvroSerde<>();
dataAvroSerde.configure(serdeConfig, false);
return dataAvroSerde;
}
}
我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题
我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表
我使用的是Azure HDInsight的托管Apache Kafka解决方案,因为不幸的是Azure上没有托管汇流Kafka解决方案。是否可以运行汇合模式注册表并将其连接到HDInsight Apache Kafka集群的代理? 我希望只在单个VM上安装模式注册表,然后使用schema-registry.properties文件中的这一行,将其指向HDInsight集群的代理列表: kafkas
我希望即使服务器重新启动,也能保持一个具有固定id的模式。 是否可以在模式注册表中保存模式,以便在服务器崩溃后使用相同的id? 否则,是否有可能在模式注册表服务器启动时用固定的id硬编码一个模式?
我试图使用Confluent_Kafka的AvroProducer类生成Avro格式的消息。Kafka和Schema-Registry在同一个网络中作为3个节点的集群运行。 我得到的是 我没有使用Docker容器。集群由3个独立的VM组成,其中安装和运行Kafka和Registry Schema,所以它也不是独立的。Python代码从具有网络访问和防火墙异常的第四个VM执行。事实上,我可以在没有a
我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化