scala
case class MyWriter1() extends ForeachWriter[Row]{
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
println(s"custom1 - ${value.get(0)}")
}
override def close(errorOrNull: Throwable): Unit = true
}
case class MyWriter2() extends ForeachWriter[(String, Int)]{
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: (String, Int)): Unit = {
println(s"custom2 - $value")
}
override def close(errorOrNull: Throwable): Unit = true
}
object Main extends Serializable{
def main(args: Array[String]): Unit = {
println("starting")
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val host = "localhost"
val port = "9999"
val spark = SparkSession
.builder
.master("local[*]")
.appName("app-test")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query1 = wordCounts.writeStream
.outputMode("update")
.foreach(MyWriter1())
.start()
val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Int]("count")))
val query2 = ds.writeStream
.outputMode("update")
.foreach(MyWriter2())
.start()
spark.streams.awaitAnyTermination()
}
}
我也有同样的情况(但是在较新的结构化流api上),在我的例子中,在上一个StreamingQuery上调用awaitTermination()会有所帮助。
S.TH比如:
query1.start()
query2.start().awaitTermination()
更新:与上面相比,这个内置解决方案/方法更好:
sparkSession.streams.awaitAnyTermination()
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢
场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?
我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep
我使用spark 2.2.1,kafka_2.12-1.0.0和scala从kafka获取一些json数据,但是,我只连接了kafka,没有数据输出。 这里是我的scala代码: 这是我的绒球.xml 我运行这段代码,控制台没有显示任何来自kafka的数据。 这里是控制台输出: 输出只是说我的消费者群体已经死亡。我的kafka运行良好,我可以使用控制台命令从“行为”主题中获取数据。总之,Kafka
运行环境:本地PC-同样的问题。Dataproc集群-spark-submit--包 org.apache.spark:spark-sql-kafka-0-102.12:2.4.5--类org.differentPartitionSparkStreaming--主纱--部署模式集群--num-executors 2--驱动程序--内存4G--executor-cores 4--executor-m