当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect-JDBC自定义Avro模式

沃侯林
2023-03-14

我正在学习kafka connect的教程,我想知道是否有可能接收一些类的类型的消息。

教程:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1//

{
   "namespace": "avro",
   "type": "record",
   "name": "Audit",
   "fields": [
      {"name": "c1", "type": "int"},
      {"name": "c2", "type": "string"},
      {"name": "create_ts", "type": "long"},
      {"name": "update_ts", "type": "long"}
   ]
}

基于avro格式,我用Maven生成了一个类。

然后我用我的类型定义了消费者工厂:

public ConsumerFactory<String, Audit> auditConsumerFactory() { ... )

和KafkaListener:

@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory =   "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
     System.out.println(audit);
     this.latch.countDown();
}
2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.
    public ConsumerFactory<String, Audit> auditConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory(props);
    }
{
  "type": "record",
  "name": "avro.Audit",
  "fields": [
    {
      "name": "c1",
      "type": "int"
    },
    {
      "name": "c2",
      "type": "string"
    },
    {
      "name": "create_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "update_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "avro.Audit"
}

我在Github上找到了我问题的答案

共有1个答案

太叔俊侠
2023-03-14

我不知道这个问题是否还有其他线索,但最终合流解决了这个问题。将这三行添加到JDBC连接器

“Transforms”:“AddNamespace”,“Transforms.AddNamespace.Type”:“org.apache.kafka.connect.Transforms.SetSchemaMadata$value”,“Transforms.AddNamespace.Schema.Name”:“My.Namespace.NameOftheSchema”,

Kafka-7883

 类似资料:
  • 我正在学习kafka connect的教程,我想知道是否有可能为数据来自MySql表的主题定义一个自定义的模式注册表。 我在我json/connect配置中找不到定义它的地方,而且我不想在创建模式后创建一个新版本。 我的MySql表称为站有这个模式 其中,属性包含 Json 数据而不是字符串(我必须使用该类型,因为属性的 Json 字段是可变的。 我的连接器是 并创建该模式 其中“属性”字段当然是

  • 我是Avro架构的新手。我尝试使用kafka发布/消费我的java对象。我有java bean类,它包含LocalDateTime和byte[]的字段。如何在avro架构基元类型中定义两者?我可以用于LocalDateTime的最佳基元类型是什么? 我定义了这样的东西;但是 类强制转换异常[1] [1] Caused by: java.lang.ClassCastException: [B can

  • 如果您是一位经验丰富的ML开发人员,而且ML Kit的预训练的模型不能满足您的需求,您可以通过ML Kit使用定 的TensorFlow Lite模型。 使用Firebase托管您的TensorFlow Lite模型或将其与您的应用程序打包在一起。然后,使用ML Kit SDK来使用您的自定义模型的最佳版本构建应用。如果您使用Firebase托管您的模型,ML Kit会自动更新您的用户的所用版本。

  • 需要 10.2.0+ 您可以在*.vue文件中定义自定义语言块。 自定义块的内容将由在vue-loader'选项的loaders对象中指定的加载器处理,然后由组件模块require。 配置类似于[先进的Loader配置](../ configurations / advanced.md)中描述的配置,除了匹配使用标记名称而不是lang`属性。 如果找到一个自定义块的匹配加载器,它将被处理; 否则将

  • 我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(

  • 主要内容:自定义模块编写说明文档到目前为止,读者已经掌握了导入 Python 标准库并使用其成员(主要是函数)的方法,接下来要解决的问题是,怎样自定义一个模块呢? 前面章节中讲过,Python 模块就是 Python 程序,换句话说,只要是 Python 程序,都可以作为模块导入。例如,下面定义了一个简单的模块(编写在 demo.py 文件中): 可以看到,我们在 demo.py 文件中放置了变量(name 和 add)、函数(