当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流连接问题

上官凯歌
2023-03-14

场景与经典的流连接略有不同

交易流: transTS, userid, productid,...

streamB:创建的新产品流:productid、productname、createTS等)

我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。

streamA_wm = streamA.withWatermark("transTS", "3 minutes")
streamB_wm = streamB.withWatermark("createTS", "1 day")

streamA_wm
   .join(streamB_wm, "productId AND transTS >= createTS", "leftOuter")

结果为空。

我做错了什么?

共有1个答案

柳培
2023-03-14

我想你在这方面的做法可能是错误的。虽然产品在创建和更新时是事务性的,但它们是相对于其他事务流的元数据。

我谨提出以下建议:

  1. 将事务流加入到参考数据产品中——它不受流处理。
  2. 不要缓存产品,这可以确保您转到源。
  3. 使用镶木地板,KUDU的产品。

但可能有一个原因,为产品流,但。。。如果不再对产品进行更新,并且您通过交易流再次获取该产品的数据,会发生什么情况?

 类似资料:
  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s

  • 我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢

  • 我正在使用带更新模式的结构化流媒体读取Kafka主题中的数据流。,然后做一些改变。 然后我创建了一个jdbc接收器,用追加模式在mysql接收器中推送数据。问题是我如何告诉我的接收器让它知道这是我的主键,并基于它进行更新,这样我的表就不会有任何重复的行。

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va