我正在学习kafka connect的教程,我想知道是否有可能为数据来自MySql表的主题定义一个自定义的模式注册表。
我在我json/connect配置中找不到定义它的地方,而且我不想在创建模式后创建一个新版本。
我的MySql表称为站有这个模式
Field | Type
---------------+-------------
code | varchar(4)
date_measuring | timestamp
attributes | varchar(256)
其中,属性包含 Json 数据而不是字符串(我必须使用该类型,因为属性的 Json 字段是可变的。
我的连接器是
{
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "The Kafka topic will be made up of this prefix, plus the table name ",
"key.converter.schema.registry.url": "http://localhost:8081",
"name": "jdbc_source_mysql_stations",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": [
"ValueToKey"
],
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": [
"code",
"date_measuring"
],
"connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
"connection.user": "confluent",
"connection.password": "**************",
"table.whitelist": [
"stations"
],
"mode": "timestamp",
"timestamp.column.name": [
"date_measuring"
],
"validate.non.null": "false",
"topic.prefix": "mysql-"
}
并创建该模式
{
"subject": "mysql-stations-value",
"version": 1,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}
其中“属性”字段当然是一个字符串。与我将应用它的其他模式不同。
{
"fields": [
{
"name": "code",
"type": "string"
},
{
"name": "date_measuring",
"type": {
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
},
{
"name": "attributes",
"type": {
"type": "record",
"name": "AttributesRecord",
"fields": [
{
"name": "H1",
"type": "long",
"default": 0
},
{
"name": "H2",
"type": "long",
"default": 0
},
{
"name": "H3",
"type": "long",
"default": 0
},
{
"name": "H",
"type": "long",
"default": 0
},
{
"name": "Q",
"type": "long",
"default": 0
},
{
"name": "P1",
"type": "long",
"default": 0
},
{
"name": "P2",
"type": "long",
"default": 0
},
{
"name": "P3",
"type": "long",
"default": 0
},
{
"name": "P",
"type": "long",
"default": 0
},
{
"name": "T",
"type": "long",
"default": 0
},
{
"name": "Hr",
"type": "long",
"default": 0
},
{
"name": "pH",
"type": "long",
"default": 0
},
{
"name": "RX",
"type": "long",
"default": 0
},
{
"name": "Ta",
"type": "long",
"default": 0
},
{
"name": "C",
"type": "long",
"default": 0
},
{
"name": "OD",
"type": "long",
"default": 0
},
{
"name": "TU",
"type": "long",
"default": 0
},
{
"name": "MO",
"type": "long",
"default": 0
},
{
"name": "AM",
"type": "long",
"default": 0
},
{
"name": "N03",
"type": "long",
"default": 0
},
{
"name": "P04",
"type": "long",
"default": 0
},
{
"name": "SS",
"type": "long",
"default": 0
},
{
"name": "PT",
"type": "long",
"default": 0
}
]
}
}
],
"name": "stations",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}
有什么建议吗?如果不可能,我想我必须创建一个KafkaStream来创建另一个主题,即使我会避免它。
提前感谢!
我不认为您问的是关于使用“自定义”注册中心的任何事情(您会用表示您正在使用哪个注册中心的两行代码来问),而是在从数据库中提取记录后,您如何解析数据/应用模式
您可以编写自己的Transform,也可以使用Kstream,这实际上是这里的主要选项。有一个SetSchemaMetadata转换,但我不确定它是否能满足您的要求(将字符串解析为Avro记录)
或者,如果您必须将JSON数据推入单个数据库属性,也许您不应该使用Mysql,而应该使用具有更灵活html" target="_blank">数据约束的文档数据库。
否则,您可以使用 BLOB 而不是 varchar 并将二进制 Avro 数据放入该列中,但这样您仍然需要一个自定义的反序列化程序来读取数据
问题内容: 我想在定义类时注册一个类的实例。理想情况下,以下代码可以解决问题。 不幸的是,此代码生成错误。 发生的事情是在我尝试实例化a的行上,但装饰器尚未返回,因此它不存在。 是否使用元类解决了这个问题? 问题答案: 是的,元类可以做到这一点。元类的方法返回该类,因此只需在返回之前注册该类。 前面的示例在Python 2.x中有效。在Python 3.x中,的定义略有不同(虽然未更改,但未显示-
我是新来的Laravel。我有两种不同的用户类型:老师和学生。教师在网站注册表格中注册账户,但学生注册由windows应用程序提供。所以我必须为这个应用程序提供一个API。默认Laravel认证是足够的教师注册,但我可以使用相同的AuthController API注册?我不需要视图或重定向到网站。
我已经为log4j2编写了一个自定义触发策略,该策略将被回滚。在每小时/天/你的工作时间间隔结束时,按照本SO帖子的建议记录文件。 虽然我遵循基于时间的触发策略约定(命名等),但我无法看到我的策略被实例化和使用。 解决方案由3个java文件和一个maven文件组成,可在github上获得 在这里,您可以从政策本身找到主要内容: log4j2。xml文件: 编辑: 在调试过程中,我了解到在log4j
有什么建议吗?
我创建一个自定义注册表单在wordpress买我有以下错误,我似乎找不到什么导致他们。 注意:未定义的变量:new_user在第219行的 /Applications/MAMP/htdocs/****/wp-内容/插件/护理匹配/carematch.php 注意:未定义的变量:错误 /Applications/MAMP/htdocs/****/wp-内容/插件/护理匹配/carematch.php
我想注册一个Avro模式,它在模式注册表上引用另一个Avro模式。 首先,我注册了以下基本Avro模式: 如果我尝试注册以下Avro架构,该架构引用“客户端”属性中的基本架构,则操作失败并出现错误422 这个问题似乎与指定自定义类型字段有关。 任何想法如何添加自定义类型,同时注册相关的模式在模式注册表?