使用Spark streaming从Kafka主题读取Json数据
我使用DataFrame处理数据,稍后我希望将输出保存到HDFS文件。问题在于使用:
df.write.save("append").format("text")
产生许多文件,有些很大,有些甚至是0字节。
有没有办法控制输出文件的数量?此外,为了避免“相反”的问题,是否有办法也限制每个文件的大小,以便在当前达到一定大小/行数时写入新文件?
您可以使用尺寸估计器:
import org.apache.spark.util.SizeEstimator
val size = SizeEstimator.estimate(df)
接下来,您可以根据数据帧的大小通过重复或合并来调整文件的数量
输出文件的数量等于数据集的分区数量,这意味着您可以根据上下文以多种方式对其进行控制:
数据集
,您可以使用阅读器特定参数控制输入spark.sql.shuffle.partitions
参数控制分区数量。合并
或重新分区
。是否有办法也限制每个文件的大小,以便在当前达到一定大小/行数时写入新文件?
不。对于内置编剧来说,这是严格的1:1关系。
问题内容: 我已尝试使用此代码建议(http://www.daniweb.com/forums/thread23883.html#)将控制台输出写入txt文件,但未成功。怎么了? 问题答案: 你需要执行以下操作: 第二句话是关键。它将假定的“最终” 属性的值更改为提供的值。 可以使用类似的方法(和)来更改标准输入和错误流。有关详细信息,请参考。 上面的一个更通用的版本是这样的: 如果为is ,则流
有没有办法使用hadoop流作业将这20,000个文件合并到10,000个文件?或者,换句话说,有没有办法控制hadoop流式输出文件的数量? 提前感谢!
问题内容: 我是python的新手,正在编写一些脚本来自动从FTP服务器等下载文件。我想显示下载进度,但是我希望它保持不变,例如: 输出: 正在下载文件FooFile.txt [47%] 我正在尝试避免这样的事情: 我应该怎么做呢? 复制 : 如何在命令行应用程序的当前行上打印? 问题答案: 您还可以使用回车符:
所以我有一个Laravel控制器: 目前,我正在使用artisan(在引擎盖下运行PHP的内置开发Web服务器)运行应用程序: 我想将控制台消息记录到artisan进程的管道中。
使用火花流读取和处理来自Kafka的消息并写入HDFS-Hive。由于我希望避免创建许多垃圾文件系统的小文件,我想知道是否有办法确保最小的文件大小,和/或强制在文件中输出行数最少的能力,超时除外。谢谢。
问题内容: 我正在尝试在python中找到一种将脚本执行日志重定向到文件以及以pythonic方式的方法。有没有简单的方法可以做到这一点? 问题答案: 我想出了这个[unested] 在python中将期望有一个函数。您可以使用具有此功能的自定义对象。否则,您也可以让sys.stdout引用您的对象,在这种情况下,即使没有,它也会被准备。