当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect,JdbcSinkConnector - Getting "检索id为1的Avro架构时出错,找不到主题。;错误代码:40401 "

汪修诚
2023-03-14

我创建了一个NiFi流,该流最终将json记录发布为具有Avro编码值和字符串键的记录,使用了值模式的融合注册表中的模式。这是NiFi中AvroRecordSetWriter的配置。

我现在正在尝试使用Kafka Connect(Connect-独立)使用JdbcSinkConnector将消息移动到PostgreSQL数据库,但收到以下错误:检索id 1的Avro架构时出错

我已经确认我的Confluent注册表中有一个架构,ID为1。以下是我对Connect任务的配置

工人配置:

bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java

连接器配置:

name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true

我在NiFi中创建了一个正确使用消息的流,并且通过指定<code>--property schema.registry.url,我还成功地使用kafka avro console consumer使用了消息(在输出中格式化为JSON)=http://schema-registry:8081。请注意,我在Docker容器中运行消费者,这就是为什么url不是localhost。

我不确定我错过了什么。我唯一的想法是我对键转换器使用了错误的类,但是这对于给定的错误没有意义。有人能看出我做错了什么吗?

共有1个答案

戚均
2023-03-14

我对Nifi了解不多,但我看到模式的名称是“rds”,并且在错误日志中说它没有在模式注册表中找到主题。

Kafka使用KafkaAvroSerializer序列化avro记录,同时在模式注册表中注册相关的avro模式。它使用KafkaAvroDeserializer反序列化avro记录并从模式注册表中检索相关的模式。

Schema registry将模式存储到称为“主题”的类别中,为记录命名主题的默认行为是:对于值记录为< code>topic_name-value,对于键为< code>topic_name-key。

在你的情况下,你没有向Kafka注册架构,而是向Nifi注册了架构,所以我的猜测是名称“rds”出现在架构注册表中,或者是架构注册表上的使用者名称。

您如何验证您的架构是否被严格存储?

通常情况下,正确的主题是<code>rds-value</code>,因为您只对值记录使用模式注册表。

 类似资料:
  • 汇合版本4.1.0 我使用KTable从几个主题(topic_1,topic _2)中获取数据,连接数据,然后使用KStream将数据推送到另一个主题(totic_out)。(Ktable.toStream()) 数据采用avro格式 当我使用 我发现 但是没有主题与topic_out键。为什么不创建它? topic_out的输出: 我可以看到密钥正在生成,但没有密钥的主题。 为什么需要带密钥的主

  • 我有Kafka的以下配置 我试图通过版本得到主题,我得到如下 我使用插件<;代码>;avro-maven插件>;生成>;CreateBankAccount>;代码< 然后,我用一个字符串键和一个avro序列化的有效负载向主题推送一条消息,但是我有一个错误 轨道: ProducerConfig值: KafkaAvroSerializerConfig值:

  • 首先感谢@OneCricketeer到目前为止的支持,我到现在已经尝试了这么多配置,不知道还有什么可以尝试的。 使用 confluent 访问外部流。 连接正在工作,我可以看到加载了偏移量: INFO[my_mysql_sink|task-0][消费者clientId=连接器-消费者-my_mysql_sink-0, groupId=连接器-my_mysql_sink]设置分区的偏移量gamerb

  • 我遵循了本教程: 1.节点安装-node-v8.9.0-x86 2. ran命令 谢谢你,

  • 大家好,我有一个模拟ATM机的程序。它使用我创建的account类,在用户输入0到999999之间的id后,为用户生成一个帐户。然后,他们可以执行各种任务,如查看余额、取款、存款等。不过,我在检查程序时遇到了一个问题。它编译时没有错误,并且第一次通过循环时,它工作得非常完美。但是,如果他们点击退出并输入另一个无效id,它会显示两次无效输入消息。我复制了下面发生的事情的控制台。有人能给我解释一下为什

  • 问题内容: 我有一个由Eclipse生成的.jar,我无法在其他计算机(与Windows XP)上运行。出现“找不到主类。程序将退出”消息。那台计算机可以在Netbeans生成的另一个.jar上正常运行,所以我想JRE并不是问题。我更新了JRE,但没有改变。问题是什么? 更新:我忘了提,我做了一个可运行的jar文件。在另外两台计算机上,它可以正常工作(win 7和XP),但在特定计算机上则不能。