在 Spark 流式处理中,如何检测空批次?
让我们以有状态流式处理字数为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印字数RDD?
这就是我如何避免空批次和覆盖在同一个目录。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
messageRecBased.foreachRDD{ rdd =>
rdd.repartition(1)
val eachRdd = rdd.map(record => record.value)
if(!eachRdd.isEmpty)
eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
}
我是怎么做到的。创建一个空的RDD,它是您之前的窗口。然后在“一切”中,计算上一个窗口与当前窗口之间的差异。如果当前窗口包含不在上一个窗口中的记录,则批处理中存在新内容。最后,将上一个窗口设置为当前窗口中的内容。
...
var previousWindowRdd = sc.emptyRDD[String]
dStream.foreachRDD {
windowRdd => {
if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
}
}
...
def processWindow(windowRdd: RDD[String]) = {
val newInBatch = windowRdd.subtract(previousWindowRdd)
if (!newInBatch.isEmpty())
processNewBatch(windowRdd)
previousWindowRdd = windowRdd
}
我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087
我有一个特定的要求,其中,我需要检查空的数据文件。如果为空,则填充默认值。这是我尝试过但没有得到我想要的东西。 这个想法是,如果df不是空的,就得到它。如果为空,则填写默认值为零。这似乎不起作用。以下是我得到的。 请帮忙。
我正在尝试为我的Spark Batch工作检索Kafka补偿。在检索偏移量之后,我想关闭流上下文。 我尝试将streamlistener添加到流上下文,并在作业完成后实现onBatchCompleted方法关闭流,但收到异常“无法停止侦听器总线线程内的StreamingContext”。 有解决办法吗?我正在尝试检索偏移量以调用KafkaUtils。createRDD(sparkContext、k
我在中看到了几个答案(例如这里),因此建议批次中的记录将成为单个RDD。我对此表示怀疑,因为假设batchInterval为1分钟,那么单个RDD将包含最后一分钟的所有数据? 注意:我不是直接将批次与RDD进行比较,而是将Spark内部处理的批次进行比较。
问题内容: 我想派生一个go进程并获取新进程的ID,但是我在或库中看到的只是启动一个新进程。 问题答案: 您应该从包装中获取。 请注意,这是在根本不使用任何线程的情况下发明的,并且一个进程中始终只有一个执行线程,因此分叉是安全的。使用Go,情况完全不同,因为它大量使用OS级线程来为其goroutine调度提供动力。 现在,在Linux上未经修饰的子进程将在所有活动线程中只有一个线程(在父进程中调用
我使用的是SPARK-SQL-2.4.1V和Java1.8。和Kafka版本SPARK-SQL-KAFKA-0-10_2.11_2.4.3。 这会产生以下错误: 类型Dataset中的方法join(Dataset,String)不适用于参数(Dataset,String,String)