那里!我是Cloud-DataFlow的新手。
我使用DataflowPipelineRunner读取csv文件并将结果输出到BigQuery。当csv文件的大小很小(只有20条记录,小于1MB)时,它工作得很好,但当文件的大小变大(超过1000万条记录,约616.42 MB)时,它出现了OOM错误。
以下是错误消息:
oder.decodeOutOfMemoryError:Java堆空间oder.javacom.google.cloud.dataflow.sdk.io.(Aeader.decode)com.google.cloud.dataflow.sdk.io.ByteArrayOutputSeader.read(ByteArrayOutputScom.google.cloud.dataflow.sdk.io.)eader.advanceByteArrayOutputSource.java:536容量(ByteArrayOutputScom.google.cloud.dataflow.sdk.io.)eader.advanceByteArrayOutputSource.java:287(ByteArrayOutputScom.google.cloud.dataflow.sdk.runners.worker.)com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)在co.coder.MyCoder.decode(MyCoder.java:54)在co.coder.MyCjava.lang.(MyCjava.util.: 1)在rrays.copyTextIO$TextSource$TextBasedRrrays.java:3236货币元素(TextIO. java: 1065)在java.io.TextIO$TextSource$TextBasedRtream.grow下记录(TextIO. java: 1052)在tream.java:118FileBasedSource$FileBasedRjava.io.Impl(FileBasedStream.ensure)在tream.java:93OffsetBasedSource$OffsetBasedRjava.io.(OffsetBasedStream.write)在tream.java:153WorkerCustomS源$BoundedReaderIterator. AdvDataflowWorkerHarness$WorkerThread. call(DataflowWorkerHarness. java: 172)在com. google. cloud. dataflow. sdk. runners. worker。DataflowWorkerHarness$WorkerThread. call(DataflowWorkerHarness. java: 159)在java. util. conflow。未来任务. run(FutureTask. java: 266)在java. util. converter。java. util的ThreadPoolExecutor. runWorker(ThreadPoolExecutor. java: 1142)在java. util. concam。ThreadPoolExecitor$Worker. run(ThreadPoolExecutor. java: 617)在java. lang。线程.运行(Thread. java: 745)
从错误消息中可以看出,[MyCoder.java:54]中发生了错误。MyCoder是我实现的CustomCoder的一个子类,它将csv文件从Shift JIS编码为UTF-8:
53:@Override
54:public String decode(InputStream inStream, Context context) throws CoderException, IOException {
55: if (context.isWholeStream) {
56: byte[] bytes = StreamUtils.getBytes(inStream);
57: return new String(bytes, Charset.forName("Shift_JIS"));
58: } else {
59: try {
60: return readString(new DataInputStream(inStream));
61: } catch (EOFException | UTFDataFormatException exn) {
62: // These exceptions correspond to decoding problems, so change
63: // what kind of exception they're branded as.
64: throw new CoderException(exn);
65: }
66: }
67:}
另外,这是我运行DataflowPipelineRunner的方式:
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
.from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
.withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();
由于Dataflow是一个可扩展的大数据云服务,所以我对这个OOM错误感到有点困惑,有人能向我解释为什么会发生[OutOfMemoryError]以及如何解决它吗?
非常感谢!
我没有安静的理解,但解决问题如下:
但当文件变得巨大时(超过1000万条记录,约616.42 MB),出现OOM错误。
这是因为我只是通过处理较小的文件(只有20条记录,小于1MB)来生成测试数据,换句话说,1000万数据只有20个键。所以我改变了另一个测试数据,它有很多键(没有太多的转储数据)。
此外,我遵循Kenn Knowles的建议,通过删除以下代码,让数据流自动管理其作业和实例:
withoutSharding()
options.setWorkerMachineType("n1-highmem-4");
最终数据流工作运行良好(MachineType自动使用n1-Standard-1)!
有关dataflow[动态工作再平衡]的更多信息,请参见以下内容:https://cloud.google.com/dataflow/service/dataflow-service-desc#Autotuning
问题内容: 我制作了一个250MB的json文件,看起来应该像这样: 其中“ B”值可以为len> =1。这表示我具有有效的JSON。 我打电话 这是文档。当读入熊猫数据框时,我得到以下回溯: 想不到出了什么问题。引发错误的python文件并没有帮助。 问题答案: 我有同样的错误消息,我使用绝对路径解决了。 那对我有用!
问题内容: 我在Android Appiclation中创建CustomListAdapter。但是我在执行它时得到这个错误。 这是我的CustomListAdapter.java代码: 这是我的活动代码: 在我的活动中,已经在setAdapter()方法之前添加了CustomListAdapter实例化。但是我仍然得到那个错误。 这是我的错误日志 这是我的AppController.java 这
问题内容: 当我尝试初始化eventBus时,出现NullPointerException异常: de.mrsfinster.web.livefeed.bean.FeedBean.init(FeedBean.java:179)处的java.lang.NullPointerException sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法)处sun
问题内容: 我正在尝试为在线Java课程创建程序。该程序包括Employee类和Name类。我必须创建多个Employee对象,并提示用户输入员工的姓名。我将所有Employee对象存储在一个employee数组中。 这是代码: 问题是编译器在运行程序时说最后一行是NullPointerException。我不确定自己在做什么错。有什么建议? 谢谢!-西恩 问题答案: 您创建了一个大小为的新数组,
问题内容: 错误: 这是什么问题 问题答案: 某处正在修改。我怀疑它可能在此调用内发生: 编辑 基本上,发生的是通过 另一个 迭代器进行的递归修改。这使得外部迭代器的故障快速行为得以解决。 __
我正在使用Context.SendorderedBroadcast方法发送广播,并在到达ResultReceiver之前检查是否有人收到了广播。 下面是一段代码: 到目前为止,我只在搭载Android 2.1版本的摩托罗拉Defy手机上遇到过这种情况,但它也可能发生在其他手机上。 我在网上找不到任何关于它的参考资料,有人知道为什么会这样吗?谢谢!