我正在尝试使用HDFS kafka连接器将protobuf消息从kafka发送到HDFS。我的连接器配置如下所示
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1"
}
}
为了测试这一点,我尝试在一个小节点应用程序中发送protobuf序列化消息。这是我的文件:
// data.proto
syntax = "proto3";
package awesomepackage;
message SearchRequest {
string query = 1;
int32 page = 2;
}
和我的节点应用程序
const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['10.8.0.1:9092']
})
const producer = kafka.producer()
const run = async () => {
await producer.connect()
protobuf.load('data.proto', async (err, root) => {
console.log("TESTING")
console.log(err)
let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
let payload = {query: "test", page: 2}
var errMsg = SearchRequest.verify(payload);
console.log(errMsg)
let msg = SearchRequest.create(payload)
var buffer = SearchRequest.encode(msg).finish();
console.log(buffer)
await producer.send({
topic: 'test-topic',
messages: [
{key: 'key1', value: buffer}
]
})
})
}
run()
但是,当我运行此程序时,我会出现以下错误:
Failed to deserialize data for topic test-topic to Protobuf
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我该如何解决这个问题?我的猜测是,我的protobuf模式没有在Kafka模式注册表中注册,但我不确定。如果是这种情况,是否有方法将模式从节点发送到注册表?
io.confluent.connect.protobuf。ProtobufConverter需要模式注册表,而不是纯序列化Protobuf。换句话说,节点代码中缺少模式注册表部分(或手动创建“包装”的原型消息的字节)
参考线材格式-合流
如果你不想使用Schema注册表,你可以使用BlueApron原型转换器,但看起来你正在使用一个,所以最好使用合流转换器
我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)
我正在使用spring-cloud-stream:1.3.0.发行版、spring-cloud-stream-binder-kafka:1.3.0.发行版开发spring引导应用程序。我使用spring integration dsl拆分文件中的行,使用beanio将行转换为json,要求将成功的json消息写入一个kafka主题,并将错误消息写入不同的kafka主题。下面是application
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。
我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0
我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如