我有一个拼花地板数据表,结构如下:
var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")
我已经调整了以下设置,希望降低总时间:
第二种方法帮助防止了日志中出现的一些溢出,但对实际性能没有任何帮助。
有什么方法可以改善这一点,或者窗口函数通常运行缓慢与我的使用?这只是20M行,远不及spark用其他类型的工作负载所能处理的..
您的数据集大小约为70 GB。如果我对每个id都理解正确的话,它就是对所有记录按日期排序,然后取前面的250条记录做平均值。由于您需要在400多个专栏上应用这一点,我建议在创建拼花地板时尝试bucketing以避免洗牌。编写bucketted parquet文件需要相当多的时间,但对于所有480列的派生可能不需要8分钟*480执行时间。
请尝试bucketing或重新分区和sortwithin创建拼花地板文件,让我知道如果它工作。
我是Spark和Cassandra的新学员。我正面临着一个主要的性能问题,我在Spark中每5秒将来自Kafka的数据流化,然后使用JRI在R语言中对数据执行分析,最后将数据保存到Cassandra各自的列族中。将数据保存到Cassandra的持续时间(以毫秒为单位)随着输入请求的数量迅速增加[每个请求为200KB]。 火花代码:
我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗
我试图在火花数据帧中使用rowNumber。我的查询在Spark shell中按预期工作。但是当我在eclipse中写出它们并编译一个jar时,我面临着一个错误 我的问题 在Spark shell中运行查询时,我没有使用HiveContext。不确定为什么它返回一个错误,当我运行相同的jar文件。如果有帮助的话,我也在Spark 1.6.0上运行脚本。有人面临类似的问题吗?
我正在开发一条每天都会运行的管道。它包括连接两个表,比如x 以下是关于环境的事实, 对于表x: 数据大小:18 MB 分区中的文件数:~191 文件类型:拼花地板 对于表y: < li >数据大小:1.5 GB < li >一个分区中的文件数:~3200 < li >文件类型:拼花地板 现在的问题是: 我尝试了不同的资源组合的火花工作。 例如。: 执行者:50内存:20GB内核:5 执行者:70内
作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数
问题内容: 我们几乎将elasticsearch用作缓存,存储在时间窗口中找到的文档。我们不断插入许多不同大小的文档,然后使用结合日期过滤器的文本查询在ES中进行搜索,因此当前线程不会获取已经看到的文档。像这样: “(((word1 AND word 2)OR(word3 AND word4))AND insertDate> 1389000” 我们使用TTL功能在elasticsearch中将数据