当前位置: 首页 > 知识库问答 >
问题:

Spring-cloud kafka流模式注册表

公西博实
2023-03-14

我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数:

@Bean
public Function<KStream<String, Data>, KStream<String, Double>> evenNumberSquareProcessor() {
    return kStream -> kStream.transform(() -> new CustomProcessor(STORE_NAME), STORE_NAME);
}
    spring:
  cloud:
    stream:
      function:
        definition: evenNumberSquareProcessor
      bindings:
        evenNumberSquareProcessor-in-0:
          destination: input
          content-type: application/*+avro
          group: group-1
        evenNumberSquareProcessor-out-0:
          destination: output
      kafka:
        binder:
          brokers: my-cluster-kafka-bootstrap.kafka:9092
          consumer-properties:
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: my-cluster-kafka-bootstrap.kafka:9092
            configuration:
              schema.registry.url: http://localhost:8081
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          bindings:
            evenNumberSquareProcessor-in-0:
              consumer:
                destination: input
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            evenNumberSquareProcessor-out-0:
                destination: output

我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机:

    @EnableSchemaRegistryClient
@SpringBootApplication
public class TransformApplication {
    public static void main(String[] args) {
        SpringApplication.run(TransformApplication.class, args);
    }
}

谢谢你能给我带来的任何帮助。

视CG

共有1个答案

裴成文
2023-03-14

配置下配置架构注册表,然后所有绑定器都可以使用它。对了。avro序列化程序位于绑定和特定通道下。如果要使用默认属性default.value.serde:。你的Serde可能也错了。

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          bindings:
            process-in-0:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

不要使用@enableschemareGistryClient。在Avro Serde上启用架构注册表。在本例中,我使用定义的beandata。试着在这里遵循这个例子。

@Service
public class CustomSerdes extends Serdes {

    private final static Map<String, String> serdeConfig = Stream.of(
            new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    public static Serde<Data> DataAvro() {
        final Serde<Data> dataAvroSerde = new SpecificAvroSerde<>();
        dataAvroSerde.configure(serdeConfig, false);
        return dataAvroSerde;
    }
}
 类似资料:
  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我使用的是Azure HDInsight的托管Apache Kafka解决方案,因为不幸的是Azure上没有托管汇流Kafka解决方案。是否可以运行汇合模式注册表并将其连接到HDInsight Apache Kafka集群的代理? 我希望只在单个VM上安装模式注册表,然后使用schema-registry.properties文件中的这一行,将其指向HDInsight集群的代理列表: kafkas

  • 我希望即使服务器重新启动,也能保持一个具有固定id的模式。 是否可以在模式注册表中保存模式,以便在服务器崩溃后使用相同的id? 否则,是否有可能在模式注册表服务器启动时用固定的id硬编码一个模式?

  • 我试图使用Confluent_Kafka的AvroProducer类生成Avro格式的消息。Kafka和Schema-Registry在同一个网络中作为3个节点的集群运行。 我得到的是 我没有使用Docker容器。集群由3个独立的VM组成,其中安装和运行Kafka和Registry Schema,所以它也不是独立的。Python代码从具有网络访问和防火墙异常的第四个VM执行。事实上,我可以在没有a

  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化