当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流,倍数查询不并发运行

公孙宇
2023-03-14
scala
case class MyWriter1() extends ForeachWriter[Row]{
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    println(s"custom1 - ${value.get(0)}")
  }

  override def close(errorOrNull: Throwable): Unit = true
}

case class MyWriter2() extends ForeachWriter[(String, Int)]{
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: (String, Int)): Unit = {
    println(s"custom2 - $value")
  }

  override def close(errorOrNull: Throwable): Unit = true
}


object Main extends Serializable{

  def main(args: Array[String]): Unit = {
    println("starting")

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("app-test")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query1 = wordCounts.writeStream
      .outputMode("update")
      .foreach(MyWriter1())
      .start()

    val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Int]("count")))

    val query2 = ds.writeStream
      .outputMode("update")
      .foreach(MyWriter2())
      .start()

    spark.streams.awaitAnyTermination()

  }
}

共有1个答案

穆宾白
2023-03-14

我也有同样的情况(但是在较新的结构化流api上),在我的例子中,在上一个StreamingQuery上调用awaitTermination()会有所帮助。

S.TH比如:

query1.start()
query2.start().awaitTermination()

更新:与上面相比,这个内置解决方案/方法更好:

sparkSession.streams.awaitAnyTermination()
 类似资料:
  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢

  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep

  • 我使用spark 2.2.1,kafka_2.12-1.0.0和scala从kafka获取一些json数据,但是,我只连接了kafka,没有数据输出。 这里是我的scala代码: 这是我的绒球.xml 我运行这段代码,控制台没有显示任何来自kafka的数据。 这里是控制台输出: 输出只是说我的消费者群体已经死亡。我的kafka运行良好,我可以使用控制台命令从“行为”主题中获取数据。总之,Kafka

  • 运行环境:本地PC-同样的问题。Dataproc集群-spark-submit--包 org.apache.spark:spark-sql-kafka-0-102.12:2.4.5--类org.differentPartitionSparkStreaming--主纱--部署模式集群--num-executors 2--驱动程序--内存4G--executor-cores 4--executor-m