我已经在本地安装了kafka(目前没有集群/模式注册),并尝试生成一个Avro主题,下面是与该主题相关的模式。
{
"type" : "record",
"name" : "Customer",
"namespace" : "com.example.Customer",
"doc" : "Class: Customer",
"fields" : [ {
"name" : "name",
"type" : "string",
"doc" : "Variable: Customer Name"
}, {
"name" : "salary",
"type" : "double",
"doc" : "Variable: Customer Salary"
} ]
}
我想创建一个简单的SparkProducerApi
来根据上述模式创建一些数据并将其发布到kafka。考虑创建转换为dataframe
的示例数据,然后将其更改为avro
然后发布。
val df = spark.createDataFrame(<<data>>)
然后,如下所示:
df.write
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","customer_avro_topic")
.save()
}
现在可以< code >手动将模式附加到此avro主题。
这可以通过使用Apache Spark API
而不是使用Java/Kafka API
来实现吗?这是用于批处理,而不是流式处理
。
我认为这是不可能的,因为Spark中的Kafka生产者需要两列key和value,它们都必须是字节数组。
如果您从磁盘读取现有的Avro文件,那么您使用的Avro数据帧读取器可能会为名称和工资创建两列。因此,您需要一个操作来从包含整个Avro记录的其他列中构造一个<code>值
如果您想生成数据并且没有文件,那么您需要为Kafka消息键和作为字节数组的值构建一个Tuple2对象列表,然后您可以将它们并行化到RDD,然后将它们转换为Dataframe。但是在这一点上,只使用常规的Kafka Producer API要简单得多。
另外,如果您已经知道您的模式,请尝试在Kafka中生成测试数据的方法中提到的项目
我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创
给定一个包含以下格式数据的大文件(V1,V2,…,VN) 我正在尝试使用Spark获得一个类似于下面的配对列表 我尝试了针对一个较旧的问题所提到的建议,但我遇到了一些问题。例如, 我得到了错误, 有人能告诉我哪些地方我可能做得不对,或者有什么更好的方法可以达到同样的效果?非常感谢。
从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种
我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以
这里是源连接器状态的输出: 这里是接收器连接器配置的输出: 这里是接收器连接器状态的输出:
有没有办法从Apache spark生成无模式的avro?我可以看到一种使用apache avro库通过Java/Scala和融合avro生成它的方法。当我用下面的方式从Spark编写Avro时,它用模式创建了Avro。我想在没有模式的情况下创建,以减少最终数据集的大小。