我想记录从火花结构化流的传入流读取到数据库的记录数。我正在使用ForeachBatch来转换传入的流批处理和写入所需的位置。我想日志0记录读取,如果在一个特定的小时没有记录。但是当没有流时,Foreach批处理不会执行。有人能帮我吗?我的代码如下:
val incomingStream=spark。读流。格式(“事件中心”)。选项(customEventhubParameters.toMap)。load()
val query=incomingStream.writeStream.foreachBatch{
(batchDF: DataFrame, batchId: Long)=> writeStreamToDataLake(batchDF,batchId,partitionColumn,fileLocation,errorFilePath,eventHubName,configMeta)
}
.option("checkpointLocation",fileLocation+checkpointFolder+"/"+eventHubName)
.trigger(Trigger.ProcessingTime(triggerTime.toLong))
.start().awaitTermination()
这就是它的工作原理,甚至只有当有需要处理的内容,从而改变流的状态时,才会调用StreamingQueryListener的mods扩展。
可能还有另一种方法,但我会说"跳出框框思考"和pre-popualte与0每个时间帧这样的数据库,当查询AGGRegate,你会有正确的答案。
https://medium.com/@johankok/structured-streaming-in-a-flash-576cdb17bbee可以提供一些见解,以及火花:最终指南。
我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)
我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从
我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\