当前位置: 首页 > 知识库问答 >
问题:

使用Apache Spark为Kafka生成Avro主题

孔彭祖
2023-03-14

我已经在本地安装了kafka(目前没有集群/模式注册),并尝试生成一个Avro主题,下面是与该主题相关的模式。

{
  "type" : "record",
  "name" : "Customer",
  "namespace" : "com.example.Customer",
  "doc" : "Class: Customer",
  "fields" : [ {
    "name" : "name",
    "type" : "string",
    "doc" : "Variable: Customer Name"
  }, {
    "name" : "salary",
    "type" : "double",
    "doc" : "Variable: Customer Salary"
  } ]
}

我想创建一个简单的SparkProducerApi来根据上述模式创建一些数据并将其发布到kafka。考虑创建转换为dataframe的示例数据,然后将其更改为avro然后发布。

val df = spark.createDataFrame(<<data>>)

然后,如下所示:

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","customer_avro_topic")
  .save()
}

现在可以< code >手动将模式附加到此avro主题。

这可以通过使用Apache Spark API而不是使用Java/Kafka API来实现吗?这是用于批处理,而不是流式处理

共有1个答案

钱展
2023-03-14

我认为这是不可能的,因为Spark中的Kafka生产者需要两列key和value,它们都必须是字节数组。

如果您从磁盘读取现有的Avro文件,那么您使用的Avro数据帧读取器可能会为名称和工资创建两列。因此,您需要一个操作来从包含整个Avro记录的其他列中构造一个<code>值

如果您想生成数据并且没有文件,那么您需要为Kafka消息键和作为字节数组的值构建一个Tuple2对象列表,然后您可以将它们并行化到RDD,然后将它们转换为Dataframe。但是在这一点上,只使用常规的Kafka Producer API要简单得多。

另外,如果您已经知道您的模式,请尝试在Kafka中生成测试数据的方法中提到的项目

 类似资料:
  • 我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创

  • 给定一个包含以下格式数据的大文件(V1,V2,…,VN) 我正在尝试使用Spark获得一个类似于下面的配对列表 我尝试了针对一个较旧的问题所提到的建议,但我遇到了一些问题。例如, 我得到了错误, 有人能告诉我哪些地方我可能做得不对,或者有什么更好的方法可以达到同样的效果?非常感谢。

  • 从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种

  • 我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以

  • 这里是源连接器状态的输出: 这里是接收器连接器配置的输出: 这里是接收器连接器状态的输出:

  • 有没有办法从Apache spark生成无模式的avro?我可以看到一种使用apache avro库通过Java/Scala和融合avro生成它的方法。当我用下面的方式从Spark编写Avro时,它用模式创建了Avro。我想在没有模式的情况下创建,以减少最终数据集的大小。