我有Flume Avro水槽和SparkStreams程序来读取水槽。CDH 5.1、Flume 1.5.0、Spark 1.0,使用Scala作为Spark上的程序lang
我能够制作Spark示例并计算Flume Avro事件。
但是我无法将 Flume Avro 事件反序列化为字符串\文本,然后解析结构行。
有人能举例说明如何使用Scala做到这一点吗?
请尝试以下代码:
stream.map(e => "Event:header:" + e.event.get(0).toString
+ "body: " + new String(e.event.getBody.array)).print
您可以实现自定义解码器以进行反序列化。随之提供预期的类型信息。
您可以使用以下代码反序列化水槽事件:
val eventBody = stream.map(e => new String(e.event.getBody.array))
下面是一个spark流应用程序的示例,该应用程序使用flume twitter源和avro接收器来分析twitter中的流行标签,以将事件推送到spark:
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
object PopularHashTags {
val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
def main(args: Array[String]) {
sc.setLogLevel("WARN")
System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)
val ssc = new StreamingContext(sc, Seconds(5))
val filter = args.takeRight(args.length)
val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)
val tweets = stream.map(e => new String(e.event.getBody.array))
val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()
}
}
我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
我很难将JSON反序列化到下面示例中实现基本接口的一些类ChildA、ChildB等。 异常是:com.fasterxml.jackson.databind.JsonMappingExc0019:意外令牌(END_OBJECT),预期FIELD_NAME:缺少包含类型id的属性type(对于类Basic) 预期的JSON如下所示: 没有在所有类型对象中显示的属性,因此它们完全不同。但正如您在rea