当前位置: 首页 > 知识库问答 >
问题:

Kafka连接|未能反序列化主题数据|检索id的Avro键/值模式版本时出错|未找到主题错误代码:40401

闻华容
2023-03-14

首先感谢@OneCricketeer到目前为止的支持,我到现在已经尝试了这么多配置,不知道还有什么可以尝试的。

使用 confluent connect-independent worker.properties sink.properties 访问外部流。

连接正在工作,我可以看到加载了偏移量:

INFO[my_mysql_sink|task-0][消费者clientId=连接器-消费者-my_mysql_sink-0, groupId=连接器-my_mysql_sink]设置分区的偏移量gamerboot.gamer.master.workouts.clubs.spieleranalyse-1到提交的偏移量FetchPotion{偏移量=2225, offsetEpoch=Optional.empty,当前领导者=领导者和纪元{领导者=可选的[kafka8.pro.someurl.net:9093(id: 8机架:空)],纪元=0}}(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)

但之后,当新消息进来时,我收到一个错误:

错误[my_mysql_sink|task-0]WorkerSinkTask{id=my_mysql_sink-0}在主题'gamerboot.gamer.master.workouts.clubs.spieleranalyse'分区1的偏移量2225和时间戳1641459346507处转换消息键时出错:无法将主题gamerboot.gamer.master.workouts.clubs.spieleranalyse的数据反序列化为Avro:
原因:org.apache.kafka.common.errors.SerializationException:检索id 422的Avro键架构版本时出错

由以下原因引起:io.confluent.kafka.schemaregistry.client.rest.exceptions。RestClientException:未找到主题。;错误代码:40401

我不明白。

worker.properties:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

sink .属性

#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

#pk.mode=record_key
#pk.fields=

由于mysql中没有pk集,所以我想记录流中的所有内容。

正如它所说的“检索id 422的Avro密钥架构版本时出错”,我可以看到以下内容:

屏幕截图_subject_id

不要怀疑它说的是json,这只是我的ChromePlugin把它解释为JSON。价值也是如此。我还尝试了sink.properties中的每一种组合,这些组合都有注释。我还能够卷曲键和值的最新模式(比如):

curl-s https://schema-reg . pro . some URL . net/subjects/gamer boot . gamer . master . club-com . ad . gamer boot . Kafka . models . workoutkey/versions/latest | jq

{
  "type": "record",
  "name": "ClubWorkoutKey",
  "namespace": "com.ad.gamerboot.kafka.models.workouts",
  "fields": [
    {
      "name": "playerId",
      "type": "string"
    },
    {
      "name": "tagId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

当我为键输入字符串转换器时,它更进一步了。转换器和值。水槽中的转换器。但在我看来,这肯定是错的,因为Avro是在这里传递的。有了String,还有其他问题,我必须设置一个pk并打开delete等。

谢谢支持。

*编辑:

所以,给我的是:

topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse

schema.url = https://schema-reg.pro.someurl.net

以及:架构id url:

 https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema

并且:

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest

对我来说,这就像一个谜题,我20天前就从Kafka开始了。从那里,我尝试了周围的URL,找到了我发布的主题:

对于Key:< code > https://schema-reg . pro . some URL . net/subjects/gameboot . gamer . master . club-com . ad . gameboot . Kafka . models . workoutkey/versions/latest/

Schema: {“subject”:“gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey”,“version”:1,“id”:422,“schema”:“{\”type\“:\”record\“,\”name\“:\”ClubWorkoutKey\“,\”namespace\“:\”com.ad.gamerboot.kafka.models.workouts\“,\”fields\“:[{\”name\“:\”playerId\“,\”type\“:\”string\“},{\”name\“:\”tagId\“,\”type\“:[\”null\“,\”string\“],\”default\“:null}]}”}

对于值:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/

架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version": 1,"id": 423,"Schema":"{\"type\":\"记录\",\"name\":\"Club bWorkoutKickValue\",\"namesspace\":\"com.ad.gamerboot.kafka.models.workouts\",\"字段\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"time戳\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\": null},{\"name\":\"balSpeed\",\"type\":[\"null\",\"int\"],\"default\": null},{\"name\":\"BallSpeedFloat\null\",\"浮点\"],\"默认\": null},{\"name\":\"ball Speed Zone\",\"type\":{\"type\":\"e num\",\"name\":\"Ball Speed Zone\",\"符号\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"信心\",\"type\":[\"null\",\"in t\"],\"default\": null},{\"name\":\"摄取时间\",\"type\":[\"null\",{\"type\":\"long\",\"逻辑类型\":\"时间戳-磨机\"}],\"默认\": null}]}"}

和:<代码>{“主题”:“gamerboot.gamer.master.club.com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue”,“版本”:1,“id”:424,“模式”:“{\f5类型\”:“记录\”、{f4名称\”:\“ClubWorkOutplayerMotion值\”、\“名称空间\“:“com.d.gamerboot.kafka.models.workOutlines \”、“字段\”:[{"“名称\“、\”playerId\“、\”字符串\”。},{\'name\':\'timestamp\,\'type\':{\'type\,'long\,'logicalType\:\'时间戳为毫秒\“}、{\“名称\”:“绝对距离\”、“类型\”:“浮点\”}、{\”名称\“:\“平均速度\”、“类型\”:\“浮点\“}、{名称\\”:“峰值速度\“、{类型\“:“浮点\”、{名称\:“标记ID\”、}“类型\”:[“空\\”、{字符串\]、{默认\“::空、{安装名称\”。ATIONID\、\“类型\”:[\“null\”、\“string\”]、\“default\”:null}、{\“name\”:\“averageSpeedZone\,\“类型\”:[\“null\,{\“类型\”:“enum\”、\“name\:\”AverageSpeedZone\“,\“symbols\”:[\“SPRINT\”,\“HIGH_SPEED_RUN\”,“RUN\”和“JOG\”、“WALK\”、\“STAND\”、“\“INVALID\”]}]、\“default\”:null、\“别名\”:[\“SpeedZones\”]、{\“name\”:“peakSpeedZone\”、“、\“type\”、[“null\”、{“type\:“enum\”、\“name\”Peakspeed\”、:“PeadZone\”。“,”符号\“:[\”短跑\“,”高速跑步\“,\”跑步\“、”慢跑\“、\”行走\“、、\”站立\“、\”无效\“]}]、\”默认\“:”空值\“,”{\”名称\“:\”摄取时间\“,\”类型\”:[\'null\'、{\'type\':\'long\'、\'logicalType\':\'timestamp millis\'、\'default\':null}]}

MySQL表:

+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field            | Type                                                                 | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid         | varchar(100)                                                         | YES  |     | NULL    |       |
| timestamp        | mediumtext                                                           | YES  |     | NULL    |       |
| absoluteDistance | float                                                                | YES  |     | NULL    |       |
| avarageSpeed     | float                                                                | YES  |     | NULL    |       |
| peakSpeed        | float                                                                | YES  |     | NULL    |       |
| tagId            | varchar(50)                                                          | YES  |     | NULL    |       |
| installationId   | varchar(100)                                                         | YES  |     | NULL    |       |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| peakSpeedZone    | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| ballSpeed        | int(11)                                                              | YES  |     | NULL    |       |
| ballSpeedFloat   | float                                                                | YES  |     | NULL    |       |
| ballSpeedZone    | enum('COLD','MEDIUM','HOT','FIRE','INVALID')                         | YES  |     | NULL    |       |
| confidence       | int(11)                                                              | YES  |     | NULL    |       |
| ingestionTime    | mediumtext                                                           | YES  |     | NULL    |       |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+

MySQL中预期的数据:

+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid                             | timestamp     | absoluteDistance | avarageSpeed | peakSpeed | tagId          | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 |          5.76953 |       1.1543 |   1.22363 | 0104FLHBN009XD | null           | WALK             | WALK          |      NULL |           NULL | NULL          |       NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 |             NULL |         NULL |      NULL | 0104FLHBN009XD | NULL           | NULL             | NULL          |        37 |        37.0897 | COLD          |         77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+

来自avro控制台的数据看起来像数据库条目:

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}

这是一个全新的实际融合安装。我几个小时前刚刚更新了Avro到:kafka-Connect-avro-转换器:7.0.1

共有1个答案

卫胜
2023-03-14

有关RecordNameStrategy的架构已被公司更改。现在一切正常。

谢谢

 类似资料:
  • 汇合版本4.1.0 我使用KTable从几个主题(topic_1,topic _2)中获取数据,连接数据,然后使用KStream将数据推送到另一个主题(totic_out)。(Ktable.toStream()) 数据采用avro格式 当我使用 我发现 但是没有主题与topic_out键。为什么不创建它? topic_out的输出: 我可以看到密钥正在生成,但没有密钥的主题。 为什么需要带密钥的主

  • 我创建了一个NiFi流,该流最终将json记录发布为具有Avro编码值和字符串键的记录,使用了值模式的融合注册表中的模式。这是NiFi中AvroRecordSetWriter的配置。 我现在正在尝试使用Kafka Connect()使用JdbcSinkConnector将消息移动到PostgreSQL数据库,但收到以下错误:检索id 1的Avro架构时出错 我已经确认我的Confluent注册表中

  • 我对Kafka很陌生。我正在尝试发送一个消息到Kafka主题,其中包含头和有效载荷。 以下是错误: @PostMapping(value=“/publish”)public void sendMessageToKafkaTopic(@RequestBody CabLocationPayload CabLocationPayload){ Header和Payload具有JSON的映射字段。 在Pro

  • 我有一台装有Java 1.6的服务器。在这里,我需要使用Confluent的< code > KafkaAvroDeserializer 来反序列化avro消息。 问题是: 如果我使用Confluent-1.0(它与Java兼容 如果我使用Confluent-2.0或更高版本,它拥有一切,但它只与java兼容 在这种情况下我该怎么办? 为了比较: http://docs.confluent.io/

  • 我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接

  • 我有Kafka的以下配置 我试图通过版本得到主题,我得到如下 我使用插件<;代码>;avro-maven插件>;生成>;CreateBankAccount>;代码< 然后,我用一个字符串键和一个avro序列化的有效负载向主题推送一条消息,但是我有一个错误 轨道: ProducerConfig值: KafkaAvroSerializerConfig值: