当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams是否可以使用一种格式的消息并产生另一种格式,例如AVRO消息

申高峰
2023-03-14

我使用kafka流来消耗来自一个主题的JSON字符串,处理并生成存储在另一个主题中的响应。然而,需要对响应主题产生的消息需要采用avro格式。

我已经尝试使用键作为字符串序列和值作为规范AvroSerde

以下是我创建拓扑的代码:

StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic());

以下是我的配置

    if (schemaRegistry != null && schemaRegistry.length > 0) {
        streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));          
    }
    streamsConfig.put(this.keySerializerKeyName, StringSerde.class);
    streamsConfig.put(this.valueSerialzerKeyName, SpecificAvroSerde.class);
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
    streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
    streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
    streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.parseInt(commitIntervalMs));
    streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfThreads);
    streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
    streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

当我尝试使用该示例时,我看到了以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

共有1个答案

满博
2023-03-14

问题在于关键值Serdes。您应该在使用流时使用正确的SERDE,在发布流时使用相同的SERDE。

如果您的输入是JSON,并且希望以Avro的形式发布,您可以按照以下方式执行:

Properties streamsConfig= new Properties();
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic(),Consumed.with(Serdes.String(), Serdes.String()));

// Replace AvroObjectClass with your avro object type
KStream<String,AvroObjectClass> consumerAvroStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerAvroStream.to(kafkaConfiguration.getProducerTopic());
 类似资料:
  • 我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件

  • 问题内容: 我正在寻找使用SQL将一种日期格式转换为另一种格式。我正在使用用于SQLite的数据库浏览器,日期存储在“ TEXT”类型的列中。 这是当前格式的2个示例: 2017/4/17上午9:09:09 2017/10/4下午10:21:13 请注意,在当前的日期,月份和小时格式中,如果只有一位数字,则当前不会以前导“ 0”填充。他们也把月份放在第一位,然后是日期,然后是年份。这两个示例应转换

  • 消息通常按照批量的方式写入.record batch 是批量消息的技术术语,它包含一条或多条 records.不良情况下, record batch 只包含一条 record.Record batches 和 records 都有他们自己的 headers.在 kafka 0.11.0及后续版本中(消息格式的版本为 v2 或者 magic=2)解释了每种消息格式.点击查看消息格式详情. 5.3.1

  • 问题内容: 有没有一种简单的方法可以将一种日期格式转换为PHP中的另一种日期格式? 我有这个: 但是我当然希望它返回一个当前日期,而不是返回“黎明”。我究竟做错了什么? 问题答案: 第二个参数必须是正确的时间戳(自1970年1月1日以来的秒数)。您正在传递一个字符串,date()无法识别。 您可以使用 strtotime() 将日期字符串转换为时间戳。但是,即使strtotime()也无法识别格式

  • 我刚刚开始使用使用Apache Camel 2.15.3的应用程序。我是Camel的新手,我正在尝试了解消息是如何发送的,以及路由中组件之间的外观。 应用程序中的路由是使用Spring扩展xml设置的。以下是其中一条路由的示例。 首先是一个简短的术语问题:本示例中的中间bean叫什么?endpoint?组件?还是别的什么?现在我把它们叫做组件。 我现在的主要困惑是理解什么是输入,什么是从一个组件传

  • 1.1.1. 目录 1.1.2. 若琪智能家居协议 1.1.3. 示例 1.1.1. 目录 若琪智能家居协议 请求消息类型 回复消息类型 消息体 示例 一个控制请求 一个控制成功的返回 当发生了错误时的一个返回 1.1.2. 若琪智能家居协议 请求消息类型 命令 Directives 由若琪主动向 Skill 发起的请求,可以是 Skill 开放的 HTTP 服务,或者是 JSON RPC ove