我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起。我们正在使用名为“Pyleus”的storm python包装器
我们设置了一个监控数据库表的ConFluent-Kafka JDBC连接器,每当DB发生变化时,新记录将作为Avro格式的Kafka消息发送。
在Pyleus bolt中,我们能够获取Kafka消息,但是,我们无法将其反序列化为JSON。
我们正在使用两个名为“Avro\u json\u serializer”和“Avro”的python Avro模块。当我试图反序列化我放在一起的简单Avro文件时,它们就起作用了。
Kafka消息中Avro数据的Avro模式是使用HTTP GET从Confluent的模式注册表中获取的。我将Kafka消息中的模式和Avro数据放入两个文件中,下面是我的测试程序:
import avro
import avro_json_serializer as ajs
import json
# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()
schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())
serializer = ajs.AvroJsonSerializer(avro_schema)
# Avro data with in Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "r")
avro = avrofile.read()
jsonData = serializer.to_json(avro) # where the code error out #
print jsonData
我解释错误消息的方式是我的avro模式不适合我的avro数据:
avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
"namespace": "example.avro",
"type": "record",
"connect.name": "TABLE_NAME",
"fields": [
{
"type": "int",
"name": "Column_1"
},
... (omitting the rest of the schema)
我从这里读到ConFluent框架中Avro格式的Kafka消息在消息开头有4个额外的字节,指示模式ID。我试图去掉Avro数据的前4个字节,然后将其发送到“serializer.to_json()”,但仍然没有运气。
救命!
我在通过Storm Kafka spout读取kafka汇合数据时遇到了完全类似的问题。这是为我工作的等效Java代码。
ByteBuffer input = ByteBuffer.wrap(data);
int id = input.getInt();
int start = input.position() + 1;
MyAvroObject obj = null;
try {
obj = datum_reader.read(null, DecoderFactory.get().binaryDecoder(input.array(), start, input.limit(), null));
} catch (IOException e) {
e.printStackTrace();
}
return obj;
ByteBuffer上的getInt()和position方法将指针移动到架构Id之后。希望这有帮助。
正在尝试读取avro文件。 无法将运行到Avro架构的数据转换为Spark SQL StructType:[“null”,“string”] 尝试手动创建架构,但现在遇到以下情况: 通用域名格式。databricks。火花阿夫罗。SchemaConverters$CompatibleSchemaException:无法将Avro架构转换为catalyst类型,因为路径处的架构不兼容(avroTyp
在Spring-Cloud-Stream中是否有支持或计划支持avro和/或来自汇流平台的模式注册表?我发现spring-integration-kafka 1.3.0版中存在对avro的依赖,而spring-cloud-stream-binder-kafka的主分支(2.0)和spring-kafka没有任何avro依赖。
我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1
使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen
现在Spark 2.4已经内置了对Avro格式的支持,我正在考虑将数据湖中某些数据集的格式从Parquet更改为Avro,这些数据集通常是针对整行而不是特定列聚合进行查询/联接的。 然而,数据之上的大部分工作都是通过Spark完成的,据我所知,Spark的内存缓存和计算是在列格式的数据上完成的。在这方面,Parquet是否提供了性能提升,而Avro是否会招致某种数据“转换”损失?在这方面,我还需要
在将Java对象转换为JSON字符串时,我遇到了JsonMappingException。下面是完整的异常消息。 下面是我用来将Java对象转换为JSON字符串的java代码。我尝试启用ACCEPT_SINGLE_VALUE_AS_ARRAY,ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT但仍然面临问题。不确定是什么导致了JsonMappingException:不是数组问题。