SO和Web上的大多数问题/答案都讨论了如何使用Hive将一堆小的ORC文件组合成一个更大的文件,但是,我的ORC文件是日志文件,每天都分开,因此我需要将它们分开。我只想每天“汇总”
ORC文件(它们是HDFS中的目录)。
我最有可能需要用Java编写解决方案,并且遇到过OrcFileMergeOperator,这可能是我需要使用的内容,但还为时过早。
解决此问题的最佳方法是什么?
这里有很好的答案,但是没有一个答案允许我进行日常工作,以便我可以进行日常汇总。我们记录了每天写入HDFS的日志文件,我不想每天进来时在Hive中运行查询。
我最终所做的事情对我来说似乎更加直接。我编写了一个Java程序,该程序使用ORC库扫描目录中的所有文件并创建这些文件的列表。然后打开一个新的Writer,它是“组合”文件(以“。”开头,因此从Hive中隐藏了,否则Hive将失败)。然后,程序将打开列表中的每个文件,并读取内容并将其写出到组合文件中。读取所有文件后,它将删除文件。我还添加了在需要时一次运行一个目录的功能。
注意:您将需要一个架构文件。日志日志可以在json“ journalctl -o json”中输出,然后可以使用Apache
ORC工具生成模式文件,也可以手动执行一个。ORC的自动生成很好,但手动总是更好。
注意:要按原样使用此代码,您将需要一个有效的keytab并在类路径中添加-Dkeytab =。
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.cloudera.org.joda.time.LocalDate;
public class OrcFileRollUp {
private final static String SCHEMA = "journald.schema";
private final static String UTF_8 = "UTF-8";
private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
private static final String keytabLocation = System.getProperty("keytab");
private static final String kerberosUser = "<userName>";
private static Writer writer;
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
InetAddress myHost = InetAddress.getLocalHost();
String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);
int currentDay = LocalDate.now().getDayOfMonth();
int currentMonth = LocalDate.now().getMonthOfYear();
int currentYear = LocalDate.now().getYear();
Path path = new Path(HDFS_BASE_LOGS_DIR);
FileSystem fileSystem = path.getFileSystem(conf);
System.out.println("The URI is: " + fileSystem.getUri());
//Get Hosts:
List<String> allHostsPath = getHosts(path, fileSystem);
TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
.replaceAll("\n", ""));
//Open each file for reading and write contents
for(int i = 0; i < allHostsPath.size(); i++) {
String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working"; //filename: .2018_04_24.orc.working
//Create list of files from directory and today's date OR pass a directory in via the command line in format
//hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
String directory = "";
Path outFilePath;
Path argsPath;
List<String> orcFiles;
if(args.length == 0) {
directory = currentYear + "/" + currentMonth + "/" + currentDay;
outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
try {
orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
} catch (Exception e) {
continue;
}
} else {
outFilePath = new Path(args[0] + "/" + outFile);
argsPath = new Path(args[0]);
try {
orcFiles = getAllFilePath(argsPath, fileSystem);
} catch (Exception e) {
continue;
}
}
//Create List of files in the directory
FileSystem fs = outFilePath.getFileSystem(conf);
//Writer MUST be below ^^ or the combination file will be deleted as well.
if(fs.exists(outFilePath)) {
System.out.println(outFilePath + " exists, delete before continuing.");
} else {
writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
.setSchema(schema));
}
for(int j = 0; j < orcFiles.size(); j++ ) {
Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
RecordReader rows = reader.rows();
while (rows.nextBatch(batch)) {
if (batch != null) {
writer.addRowBatch(batch);
}
}
rows.close();
fs.delete(new Path(orcFiles.get(j)), false);
}
//Close File
writer.close();
//Remove leading "." from ORC file to make visible to Hive
outFile = fileSystem.getFileStatus(outFilePath)
.getPath()
.getName();
if (outFile.startsWith(".")) {
outFile = outFile.substring(1);
int lastIndexOf = outFile.lastIndexOf(".working");
outFile = outFile.substring(0, lastIndexOf);
}
Path parent = outFilePath.getParent();
fileSystem.rename(outFilePath, new Path(parent, outFile));
if(args.length != 0)
break;
}
}
private static String getSchema(String resource) throws IOException {
try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
return IOUtils.toString(input, UTF_8);
}
}
public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> hostsList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
hostsList.add(fileStat.getPath().toString());
}
return hostsList;
}
private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> fileList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
if (fileStat.isDirectory()) {
fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
} else {
fileList.add(fileStat.getPath()
.toString());
}
}
for(int i = 0; i< fileList.size(); i++) {
if(!fileList.get(i).endsWith(".orc"))
fileList.remove(i);
}
return fileList;
}
}
解决这个问题的最佳方法是什么?
我正在从s3读取csv文件,并以ORC的身份写入配置单元表。在写的同时,它也在写大量的小文件。我需要合并所有这些文件。我设置了以下属性: 除了这些配置之外,我尝试了repartition(1)和coalesce(1),这将合并到单个文件中,但它会删除配置单元表并重新创建它。 如果我使用追加模式而不是覆盖模式,它会在每个分区下创建重复文件。 在这两种情况下,spark作业运行两次,在第二次执行时失败
我在Hive有一个分区的兽人表。加载所有可能的分区后,我在HDFS上得到多个ORC文件,即HDFS上的每个分区目录中都有一个ORC文件。我需要将每个分区下的所有这些ORC文件组合成一个大的ORC文件,用于某些用例。 有人能给我建议一个方法,把这些多个ORC文件(属于每个分区)组合成一个单一的大ORC文件。 我试着从分区表创建一个新的非分区ORC表。它确实减少了文件的数量,但不会减少到单个文件。 p
问题内容: 如何使用Java合并两个WAV文件? 我试过了,但是没有正常工作,他们还有其他方法吗? 问题答案: 如果直接使用wav文件的字节,则可以在任何编程语言中使用相同的策略。对于此示例,我将假设两个源文件具有相同的比特率/数字通道,并且具有相同的长度/大小。(否则,您可能可以在开始合并之前对其进行编辑)。 首先看一下WAV规范,我在斯坦福课程网站上找到了一个很好的人: 常见的标头长度为44或
问题内容: 对于某些要求,我想将 文本文件(定界) 转换为 ORC(优化行列) 格式。由于必须定期运行它,因此我想编写一个 Java程序 来执行此操作。我不想使用Hive临时表解决方法。有人可以帮我吗?以下是我尝试过的 运行此命令将显示以下错误,并在本地生成一个名为 part-00000 的文件 问题答案: 您可以使用Spark数据帧非常轻松地将定界文件转换为orc格式。您还可以指定/施加模式并过
我有一个很大的netCDF(.nc)文件文件夹,每个文件都有相似的名称。数据文件包含时间、经度、纬度和月降水量变量。目标是获得每个月X年的平均月降水量。因此,最终我会得到12个值,代表每个纬度和长纬度的X年平均月降水量。多年来,每个文件都位于同一位置。每个文件以相同的名称开头,以“date.nc”结尾,例如: 结尾是年月。目前为止我所知道的是: 我得到一个KeyError: ('time ','经