场景与经典的流连接略有不同
交易流: transTS, userid, productid,...
streamB:创建的新产品流:productid、productname、createTS等)
我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。
streamA_wm = streamA.withWatermark("transTS", "3 minutes")
streamB_wm = streamB.withWatermark("createTS", "1 day")
streamA_wm
.join(streamB_wm, "productId AND transTS >= createTS", "leftOuter")
结果为空。
我做错了什么?
我想你在这方面的做法可能是错误的。虽然产品在创建和更新时是事务性的,但它们是相对于其他事务流的元数据。
我谨提出以下建议:
但可能有一个原因,为产品流,但。。。如果不再对产品进行更新,并且您通过交易流再次获取该产品的数据,会发生什么情况?
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s
我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢
我正在使用带更新模式的结构化流媒体读取Kafka主题中的数据流。,然后做一些改变。 然后我创建了一个jdbc接收器,用追加模式在mysql接收器中推送数据。问题是我如何告诉我的接收器让它知道这是我的主键,并基于它进行更新,这样我的表就不会有任何重复的行。
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va