所以我要做的是根据文件路径向每个元素添加时间戳。作为测试,我使用了以下示例。
首先,正如本答案中所解释的,您可以使用fileio
连续匹配文件模式。这将有所帮助,因为根据您的用例,一旦您完成了回填,您就希望在同一作业中继续读取新到达的文件。在本例中,我提供gs://bucket_name/data/**
,因为我的文件将类似于gs://bucket_name/data/year/month/day/hour/filename.extension
:
p
.apply(FileIO.match()
.filepattern(inputPath)
.continuously(
// Check for new files every minute
Duration.standardMinutes(1),
// Never stop checking for new files
Watch.Growth.<String>never()))
.apply(FileIO.readMatches())
观看频率和超时可以随意调整。
.apply("Add Timestamps", ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@Override
public Duration getAllowedTimestampSkew() {
return new Duration(Long.MAX_VALUE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
String lines[];
String[] dateFields = fileName.split("/");
Integer numElements = dateFields.length;
String hour = dateFields[numElements - 2];
String day = dateFields[numElements - 3];
String month = dateFields[numElements - 4];
String year = dateFields[numElements - 5];
String ts = String.format("%s-%s-%s %s:00:00", year, month, day, hour);
Log.info(ts);
try{
lines = file.readFullyAsUTF8String().split("\n");
for (String line : lines) {
c.outputWithTimestamp(KV.of(fileName, line), new Instant(dateTimeFormat.parseMillis(ts)));
}
}
catch(IOException e){
Log.info("failed");
}
}}))
最后,我窗口进入1小时fixedwindows
并记录结果:
.apply(Window
.<KV<String,String>>into(FixedWindows.of(Duration.standardHours(1)))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply("Log results", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
String file = c.element().getKey();
String value = c.element().getValue();
String eventTime = c.timestamp().toString();
String logString = String.format("File=%s, Line=%s, Event Time=%s, Window=%s", file, value, eventTime, window.toString());
Log.info(logString);
}
}));
对我来说,它与.WithAllowedlateness(duration.zero)
一起使用,但这取决于您可能需要设置它的顺序。请记住,过高的值将导致窗口打开的时间更长,并使用更持久的存储。
我设置了$bucket
和$project
变量,只上传了两个文件:
gsutil cp file1 gs://$BUCKET/data/2019/03/17/00/
gsutil cp file2 gs://$BUCKET/data/2019/03/18/22/
并使用以下命令运行作业:
mvn -Pdataflow-runner compile -e exec:java \
-Dexec.mainClass=com.dataflow.samples.ChronologicalOrder \
-Dexec.args="--project=$PROJECT \
--path=gs://$BUCKET/data/** \
--stagingLocation=gs://$BUCKET/staging/ \
--runner=DataflowRunner"
结果:
完整代码
问题内容: 我有一个顺序文件,它是hadoop map- reduce作业的输出。在此文件中,数据以键值对的形式写入,而值本身是一个映射。我想将值读取为MAP对象,以便我可以进一步处理它。 程序输出:关键是:[this is key]值是:{abc = 839177,xyz = 548498,lmn = 2,pqr = 1} 在这里,我以字符串的形式获取值,但我希望将其作为map的对象。 问题答案
我在标准Spring Boot应用程序的resources文件夹下有以下文件。Spring忙碌的profile设置为dev,属性文件的读取顺序是什么。?
问题内容: 如何使用python以相反的顺序读取文件?我想从最后一行读取文件。 问题答案: 在Python 3中:
第一次使用python。我正在尝试浏览包含段落和表格的word文档。我已经弄清楚了如何使用以下代码浏览文档中的所有段落和文档中的所有表格: 但我正试图找到一种方法,像任何阅读它的人一样,有序地浏览这份文件。所以如果我们有一份文件包含: 它会按照这个顺序读。我想这样做的原因是,根据表格后面的段落,我想对它执行不同的操作。
本文向大家介绍Python按行读取文件的实现方法【小文件和大文件读取】,包括了Python按行读取文件的实现方法【小文件和大文件读取】的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python按行读取文件的实现方法。分享给大家供大家参考,具体如下: 小文件: 大文件: 更多关于Python相关内容感兴趣的读者可查看本站专题:《Python文件与目录操作技巧汇总》、《Python文本文件操
问题内容: 我实现了一个小的IO类,它可以从不同磁盘(例如,两个包含相同文件的硬盘)上的多个相同文件中读取数据。在顺序情况下,两个磁盘在文件上的平均读取速度均为60MB / s,但是当我进行交错操作(例如4k磁盘1、4k磁盘2然后合并)时,有效读取速度会降低到40MB / s而不是增加吗? 上下文:Win 7 + JDK 7b70、2GB RAM,2.2GB测试文件。基本上,我尝试以穷人的方式模仿