当前位置: 首页 > 知识库问答 >
问题:

火花流式DStream元素与RDD

章鸿光
2023-03-14

本质上,我想对dStream中的每个元素应用一组函数。目前,我正在为pyspark.streaming.dstream使用“map”函数。根据文档,我的方法似乎是正确的。http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.dstream

map(f,preservesPartitioning=false)通过对DStream的每个元素应用一个函数来返回一个新的DStream。

我应该使用map,还是正确的方法是将函数/转换应用于RDDs(因为DStream使用RDD)??

更多文档:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html

共有1个答案

别旻
2023-03-14

DirectStream.map是正确的选择。以下映射:

stream.map(f)

相当于:

stream.transform(lambda rdd: rdd.map(f))

另一方面,DirectStream.foreachrdd是一个输出操作,并创建一个输出dstream。与foreachrdd一起使用的函数不应返回任何内容,与方法本身相同。看一下Scala签名就会很明显:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
 类似资料:
  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一

  • 如果spark streaming在10秒的批处理间隔中获得50行消息,并且在40.5行消息之后,这10秒就结束了,剩下的时间落入另一个10秒的间隔中,前40.5行的文本是一个RDD被首先处理,在我的用例中,前40行是有意义的,但是下一个。5行没有意义,第二个RDD首先也是这样。5行,我的问题是否有效?。请提供建议如何处理这个问题?。 谢谢比尔。

  • 我在Scala/Spark中有一个批处理作业,它根据一些输入动态创建Drools规则,然后评估规则。我还有一个与要插入到规则引擎的事实相对应的输入。 到目前为止,我正在一个接一个地插入事实,然后触发关于这个事实的所有规则。我正在使用执行此操作。 seqOp运算符的定义如下: 以下是生成的规则的示例: 对于同一RDD,该批次花了20分钟来评估3K规则,但花了10小时来评估10K规则! 我想知道根据事

  • 我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF

  • 我遵循火花流水槽集成的指导。但我最终无法获得任何事件。(https://spark.apache.org/docs/latest/streaming-flume-integration.html)谁能帮我分析一下?在烟雾中,我创建了“avro_flume.conf”的文件,如下所示: 在文件中,123.57.54.113是本地主机的ip。 最后,根本没有任何事件。 怎么了?谢谢!