在我的Java应用程序中,我使用的是一个文本文件(大小为300MB),它保存在HDFS中。文件的每一行都包含一个字符串和一个用逗号分隔的整数ID。我正在逐行读取文件,并从中创建Hashmaps(String,ID)。
文件如下所示:
String1,Integer1
String2,Integer2
...
现在,我当前正在直接使用Apacha Hadoop配置和FileSystem对象从HDFS读取文件。
Configuration conf = new Configuration();
conf.addResource("core-site.xml"));
conf.addResource("hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
path= "<some location in HDFS>"
FileSystem fs = FileSystem.get(URI.create(path), conf);
in = fs.open(new Path(path));
输入流“in”被传递给另一个名为read(InputStream in)的函数,用于读取文件。
public void init(InputStream is) throws Exception {
ConcurrentMap<String, String> pageToId = new ConcurrentHashMap();
ConcurrentMap<String, String> idToPage = new ConcurrentHashMap();
logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
InputStreamReader stream = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(stream);
List<String> pageIdMappingColumns = ServerProperties.getInstance().getIdMappingColumns();
String line;
int line_no=0;
while (true) {
try {
line = reader.readLine();
if (line == null) {
break;
}
line_no++;
//System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
String[] values = line.split(COMMA);
//System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
if (values.length < pageIdMappingColumns.size()) {
throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
}
String id = EMPTY_STR;
String page = EMPTY_STR;
for (int i = 0; i < values.length; i++) {
String s = values[i].trim();
if (PAGEID.equals(pageIdMappingColumns.get(i))) {
id = s;
continue;
}
if (PAGENAME.equals(pageIdMappingColumns.get(i))) {
page = s;
}
}
pageToId.put(page, id);
idToPage.put(id, page);
} catch (Exception e) {
logger.error(PAGEMAPPER_INIT + e.toString() + " on line " + line_no);
}
}
logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
logger.info("Total number of lines: " + line_no);
reader.close();
ConcurrentMap<String, String> oldPageToId = pageToIdRef.get();
ConcurrentMap<String, String> oldIdToPage = idToPageRef.get();
idToPage.put(MINUS_1, START);
idToPage.put(MINUS_2, EXIT);
pageToId.put(START, MINUS_1);
pageToId.put(EXIT, MINUS_2);
/* Update the Atomic reference hashmaps in memory in two conditions
1. If there was no map in memory(first iteration)
2. If the number of page-names and page-id pairs in the mappings.txt file are more than the previous iteration
*/
if (oldPageToId == null || oldIdToPage != null && oldIdToPage.size() <= idToPage.size() && oldPageToId.size() <= pageToId.size()) {
idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);
logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_UPDATE_MAPPING);
} else {
logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_LOG_MSZ);
}
}
当工作这样完成时,我正在关闭溪流:
IOUtils.closeQuietly(is);
我每隔1小时执行上面的代码,因为在这段时间内,文件在HDFS中被更改。所以现在,我得到了Java.lang.OutOfMemoryError:Java堆空间。
我的问题是:就内存需求而言,是否将文件复制到磁盘上然后使用它而不是直接从HDFS访问它更好?
注意:该文件有超过3200000行。
溪流永远是选择的方式。
您接收OutOfMemory是因为您从未关闭流,从而导致内存泄漏。
手动关闭流或使用try-with-resource
编辑
pageToId.put(page, id);
idToPage.put(id, page);
您在内存中至少存储了文件大小的2倍。大约600MB。
之后,将该值赋给某个ref
变量:
idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);
我猜您仍然在某个地方引用旧的ref
数据,因此内部地图数据没有发布。
您还在
throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
您应该使用try-with-resource或手动关闭finally
块中的流。
是否有一种已知的方法使用Hadoop api/spark scala在Hdfs上将文件从一个目录复制到另一个目录? 我尝试使用copyFromLocalFile,但没有帮助
如何将文件从HDFS复制到本地文件系统。文件下没有文件的物理位置,甚至没有目录。我如何将它们移到本地进行进一步的验证。我通过winscp进行了尝试。
我需要从本地文件系统复制一个文件夹到HDFS。我找不到任何例子移动文件夹(包括它的所有子文件夹)到HDFS
我想将文件salesjan2009.csv(存储在本地文件系统中,~/input/salesjan2009.csv)复制到HDFS(Hadoop分布式文件系统)主目录中 我编写了这段代码hduser@ubuntu:/usr/local/hadoop$hdfs dfs-copyfromlocal'/home/hduser/desktop/input/salesjan2009.csv'/hdfs-pa
我已经在Ubuntu 14.04上安装了hadoop。每当我将文件从本地文件系统复制到HDFS时,我都会出现以下错误。 我使用这个命令: 我遇到的错误是: 我是Linux环境的新手。我不明白哪个文件不存在。
我的s3存储桶中有很多文件,所以是否有任何aws cli命令可用于在s3中查找带有前缀名的最新文件?如何将该文件从s3复制到本地文件夹?我可以使用Boto3或python库来实现这一点吗?