我正在将数据转换为数据帧,将其写入HDFS:
密码
object KafkaSparkHdfs {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
sparkConf.set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(sparkConf)
def main(args: Array[String]): Unit = {
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sparkConf, Seconds(20))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "stream3",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("fridaydata")
val stream = KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(consumerRecord => consumerRecord.value)
val words = lines.flatMap(_.split(" "))
val wordMap = words.map(word => (word, 1))
val wordCount = wordMap.reduceByKey(_ + _)
wordCount.foreachRDD(rdd => {
val dataframe = rdd.toDF();
dataframe.write
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/newfile24")
})
ssc.start()
ssc.awaitTermination()
}
}
已创建文件夹,但未写入文件。
程序因以下错误而终止:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
在我的pom中,我使用了各自的依赖项:
这里有一个明显的问题-合并(1)
。
dataframe.coalesce(1)
虽然在许多情况下减少文件数量可能很诱人,但当且仅当数据量足够低以供节点处理时才应该这样做(显然它不在这里)。
此外,让我引述留档:
但是,如果要进行剧烈合并,例如,将numPartitions=1,这可能会导致计算在比您喜欢的节点更少的节点上进行(例如,在numPartitions=1的情况下为一个节点)。为了避免这种情况,可以调用重新分区。这将添加一个洗牌步骤,但意味着当前上游分区将并行执行(无论当前分区是什么)。
结论是,您应该根据预期的数据量和所需的并行性相应地调整参数<这样的代码>合并(1)在实践中很少有用,尤其是在流式处理这样的上下文中,数据属性可能随时间而不同。
错误是由于试图同时运行多个spark上下文。将allowMultipleContexts设置为true主要用于测试目的,不鼓励使用。因此,问题的解决方案是确保在任何地方都使用相同的SparkContext。从代码中我们可以看到,SparkContext(sc)用于创建一个SQLContext,这很好。但是,在创建StreamingContext时,不使用它,而是使用SparkConf。
通过查看留档,我们可以看到:
通过提供新SparkContext所需的配置来创建StreamingContext
换句话说,通过使用
SparkConf
作为参数,将创建一个新的SparkContext
。现在有两个单独的上下文。
这里最简单的解决方案是继续使用与之前相同的上下文。将创建StreamingContext的行更改为:
val ssc = new StreamingContext(sc, Seconds(20))
注意:在较新版本的Spark(2.0)中,请改用SparkSession。然后可以使用StreamingContext(spark.sparkContext,…)创建新的流上下文
。它可以如下所示:
val spark = SparkSession().builder
.setMaster("local[*]")
.setAppName("SparkKafka")
.getOrCreate()
import sqlContext.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
但是Flink医生说: 在启用Flink检查点的情况下,Flink Kafka使用者将使用来自主题的记录,并以一致的方式定期检查其所有的Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink会将流程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新使用来自Kafka的记录。 阅读其他来源,我猜Flink检查点将保存程序的状态以及消耗的偏移量,但Spark检查点只是保存消耗的偏移
我刚开始使用Spark streaming并尝试运行本教程中的一个示例,我正在跟踪制作并运行我们自己的NetworkWordCount。我已经完成了第8步,并从SBT制作了一个罐子。 现在我正在尝试使用第9步中的命令运行deploy my jar,如下所示: 我创建的jar包含“NetworkWordCount”类,该类具有来自spark示例的以下代码 我无法确定我做错了什么。
应用程序通常会通过抛出另一个异常来响应异常。 实际上,第一个异常引起第二个异常。 它可以是非常有助于用户知道什么时候一个异常导致另一个异常。 “异常链(Chained Exceptions)”帮助程序员做到这一点。 以下是Throwable中支持异常链的方法和构造函数。 Throwable getCause() Throwable initCause(Throwable) Throwable(St
你可以使用raise语句 引发 异常。你还得指明错误/异常的名称和伴随异常 触发的 异常对象。你可以引发的错误或异常应该分别是一个Error或Exception类的直接或间接导出类。 如何引发异常 例13.2 如何引发异常 #!/usr/bin/python # Filename: raising.py classShortInputException(Exception): '''A u
问题内容: 异常存储在哪里?堆,堆。如何为异常分配和释放内存?现在,如果您有多个需要处理的异常,是否创建了所有这些异常的对象? 问题答案: 我假设为异常分配的内存分配方式与所有其他对象(在堆上)分配方式相同。 这曾经是个问题,因为您不能为OutOfMemoryError分配内存,这就是直到Java 1.6之前 才没有堆栈跟踪的原因。现在,它们也为stacktrace预分配了空间。 如果您想知道在抛
因为Java编程语言不需要捕获方法或声明未检查异常(包括 RuntimeException、Error及其子类),程序员可能会试图编写只抛出未检查异常的代码,或使所有异常子类继承自RuntimeException。这两个快捷方式都允许程序员编写代码,而不必担心编译器错误,也不用担心声明或捕获任何异常。虽然这对于程序员似乎很方便,但它避开了捕获或者声明异常的需求,并且可能会导致其他人在使用您的类而产
当面对选择抛出异常的类型时,您可以使用由别人编写的异常 - Java平台提供了许多可以使用的异常类 - 或者您可以编写自己的异常类。 如果您对任何以下问题回答“是”,您应该编写自己的异常类;否则,你可以使用别人的。 你需要一个Java平台中没有表示的异常类型吗? 如果用户能够区分你的异常与由其他供应商编写的类抛出的异常吗? 你的代码是否抛出不止一个相关的异常? 如果您使用他人的例外,用户是否可以访
:) 我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。 首先这可能吗? 我一直在尝试这样的事情: 但是我得到了这个: 任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢