我正在使用confluent和kafka connect oracle(https://github.com/erdemcer/kafka-connect-oracle)为了跟踪Oracle database 11g XE中的更改,我可以使用模式注册表api(如“curl-X GET”)查看模式内容http://localhost:8081/schemas/ids/44" :
{"subject":"Test. KAFKAUSER. TEST-value","version": 1,"id": 44,"Schema":"{"type":"记录","name":"row","namesspace":"test.kafkauser.test","field":[{"name":"SCN","type":"long"},{"name":"SEG_OWNER","type":"string"},{"name":"TABLE_NAME","type":"string"},{"name":"TIMESTAMP","type":{"type":"long","connect.version": 1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"SQL_REDO","type":"string"},{"name":"OPERATION","type":"string"},{"name":"data","type":["null",{"type":"记录",":["null","Double"],"default": null},{"name":"NAME","type":["null","string"],"default": null}],"connect.name":"value"}],"default": null},{"name":"前","type":["null","value"],"default": null}],"connect.name":"test.kafkauser.test.row"}","删除": false}
但是,此模式无法被conFluent在python中的模式注册表解析:
schemaRegistryClientURL="http://localhost:8081"
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client= CachedSchemaRegistryClient(url=schemaRegistryClientURL)
schema_registry_client.get_by_id(44)
我得到以下错误:
Traceback(最近调用的最后一次):File",第1行,在File"build/bdist.linux-x86_64/eges/conFluent/Schemaregistry/Client/CachedSchemaRealstryClient.py",第140行,get_by_idconfluent.schemaregistry.client.ClientError:从注册表接收到错误的模式。
kafka connect oracle是否将未验证的模式发送到模式注册表?如何将此模式转换为正确的格式?
谢谢
看起来您的模式有问题。JSON格式化程序说它是无效格式。您可以在此处检查JSON格式是否正确:https://jsonformatter.curiousconcept.com/#
通过查看它,我看到这里有两个过度使用的引号:
第一个在第一行,在“schema”之后:
第二个在最后一行,在<代码>测试之间。行“}和
,“deleted”:false}
删除这两个后,它现在是有效的形式。如果你问一个方法来自动这样做,我不知道怎么做。也许您可以搜索一些python代码来验证和修复JSON格式。
这是有效的格式:
{
"subject":"TEST.KAFKAUSER.TEST-value",
"version":1,
"id":44,
"schema":{
"type":"record",
"name":"row",
"namespace":"test.kafkauser.test",
"fields":[
{
"name":"SCN",
"type":"long"
},
{
"name":"SEG_OWNER",
"type":"string"
},
{
"name":"TABLE_NAME",
"type":"string"
},
{
"name":"TIMESTAMP",
"type":{
"type":"long",
"connect.version":1,
"connect.name":"org.apache.kafka.connect.data.Timestamp",
"logicalType":"timestamp-millis"
}
},
{
"name":"SQL_REDO",
"type":"string"
},
{
"name":"OPERATION",
"type":"string"
},
{
"name":"data",
"type":[
"null",
{
"type":"record",
"name":"value",
"namespace":"",
"fields":[
{
"name":"ID",
"type":[
"null",
"double"
],
"default":null
},
{
"name":"NAME",
"type":[
"null",
"string"
],
"default":null
}
],
"connect.name":"value"
}
],
"default":null
},
{
"name":"before",
"type":[
"null",
"value"
],
"default":null
}
],
"connect.name":"test.kafkauser.test.row"
},
"deleted":false
}
我试图运行Kafka服务与Zookeper,Kafdrop和模式注册表,我使它的工作安装Kafka,Zookeper和Kadrop在一个容器,然后安装合流模式注册表在另一个(与自己的Kafka,Zoowatch,Ksql服务器和Rest代理),然而尝试要使kafdrop读取模式注册表不工作,所以现在我想只安装一个容器与Kafka,Zoowatch,Kafdrop和模式注册表,即使一切都成功安装,模
我有一个Kafka主题,我想用AVRO数据(目前是JSON)来填充它。我知道“正确”的方法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。 因此,我将AVRO数据作为数组[字节]发送,而不是常规的Json对象: 模式是在每个数据中启动的;我如何使它与kafka-connect一起工作?kafka-connect配置目前表现出以下属性(数据作为json.gz文件写入s3),我想编写P
问题内容: 按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实,参考或专业知识的支持,但是这个问题可能会引起辩论,争论,民意调查或扩展讨论。如果您认为此问题可以解决并且可以重新提出,请访问帮助中心以获取指导。 8年前关闭。 跟踪和/或自动执行数据库架构更改的最佳方法是什么?我们的团队使用Subversion进行版本控制,我们已经能够以这种方式自动执行一些任务(将构建推送到暂存服务器
我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表
问题内容: 我的同事在工作中提出了一个我无法回答的问题(由于缺乏经验),该问题与跟踪表中相关字段的变化有关。 想象一下,我们有3个表,每个表20个字段。在此示例中,我们考虑这些表中的每一个都有2个字段,一个名为LastUpdatedOn,另一个名为LastUpdatedBy。 如果我们只想跟踪这3个表中的更改,而只跟踪几个特定字段,而没有为每个表创建包含更新前最新版本的历史表,那么我们如何跟踪这些
问题内容: 我似乎无法创建触发器。我已经尝试了以下两种方式进行更新。我通过插入语句不断收到语法错误。我搜索了过去4个小时的论坛和网络搜索,没有任何变化。对此还有更多代码,它基本上会重复执行。任何帮助,将不胜感激。谢谢你。我正在运行MySQL 5.0,并通过phpMyAdmin 2.8.2.4作为Administrator / Root访问。 TRY 1(在所有字段和名称上带有和不带有qoutes)