我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如:
topic 1:
{"id":"1","name":"tom"}
{"id":"2","name":"mark"}
topic 2:
{"name":"tom","age":"25"}
{"name":"mark","age:"35"}
我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。
如何使用Spark结构化流来做到这一点?
谢谢
不久前,我面临一个类似的要求:我有两个流,必须根据一些标准“连接”在一起。我使用的是一个名为mapGroupsWithState的函数。
这个函数的作用(用几句话来说,下面参考的更多细节)是以(K,V)的形式获取流,并根据每对的键将其元素累积在一个共同的状态上。然后,您可以有办法在状态完成时告诉 Spark(根据您的应用程序),甚至可以对不完整的状态进行超时。
基于您的问题的示例:
>
将Kafka主题读入Spark Stream:
val rawDataStream: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "topic1,topic2") // Both topics on same stream!
.option("startingOffsets", "latest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(value AS STRING) as jsonData") // Kafka sends bytes
对数据执行一些操作(我更喜欢SQL,但您可以使用DataFrame API)将每个元素转换为键值对:
spark.sqlContext.udf.register("getKey", getKey) // You define this function; I'm assuming you will be using the name as key in your example.
val keyPairsStream = rawDataStream
.sql("getKey(jsonData) as ID, jsonData from rawData")
.groupBy($"ID")
使用mapGroupsWithState函数(我给你演示一下基本思路;您必须根据自己的需要定义myGrpFunct):
keyPairsStream
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(myGrpFunct)
就是这样!如果您正确地实现了myGrpFunct,您将得到一个合并的数据流,您可以进一步转换它,如下所示:
[“汤姆”,“身份证”:“1”,“姓名”:“汤姆”},“姓名”:“tom”,“年龄”:“25”}]
["mark",{"id":"2","name":"mark"},{"name":"mark","age:"35"}]
希望这有帮助!
一个很好的解释与一些代码片段:http://asyncified.io/2017/07/30/exploring-stateful-streaming-with-spark-structured-streaming/
我希望你有你的解决方案。如果不是,则可以尝试从两个主题创建两个kstream,然后连接这些kstream并将连接的数据放回一个主题。现在,您可以使用Spark结构化流将连接的数据读取为一个数据帧。现在,您可以对连接的数据应用所需的任何转换。由于结构化流不支持两个流数据帧的连接,因此可以采用这种方法完成任务。
遵循当前文档(Spark 2.1.1)
还不支持两个流数据集之间的任何类型的连接。
裁判:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-操作
此时此刻,我认为你需要依赖于@igofred的回答所提出的Spark Streaming。
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
问题内容: 提前致谢。感谢您的帮助。 我想比较两个相同类型和结构的任意JToken(NewtonSoft的Json.Net)。 主要目标 是能够使用此方法对两个Json字符串进行排序,以便即使开始时它们具有相同的数据,但顺序不同,最后它们是两个完全相同的字符串。因此排序标准并不重要,重要的是该标准始终相同。并且应该考虑每个小数据元素。 JToken可以是以下几种类型之一:。我没有考虑比较。 获得一
场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?
这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合
问题内容: 比较运算符的 “ Go编程语言规范”部分使我相信,仅包含可比较字段的结构应具有可比性: 如果结构的所有字段都是可比较的,则它们的值是可比较的。如果两个结构值对应的非空白字段相等,则它们相等。 这样,由于“ Student”结构中的所有字段都是可比较的,因此我希望编译以下代码: 但是,它无法使用以下消息进行编译: 无效的操作:alice> = carol(运算符> =未在结构上定义) 我
问题内容: 比较Java 8中的两个实例并找出它们是否具有相同元素(特别是出于单元测试目的)的一种好方法是什么? 我现在得到的是: 或者: 但这意味着我要构建两个集合并丢弃它们。考虑到测试流的大小,这不是性能问题,但是我想知道是否存在一种规范的方式来比较两个流。 问题答案: