我使用kafka流来消耗来自一个主题的JSON字符串,处理并生成存储在另一个主题中的响应。然而,需要对响应主题产生的消息需要采用avro格式。
我已经尝试使用键作为字符串序列和值作为规范AvroSerde
以下是我创建拓扑的代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic());
以下是我的配置
if (schemaRegistry != null && schemaRegistry.length > 0) {
streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
}
streamsConfig.put(this.keySerializerKeyName, StringSerde.class);
streamsConfig.put(this.valueSerialzerKeyName, SpecificAvroSerde.class);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.parseInt(commitIntervalMs));
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfThreads);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
当我尝试使用该示例时,我看到了以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
问题在于关键值Serdes。您应该在使用流时使用正确的SERDE,在发布流时使用相同的SERDE。
如果您的输入是JSON,并且希望以Avro的形式发布,您可以按照以下方式执行:
Properties streamsConfig= new Properties();
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic(),Consumed.with(Serdes.String(), Serdes.String()));
// Replace AvroObjectClass with your avro object type
KStream<String,AvroObjectClass> consumerAvroStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerAvroStream.to(kafkaConfiguration.getProducerTopic());
我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件
问题内容: 我正在寻找使用SQL将一种日期格式转换为另一种格式。我正在使用用于SQLite的数据库浏览器,日期存储在“ TEXT”类型的列中。 这是当前格式的2个示例: 2017/4/17上午9:09:09 2017/10/4下午10:21:13 请注意,在当前的日期,月份和小时格式中,如果只有一位数字,则当前不会以前导“ 0”填充。他们也把月份放在第一位,然后是日期,然后是年份。这两个示例应转换
消息通常按照批量的方式写入.record batch 是批量消息的技术术语,它包含一条或多条 records.不良情况下, record batch 只包含一条 record.Record batches 和 records 都有他们自己的 headers.在 kafka 0.11.0及后续版本中(消息格式的版本为 v2 或者 magic=2)解释了每种消息格式.点击查看消息格式详情. 5.3.1
问题内容: 有没有一种简单的方法可以将一种日期格式转换为PHP中的另一种日期格式? 我有这个: 但是我当然希望它返回一个当前日期,而不是返回“黎明”。我究竟做错了什么? 问题答案: 第二个参数必须是正确的时间戳(自1970年1月1日以来的秒数)。您正在传递一个字符串,date()无法识别。 您可以使用 strtotime() 将日期字符串转换为时间戳。但是,即使strtotime()也无法识别格式
我刚刚开始使用使用Apache Camel 2.15.3的应用程序。我是Camel的新手,我正在尝试了解消息是如何发送的,以及路由中组件之间的外观。 应用程序中的路由是使用Spring扩展xml设置的。以下是其中一条路由的示例。 首先是一个简短的术语问题:本示例中的中间bean叫什么?endpoint?组件?还是别的什么?现在我把它们叫做组件。 我现在的主要困惑是理解什么是输入,什么是从一个组件传
1.1.1. 目录 1.1.2. 若琪智能家居协议 1.1.3. 示例 1.1.1. 目录 若琪智能家居协议 请求消息类型 回复消息类型 消息体 示例 一个控制请求 一个控制成功的返回 当发生了错误时的一个返回 1.1.2. 若琪智能家居协议 请求消息类型 命令 Directives 由若琪主动向 Skill 发起的请求,可以是 Skill 开放的 HTTP 服务,或者是 JSON RPC ove