当前位置: 首页 > 知识库问答 >
问题:

如何将Flume的Avro事件反序列化到Spark?

姜鸿
2023-03-14

我有Flume Avro水槽和SparkStreams程序来读取水槽。CDH 5.1、Flume 1.5.0、Spark 1.0,使用Scala作为Spark上的程序lang

我能够制作Spark示例并计算Flume Avro事件。

但是我无法将 Flume Avro 事件反序列化为字符串\文本,然后解析结构行。

有人能举例说明如何使用Scala做到这一点吗?

共有3个答案

盖辉
2023-03-14

请尝试以下代码:

stream.map(e => "Event:header:" + e.event.get(0).toString
                + "body: " + new String(e.event.getBody.array)).print
顾恺
2023-03-14

您可以实现自定义解码器以进行反序列化。随之提供预期的类型信息。

呼延弘方
2023-03-14

您可以使用以下代码反序列化水槽事件:

val eventBody = stream.map(e => new String(e.event.getBody.array))

下面是一个spark流应用程序的示例,该应用程序使用flume twitter源和avro接收器来分析twitter中的流行标签,以将事件推送到spark:

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

object PopularHashTags {

val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

def main(args: Array[String]) {

sc.setLogLevel("WARN")

System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)

val ssc = new StreamingContext(sc, Seconds(5))
val filter = args.takeRight(args.length)
val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)

val tweets = stream.map(e => new String(e.event.getBody.array))

val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

// Print popular hashtags
topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})

stream.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()
ssc.awaitTermination()
    }

}
 类似资料:
  • 我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一

  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 我很难将JSON反序列化到下面示例中实现基本接口的一些类ChildA、ChildB等。 异常是:com.fasterxml.jackson.databind.JsonMappingExc0019:意外令牌(END_OBJECT),预期FIELD_NAME:缺少包含类型id的属性type(对于类Basic) 预期的JSON如下所示: 没有在所有类型对象中显示的属性,因此它们完全不同。但正如您在rea