当前位置: 首页 > 知识库问答 >
问题:

在 Spark 流式处理中,如何检测空批次?

微生毅
2023-03-14

在 Spark 流式处理中,如何检测空批次?

让我们以有状态流式处理字数为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印字数RDD?

共有2个答案

岳奇逸
2023-03-14

这就是我如何避免空批次和覆盖在同一个目录。

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime

   messageRecBased.foreachRDD{ rdd =>
        rdd.repartition(1)
        val eachRdd = rdd.map(record => record.value)
        if(!eachRdd.isEmpty)
          eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
      }
井学
2023-03-14

我是怎么做到的。创建一个空的RDD,它是您之前的窗口。然后在“一切”中,计算上一个窗口与当前窗口之间的差异。如果当前窗口包含不在上一个窗口中的记录,则批处理中存在新内容。最后,将上一个窗口设置为当前窗口中的内容。

  ...

  var previousWindowRdd = sc.emptyRDD[String]

  dStream.foreachRDD {
    windowRdd => {
      if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
    }
  }

  ...

def processWindow(windowRdd: RDD[String]) = {
  val newInBatch = windowRdd.subtract(previousWindowRdd)

  if (!newInBatch.isEmpty())
    processNewBatch(windowRdd)

  previousWindowRdd = windowRdd
}
 类似资料:
  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 我有一个特定的要求,其中,我需要检查空的数据文件。如果为空,则填充默认值。这是我尝试过但没有得到我想要的东西。 这个想法是,如果df不是空的,就得到它。如果为空,则填写默认值为零。这似乎不起作用。以下是我得到的。 请帮忙。

  • 我正在尝试为我的Spark Batch工作检索Kafka补偿。在检索偏移量之后,我想关闭流上下文。 我尝试将streamlistener添加到流上下文,并在作业完成后实现onBatchCompleted方法关闭流,但收到异常“无法停止侦听器总线线程内的StreamingContext”。 有解决办法吗?我正在尝试检索偏移量以调用KafkaUtils。createRDD(sparkContext、k

  • 我在中看到了几个答案(例如这里),因此建议批次中的记录将成为单个RDD。我对此表示怀疑,因为假设batchInterval为1分钟,那么单个RDD将包含最后一分钟的所有数据? 注意:我不是直接将批次与RDD进行比较,而是将Spark内部处理的批次进行比较。

  • 问题内容: 我想派生一个go进程并获取新进程的ID,但是我在或库中看到的只是启动一个新进程。 问题答案: 您应该从包装中获取。 请注意,这是在根本不使用任何线程的情况下发明的,并且一个进程中始终只有一个执行线程,因此分叉是安全的。使用Go,情况完全不同,因为它大量使用OS级线程来为其goroutine调度提供动力。 现在,在Linux上未经修饰的子进程将在所有活动线程中只有一个线程(在父进程中调用

  • 我使用的是SPARK-SQL-2.4.1V和Java1.8。和Kafka版本SPARK-SQL-KAFKA-0-10_2.11_2.4.3。 这会产生以下错误: 类型Dataset中的方法join(Dataset,String)不适用于参数(Dataset,String,String)