我正在学习kafka connect的教程,我想知道是否有可能接收一些类的类型的消息。
教程:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1//
{
"namespace": "avro",
"type": "record",
"name": "Audit",
"fields": [
{"name": "c1", "type": "int"},
{"name": "c2", "type": "string"},
{"name": "create_ts", "type": "long"},
{"name": "update_ts", "type": "long"}
]
}
基于avro格式,我用Maven生成了一个类。
然后我用我的类型定义了消费者工厂:
public ConsumerFactory<String, Audit> auditConsumerFactory() { ... )
和KafkaListener:
@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory = "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
System.out.println(audit);
this.latch.countDown();
}
2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.
public ConsumerFactory<String, Audit> auditConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return new DefaultKafkaConsumerFactory(props);
}
{
"type": "record",
"name": "avro.Audit",
"fields": [
{
"name": "c1",
"type": "int"
},
{
"name": "c2",
"type": "string"
},
{
"name": "create_ts",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
},
{
"name": "update_ts",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
}
],
"connect.name": "avro.Audit"
}
我在Github上找到了我问题的答案
我不知道这个问题是否还有其他线索,但最终合流解决了这个问题。将这三行添加到JDBC连接器
“Transforms”:“AddNamespace”,“Transforms.AddNamespace.Type”:“org.apache.kafka.connect.Transforms.SetSchemaMadata$value”,“Transforms.AddNamespace.Schema.Name”:“My.Namespace.NameOftheSchema”,
Kafka-7883
我正在学习kafka connect的教程,我想知道是否有可能为数据来自MySql表的主题定义一个自定义的模式注册表。 我在我json/connect配置中找不到定义它的地方,而且我不想在创建模式后创建一个新版本。 我的MySql表称为站有这个模式 其中,属性包含 Json 数据而不是字符串(我必须使用该类型,因为属性的 Json 字段是可变的。 我的连接器是 并创建该模式 其中“属性”字段当然是
我是Avro架构的新手。我尝试使用kafka发布/消费我的java对象。我有java bean类,它包含LocalDateTime和byte[]的字段。如何在avro架构基元类型中定义两者?我可以用于LocalDateTime的最佳基元类型是什么? 我定义了这样的东西;但是 类强制转换异常[1] [1] Caused by: java.lang.ClassCastException: [B can
如果您是一位经验丰富的ML开发人员,而且ML Kit的预训练的模型不能满足您的需求,您可以通过ML Kit使用定 的TensorFlow Lite模型。 使用Firebase托管您的TensorFlow Lite模型或将其与您的应用程序打包在一起。然后,使用ML Kit SDK来使用您的自定义模型的最佳版本构建应用。如果您使用Firebase托管您的模型,ML Kit会自动更新您的用户的所用版本。
需要 10.2.0+ 您可以在*.vue文件中定义自定义语言块。 自定义块的内容将由在vue-loader'选项的loaders对象中指定的加载器处理,然后由组件模块require。 配置类似于[先进的Loader配置](../ configurations / advanced.md)中描述的配置,除了匹配使用标记名称而不是lang`属性。 如果找到一个自定义块的匹配加载器,它将被处理; 否则将
我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(
主要内容:自定义模块编写说明文档到目前为止,读者已经掌握了导入 Python 标准库并使用其成员(主要是函数)的方法,接下来要解决的问题是,怎样自定义一个模块呢? 前面章节中讲过,Python 模块就是 Python 程序,换句话说,只要是 Python 程序,都可以作为模块导入。例如,下面定义了一个简单的模块(编写在 demo.py 文件中): 可以看到,我们在 demo.py 文件中放置了变量(name 和 add)、函数(