当前位置: 首页 > 知识库问答 >
问题:

跟踪oracle数据库更改时无法解析模式注册表接收到的模式

罗均
2023-03-14

我正在使用confluent和kafka connect oracle(https://github.com/erdemcer/kafka-connect-oracle)为了跟踪Oracle database 11g XE中的更改,我可以使用模式注册表api(如“curl-X GET”)查看模式内容http://localhost:8081/schemas/ids/44" :

{"subject":"Test. KAFKAUSER. TEST-value","version": 1,"id": 44,"Schema":"{"type":"记录","name":"row","namesspace":"test.kafkauser.test","field":[{"name":"SCN","type":"long"},{"name":"SEG_OWNER","type":"string"},{"name":"TABLE_NAME","type":"string"},{"name":"TIMESTAMP","type":{"type":"long","connect.version": 1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"SQL_REDO","type":"string"},{"name":"OPERATION","type":"string"},{"name":"data","type":["null",{"type":"记录",":["null","Double"],"default": null},{"name":"NAME","type":["null","string"],"default": null}],"connect.name":"value"}],"default": null},{"name":"前","type":["null","value"],"default": null}],"connect.name":"test.kafkauser.test.row"}","删除": false}

但是,此模式无法被conFluent在python中的模式注册表解析

schemaRegistryClientURL="http://localhost:8081"
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client= CachedSchemaRegistryClient(url=schemaRegistryClientURL)
schema_registry_client.get_by_id(44)

我得到以下错误:

Traceback(最近调用的最后一次):File",第1行,在File"build/bdist.linux-x86_64/eges/conFluent/Schemaregistry/Client/CachedSchemaRealstryClient.py",第140行,get_by_idconfluent.schemaregistry.client.ClientError:从注册表接收到错误的模式。

kafka connect oracle是否将未验证的模式发送到模式注册表?如何将此模式转换为正确的格式?

谢谢

共有1个答案

马奇略
2023-03-14

看起来您的模式有问题。JSON格式化程序说它是无效格式。您可以在此处检查JSON格式是否正确:https://jsonformatter.curiousconcept.com/#

通过查看它,我看到这里有两个过度使用的引号:

第一个在第一行,在“schema”之后:

第二个在最后一行,在<代码>测试之间。行“},“deleted”:false}

删除这两个后,它现在是有效的形式。如果你问一个方法来自动这样做,我不知道怎么做。也许您可以搜索一些python代码来验证和修复JSON格式。

这是有效的格式:

{
   "subject":"TEST.KAFKAUSER.TEST-value",
   "version":1,
   "id":44,
   "schema":{
      "type":"record",
      "name":"row",
      "namespace":"test.kafkauser.test",
      "fields":[
         {
            "name":"SCN",
            "type":"long"
         },
         {
            "name":"SEG_OWNER",
            "type":"string"
         },
         {
            "name":"TABLE_NAME",
            "type":"string"
         },
         {
            "name":"TIMESTAMP",
            "type":{
               "type":"long",
               "connect.version":1,
               "connect.name":"org.apache.kafka.connect.data.Timestamp",
               "logicalType":"timestamp-millis"
            }
         },
         {
            "name":"SQL_REDO",
            "type":"string"
         },
         {
            "name":"OPERATION",
            "type":"string"
         },
         {
            "name":"data",
            "type":[
               "null",
               {
                  "type":"record",
                  "name":"value",
                  "namespace":"",
                  "fields":[
                     {
                        "name":"ID",
                        "type":[
                           "null",
                           "double"
                        ],
                        "default":null
                     },
                     {
                        "name":"NAME",
                        "type":[
                           "null",
                           "string"
                        ],
                        "default":null
                     }
                  ],
                  "connect.name":"value"
               }
            ],
            "default":null
         },
         {
            "name":"before",
            "type":[
               "null",
               "value"
            ],
            "default":null
         }
      ],
      "connect.name":"test.kafkauser.test.row"
   },
   "deleted":false
}

 类似资料:
  • 我试图运行Kafka服务与Zookeper,Kafdrop和模式注册表,我使它的工作安装Kafka,Zookeper和Kadrop在一个容器,然后安装合流模式注册表在另一个(与自己的Kafka,Zoowatch,Ksql服务器和Rest代理),然而尝试要使kafdrop读取模式注册表不工作,所以现在我想只安装一个容器与Kafka,Zoowatch,Kafdrop和模式注册表,即使一切都成功安装,模

  • 我有一个Kafka主题,我想用AVRO数据(目前是JSON)来填充它。我知道“正确”的方法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。 因此,我将AVRO数据作为数组[字节]发送,而不是常规的Json对象: 模式是在每个数据中启动的;我如何使它与kafka-connect一起工作?kafka-connect配置目前表现出以下属性(数据作为json.gz文件写入s3),我想编写P

  • 问题内容: 按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实,参考或专业知识的支持,但是这个问题可能会引起辩论,争论,民意调查或扩展讨论。如果您认为此问题可以解决并且可以重新提出,请访问帮助中心以获取指导。 8年前关闭。 跟踪和/或自动执行数据库架构更改的最佳方法是什么?我们的团队使用Subversion进行版本控制,我们已经能够以这种方式自动执行一些任务(将构建推送到暂存服务器

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 问题内容: 我的同事在工作中提出了一个我无法回答的问题(由于缺乏经验),该问题与跟踪表中相关字段的变化有关。 想象一下,我们有3个表,每个表20个字段。在此示例中,我们考虑这些表中的每一个都有2个字段,一个名为LastUpdatedOn,另一个名为LastUpdatedBy。 如果我们只想跟踪这3个表中的更改,而只跟踪几个特定字段,而没有为每个表创建包含更新前最新版本的历史表,那么我们如何跟踪这些

  • 问题内容: 我似乎无法创建触发器。我已经尝试了以下两种方式进行更新。我通过插入语句不断收到语法错误。我搜索了过去4个小时的论坛和网络搜索,没有任何变化。对此还有更多代码,它基本上会重复执行。任何帮助,将不胜感激。谢谢你。我正在运行MySQL 5.0,并通过phpMyAdmin 2.8.2.4作为Administrator / Root访问。 TRY 1(在所有字段和名称上带有和不带有qoutes)