当前位置: 首页 > 知识库问答 >
问题:

节点:向Kafka错误发送原型消息

束新
2023-03-14

我正在尝试使用HDFS kafka连接器将protobuf消息从kafka发送到HDFS。我的连接器配置如下所示

{
    "name": "hdfs3-connector-test",
    "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "hdfs.url": "hdfs://10.8.0.1:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://10.8.0.1:8081",
        "confluent.topic.bootstrap.servers": "10.8.0.1:9092",
        "confluent.topic.replication.factor": "1"
    }
}

为了测试这一点,我尝试在一个小节点应用程序中发送protobuf序列化消息。这是我的文件:

// data.proto

syntax = "proto3";
package awesomepackage;

message SearchRequest {
  string query = 1;
  int32 page = 2;
}

和我的节点应用程序

const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['10.8.0.1:9092']
})

const producer = kafka.producer()


const run = async () => {
    await producer.connect()

    protobuf.load('data.proto', async (err, root) => {
        console.log("TESTING")
        console.log(err)
    
        let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
        let payload = {query: "test", page: 2} 
    
        var errMsg = SearchRequest.verify(payload);
        console.log(errMsg)
    
        let msg = SearchRequest.create(payload)
        var buffer = SearchRequest.encode(msg).finish();
        console.log(buffer)
        await producer.send({
            topic: 'test-topic',
            messages: [
                {key: 'key1', value: buffer}
            ]
        })
    })
    
}

run()

但是,当我运行此程序时,我会出现以下错误:

Failed to deserialize data for topic test-topic to Protobuf

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我该如何解决这个问题?我的猜测是,我的protobuf模式没有在Kafka模式注册表中注册,但我不确定。如果是这种情况,是否有方法将模式从节点发送到注册表?

共有1个答案

洪国兴
2023-03-14

io.confluent.connect.protobuf。ProtobufConverter需要模式注册表,而不是纯序列化Protobuf。换句话说,节点代码中缺少模式注册表部分(或手动创建“包装”的原型消息的字节)

参考线材格式-合流

如果你不想使用Schema注册表,你可以使用BlueApron原型转换器,但看起来你正在使用一个,所以最好使用合流转换器

 类似资料:
  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 我正在使用spring-cloud-stream:1.3.0.发行版、spring-cloud-stream-binder-kafka:1.3.0.发行版开发spring引导应用程序。我使用spring integration dsl拆分文件中的行,使用beanio将行转换为json,要求将成功的json消息写入一个kafka主题,并将错误消息写入不同的kafka主题。下面是application

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0

  • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如