当前位置: 首页 > 知识库问答 >
问题:

Spark Streaming提前写入日志重启后不重放数据

年凯康
2023-03-14

为了有一种简单的方法来测试Spark Streaming预写日志,我创建了一个非常简单的自定义输入接收器,它将生成字符串并存储这些字符串:

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          //To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

然后我创建了一个简单的应用程序,它将使用自定义接收器来流式传输数据并对其进行处理:

object DStreamResilienceTest extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

正如您所看到的,在每秒存储字符串的同时,对每个接收到的RDD的处理都有2秒的睡眠时间。这会创建积压工作,新字符串会堆积起来,应该存储在WAL中。实际上,我可以看到检查点目录中的文件正在更新。运行应用程序时,我会得到如下输出:

[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

正如您所料,存储速度超出了处理速度。因此,我杀死应用程序并重新启动它。这次我注释掉了foreachRDD中的sleep,以便处理可以清除任何积压工作:

[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

正如您所看到的,新事件已被处理,但上一批中没有。旧的WAL日志被清除,我看到这样的日志消息,但旧数据没有得到处理。

INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

我做错了什么?我使用的是Spark 1.5.2。

共有1个答案

笪俊迈
2023-03-14

Spark用户邮件列表上的Shixiong(Ryan)Zhu回答了这一问题。

使用StreamingContext。按照他建议的方式获取或创建。

 类似资料:
  • 我使用Spring Boot并希望它将日志输出写入一个文件。 根据文档,这只是通过设置 虽然控制台输出运行良好,但未创建。此外,如果我手动创建文件,则不会对其写入任何内容。我错过了什么?

  • 有没有办法覆盖日志回溯配置?我知道我们在名为 logback 的文件中定义了(通常存储在路径 中),并且我知道通过使用

  • 我的grails配置log4j部分中有以下内容: 文件“onetract3.log”已成功创建,但未向该文件写入任何内容。 我可以在控制台中看到信息处理正确。 Grails版本是2.3.5 知道为什么这没有写入日志文件吗? 编辑:10.03.1014,设置相加为false。

  • 我正在尝试为mydb表恢复空的DB,但是 的语法似乎不是直接的。我之前在执行mysqldump时创建了文件emptyDbs。

  • 我有一个设置,其中docker容器使用日志驱动程序来写入它们的日志。目前日志中的日志行被转发到主机上运行的rsyslog,但syslog行上的应用程序名称显示为。 作为一种解决方法,我想将日志元数据中的字段写入syslog中出现的行中,这样我就可以确定在主机的syslog被发送到syslog聚合服务器之后,哪个容器写入了哪一行。 有什么建议吗?

  • 在中有一个,可以将日志写入到。 我需要在中使用相同的功能,但我还没有找到这样做的选项。有人知道如何使用实现同样的效果吗?