我们需要从Kafka主题导出生产数据以用于测试目的:数据用Avro编写,模式放在模式注册表中。
我们尝试了以下策略:
kafka-console-consumer
和stringdeserializer
或binarydeserializer
。我们无法获得可以用Java解析的文件:解析时总是出现异常,这表明文件格式错误。kafka-avro-console-consumer
:它生成一个还包括一些字节的json,例如在反序列化BigDecimal时。我们甚至不知道要选择哪个解析选项(不是avro,也不是json)难道没有一种简单、容易的方法可以将包含avro数据的Kafka主题的值(而不是模式)的内容转储到文件中,以便对其进行解析吗?我希望使用kafka-console-consumer和正确的选项,再加上使用Avro的正确Java Api,这是可以实现的。
例如,使用kafka-console-consumer...我们无法获得一个可以用Java解析的文件:在解析它时,我们总是会遇到异常,这表明该文件的格式是错误的。
您不会使用普通的控制台消费者。您可以使用kafka-avro-console-consumer
,它将二进制avro数据反序列化为json,以便在控制台上读取。您可以将>topic.txt
重定向到控制台以读取它。
如果您确实使用了控制台使用者,则不能立即解析Avro,因为您仍然需要从数据中提取模式ID(在第一个“神奇字节”之后的4个字节),然后使用模式注册中心客户端检索模式,只有这样您才能反序列化消息。当控制台使用者写入文件时,用于读取该文件的任何Avro库都希望在文件头放置一个完整的模式,而不仅仅是在每行都指向注册表中任何内容的ID。(基本的Avro库对注册表也一无所知)
为什么不直接用Java编写一个Kafka consumer呢?参见架构注册表文档
打包并将代码放在某个生产服务器中
不完全确定为什么这是一个问题。如果您可以在生产网络中使用SSH代理或VPN,那么您不需要在那里部署任何东西。
如何导出此数据
由于您使用的是模式注册表,我建议您使用其中一个Kafka Connect库
包括Hadoop、S3、Elasticsearch和JDBC。我想还有一个FileSink连接器
嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写
下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc
我想读取事务的元数据(在Kafka0.11.0.1中支持),这样我就可以确定特定事务ID的事务是否已经提交。目前,我正在从_transactional_state主题获取键和值,但它是某种编码格式。以下是我在轮询__transaction_state主题时收到的一些相同的键/值:键=10000000MMM,值=+')
Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr
我想使用apache nifi将一些通用数据生成到kafka主题中,并且我希望这些数据是avro格式的。我为它所做的: 在架构注册表中创建新架构: {“type”:“record”,“name”:“my_schema”,“namespace”:“my_namespace”,“doc”:“”,“fields”:[{“name”:“key”,“type”:“int”},{“name”:“value”,
我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式? 下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到