当前位置: 首页 > 知识库问答 >
问题:

写入hdfs时如何避免小文件问题

张高澹
2023-03-14

我在我的项目中使用spack-sql-2.3.1v、kafka和java8。与

--driver-memory 4g \
--driver-cores 2 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

在消费者方面,我尝试使用下面的代码在hdfs me中编写文件

                          dataSet.writeStream()
                                        .format("parquet")
                                        .option("path", parqetFileName)
                                        .option("mergeSchema", true)
                                        .outputMode("Append")
                                        .partitionBy("company_id","date")
                                        .option("checkpointLocation", checkPtLocation)
                                        .trigger(Trigger.ProcessingTime("25 seconds"))
                                        .start();

当我存储到hdfs文件夹中时,它看起来像下面的东西,即每个文件都在1.5k即几个KB。

$ hdfs dfs -du -h /transactions/company_id=24779/date=2014-06-24/
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00026-1027fff9-5745-4250-961a-fd56508b7ea3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00057-6604f6cc-5b8d-41f4-8fc0-14f6e13b4a37.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00098-754e6929-9c75-430f-b6bb-3457a216aae3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00099-1d62cbd5-7409-4259-b4f3-d0f0e5a93da3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00109-1965b052-c7a6-47a8-ae15-dea301010cf5.c000.snappy.parquet

由于这个小文件,它需要大量的处理时间,而我从hdfs中读取更大的数据集

2020-02-12 07:07:57,475 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.String.substring(String.java:1969)
        at java.net.URI$Parser.substring(URI.java:2869)
        at java.net.URI$Parser.parse(URI.java:3049)
        at java.net.URI.<init>(URI.java:588)
        at org.apache.spark.sql.execution.streaming.SinkFileStatus.toFileStatus(FileStreamSinkLog.scala:52)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:336)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.spgmi.ca.prescore.utils.DbUtils.loadFromHdfs(DbUtils.java:129)
        at com.spgmi.ca.prescore.spark.CountRecords.main(CountRecords.java:84)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
2020-02-12 07:07:57,533 [Reporter] WARN  org.apache.spark.deploy.yarn.ApplicationMaster - Reporter thread fails 1 time(s) in a row.
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: dev1-dev.com":8030;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:805)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1497)
        at org.apache.hadoop.ipc.Client.call(Client.java:1439)
        at org.apache.hadoop.ipc.Client.call(Client.java:1349)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy22.allocate(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
        at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy23.allocate(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:296)
        at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:249)
        at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$allocationThreadImpl(ApplicationMaster.scala:540)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:606)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:753)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:687)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:790)
        at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:411)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1554)

问题:

>

如果我想计算给定hdfs文件夹中的记录总数,如何计算?

新更改后

--driver-memory 16g \
--driver-cores 1 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

运行成功结果包括:

2020-02-12 20:28:56,188 [Driver] WARN  com.spark.mypackage.CountRecords -  NUMBER OF PARTITIONS AFTER HDFS READ : 77926
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|               24354|   94|
|               26425|   96|
|               32414|   64|
|               76143|   32|
|               16861|   32|
|               30903|   64|
|               40335|   32|
|               64121|   64|
|               69042|   32|
|               32539|   64|
|               34759|   32|
|               41575|   32|
|                1591|   64|
|                3050|   98|
|               51772|   32|
+--------------------+-----+

2020-02-12 20:50:32,301 [Driver] WARN  com.spark.mypackage.CountRecords -  RECORD COUNT: 3999708

共有1个答案

容飞掣
2023-03-14

>

  • 是的。小文件不仅仅是一个火花问题。它会在NameNode上造成不必要的负载。在处理小文件时,您应该花更多的时间压缩和上载较大的文件,而不是担心OOM。您的文件小于64MB/128MB,这表明您对Hadoop的使用很差。

    有点像火花。读取(“hdfs://path“).count()将读取路径中的所有文件,然后对数据框中的行进行计数

    没有硬设置的数字。您需要在作业上启用JMX监控,并查看堆的大小。否则,任意加倍当前内存,直到它开始没有内存。如果开始接近8GB以上,则需要考虑通过添加更多并行化来减少每个作业中读取的数据。

    FWIW、Kafka Connect也可用于输出分区的HDFS/S3路径。

  •  类似资料:
    • 问题内容: 我可以在不生成编译的.pyc文件的情况下运行python解释器吗? 问题答案: 来自“ Python 2.6的新增功能- 解释器更改” : 现在,可以通过向Python解释器提供-B开关,或者通过在运行解释器之前设置 PYTHONDONTWRITEBYTECODE环境变量来阻止Python编写.pyc或.pyo文件。此设置可作为Python程序的 变量使用,并且Python代码可以更改

    • 本文向大家介绍MySQL问答系列之如何避免ibdata1文件大小暴涨,包括了MySQL问答系列之如何避免ibdata1文件大小暴涨的使用技巧和注意事项,需要的朋友参考一下 0、导读 ibdata1文件是什么? ibdata1是一个用来构建innodb系统表空间的文件,这个文件包含了innodb表的元数据、撤销记录、修改buffer和双写buffer。如果file-per-table选项打开的话,该

    • 如果我在文件已经存在的同一个容器中上传azure blob上的文件,它正在覆盖文件,如何避免覆盖相同的文件?下面我提到的场景... 步骤1-将文件“abc.jpg”上传到azure上名为“filecontainer”的容器中 第二步-一旦它被上传,尝试上传一些不同的同名文件到同一个容器 输出 - 它将用最新上传的文件覆盖现有文件 我的要求-我想避免这种覆盖,因为不同的人可能会将具有相同名称的文件上

    • 问题内容: 我想在HDFS中创建文件并在其中写入数据。我使用以下代码: 它创建文件,但不写入任何内容。我搜索了很多,但没有找到任何东西。我怎么了 我是否需要任何权限才能在HDFS中写入? 问题答案: 的替代方法,你可以在获取文件系统时传递URI

    • 我正在使用Flume从我的本地文件系统写一些CSV文件到HDFS。 我想知道HDFS水槽的最佳配置是什么,这样本地系统上的每个文件都会在HDFS以CSV格式准确复制。我希望Flume处理的每个CSV文件都是单个事件,作为单个文件刷新和写入。尽可能多地,我希望文件是完全一样的,没有标题的东西等。 我需要在这些值上加什么来模拟我想要的行为? 如果还有其他Flume代理配置变量需要更改,请提供。 如果这

    • 问题内容: 我正在使用以下代码将Spark DataFrame保存到JSON文件 输出结果是: 如何生成单个JSON文件而不是每行一个文件? 如何避免* crc文件? 如何避免SUCCESS文件? 问题答案: 如果要单个文件,则需要在调用write之前对单个分区执行a操作,因此: 就个人而言,我觉得很烦人,输出文件的数量取决于你打电话之前有分区的数量-特别是如果你做了-但据我所知,目前没有其他办法