当前位置: 首页 > 知识库问答 >
问题:

在scala spark Streaming中使用foreach时不希望string作为类型?

周超英
2023-03-14

代码段:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
write2hdfs.foreachRDD(rdd => {

rdd.foreach(avroRecord => {
println(avroRecord)
//val rawByte = avroRecord.getBytes("UTF-8")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//val ssc = new JavaStreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String]("zookeeper.connect" -> 
zkQuorum,"group.id" -> group,"zookeeper.connection.timeout.ms" -> "10000")                    

//val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = 
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topics,StorageLevel.NONE)

已完成的进口:

import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord, 
GenericDatumWriter, GenericData}
import org.apache.avro.io.{DecoderFactory, DatumReader, DatumWriter, 
BinaryDecoder}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import java.io.{File, IOException}
//import java.io.*
import org.apache.commons.io.IOUtils;
import _root_.kafka.serializer.{StringDecoder, DefaultDecoder}
import _root_.kafka.message.Message
import scala.reflect._

编译错误:

将1个Scala源代码编译为/home/spark_scala/spark_stream_project/target/scala-2.10/classes...[error]/home/spark_scala/spark_stream_project/src/main/scala/sparkstreaming.scala:34:重载方法值createStream,其备选方案Elevel:org.apache.spark.storage.storageLevel)org.apache.spark.streaming.api.java.javapairReceiverInputdStream[String,kafka.message.message][error](Ssc:org.apache.spark.streaming.streaming.streamingContext,kafkaparams:scala.collection.immutable.map[String,String],topics:scala.collection.immutable.map[String,String],stopics:scala.collection.immutable.map[String,String],stopics:scala.collection,age.message],隐式证据$3:scala.reflect.classtag[kafka.serializer.StringDecoder],隐式证据$4:scala.reflect.classtag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dreceiverInputdStream[(String,kafka.Message.Message)][(String,kafka.Message.Message)]不能应用于(org.apache.spark.streamingcontext,scala.collection.immutable.map[String,String,org.apache.spark.streamingcontext,String,org.apache.spark.storagelevel)[error]

这里出了什么问题。另外,我没有看到kafkaUtils API文档中所建议的正确构造函数。API文档引用:https://spark.apache.org/docs/1.3.0/API/java/index.html?org/apache/spark/streaming/kafkautils.html

期待支持。

谢了。

val lines = 
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topicMap,StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
java.lang.ClassCastException: [B cannot be cast to kafka.message.Message

On line :
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topicMap,StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)

理想情况下,过滤这个Dstream(字符串、消息)应该也可以工作,对吗?在使用map之前,我需要从消息中提取有效负载吗?

需要输入请。谢谢

共有1个答案

程谭三
2023-03-14

您可以这样做:

import kafka.serializer.{StringDecoder, DefaultDecoder}
import kafka.message.Message

val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> group,
    "zookeeper.connection.timeout.ms" -> "10000")
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topics, storageLevel)

这将得到DStream[(String,kafka.message.message)],您应该能够检索原始字节并从那里转换为Avro。

 类似资料:
  • 我正在使用Ansile 2.0.1.0和Docker。 我想连接到容器以在Ansbile中创建一个Docker容器。 也是变量中要管理的容器的名称。 但是当执行时,我得到错误消息。 我猜是因为它使用了var来宿主。 我如何修复错误?

  • 我使用Web View来显示PDF,它工作得很好,但我想去一个特定的页面,但它不像在WebView中那样工作,我们必须使用URL,所以我在桌面浏览器上工作的URL中传递我的页码。也就是说,如果你知道任何其他方式来显示PDF在应用程序中,将是最受欢迎的,但如果你谈论barteck PDFviewer库,将不会为我工作,因为巨大的大小。我想要一个简单的轻量级PDF查看器,重定向我到一个特定的页面。欢迎

  • 问题内容: 我项目的一位主要开发人员已将项目的toString()实现称为“纯粹的残障”,并希望将其从代码库中删除。 我已经说过,这样做意味着任何希望显示对象的客户端都必须编写自己的代码以将对象转换为字符串,但这得到了“是的答案”。 现在具体来说,该系统中的对象是矩形,圆形等图形元素,当前表示形式是显示x,y,比例,边界等。 那么,人群在哪里呢? 您应该何时以及何时不应该实现toString? 问

  • 我看到过许多关于堆栈溢出问题的答案,这些问题涉及使用Pandas方法。我也看到用户在他们下面评论说“速度慢,应该避免”。 如果是如此糟糕,那么为什么它会出现在API中? 如何和何时使我的代码免费? 是否有任何情况下是好的(优于其他可能的解决方案)?

  • 问题内容: 我正在尝试将AutoCompleteTextField与自定义类一起用作通用类型,并添加AjaxFormComponentUpdatingBehavior。我的意思是我想要一个 然后添加一个AjaxFormComponentUpdatingBehavior: 问题是由于某种原因,添加该行为会使表单尝试使用String设置模型对象(即使AutoCompleteTextField具有Som

  • 问题内容: 我想为Eclipse创建一个Google Closure Compiler插件。我已经有一个弹出菜单项,用于将JavaScript文件编译为压缩版本。但是,如果每次保存时都会自动生成一个缩小的版本,那么它将大有帮助。我已阅读/听说过有关自然和建筑,扩展点和的内容。但是我没有弄清楚我应该使用什么,尤其是如何使它工作。 是否有一个实用的插件实例可以执行“相同的事情”,以便我可以从中进行工作