当前位置: 首页 > 知识库问答 >
问题:

Apache Beam-Reading JSON和Stream

冉俊德
2023-03-14
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);

Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-prototype/test.json"));
System.out.println("lines: " + lines);
p.apply(FileIO.match().filepattern("/Users/xyz/eclipse-workspace/beam-prototype/test.json"))

我只需要阅读下面的json文件。从该文件中读取完整的TestData,然后将其流式传输。

{
“testdata":{
“siteOwner”:”xxx”,
“siteInfo”:{
“siteID”:”id_member",
"siteplatform”:”web”,
"siteType”:”soap”,
"siteURL”:”www”,
}
}
}

上面的代码不是在读取json文件,而是在打印

lines: ReadMyFile/Read.out [PCollection]

你能给我提供样品参考吗?

共有1个答案

南宫炜
2023-03-14

这是读取JSON的示例代码。这是正确的做法吗?

来快速回答你的问题,是的。您的示例代码是读取包含JSON的文件的正确方法,其中文件的每一行都包含一个JSON元素。textio输入转换逐行读取文件,因此如果单个JSON元素跨越多行,那么它将不可解析

第二个代码示例具有相同的效果。

 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/

  • Apache NiFi比StreamSets慢吗? 我在Apache NiFi和StreamSets中创建了一个管道,它从一个Kafka主题接收数据,并将数据转储到另一个Kafka主题中,但StreamSets的速度要比NiFi快得多。

  • 我正在学习java Stream,并且很难解决下面的问题。我卡住的原因是因为我对处理没有任何想法。 通过执行,我偶然发现了一个“count”的解决方案,但除此之外,我无法继续下去。请帮助我如何处理这些问题,并告诉我在这种情况下工作的原因。到目前为止,我已经尝试了我所学到的一切。

  • lambdas和streams的概念有点弱,所以可能有些东西真的没有任何意义,但我会尝试表达我想要发生的事情。 我有一个类发票,其中有一个项目名称,价格,和数量。我必须映射项目名称和总成本(价格*数量)。 这将在多个发票对象上完成。 此外,我需要排序他们的总数,也过滤那些超过一定的数量,例如。100.把它们放进地图后我该怎么做?

  • 我已经开始使用Akka Streams和Op Rabbit,我有点困惑。 我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。 我已经能够使用GraphDSL. Builder做这样的事情,但似乎无法让它与AckedSource/Flow/Sink一起工作 图表如下所示: 我不确定是否拆分什么时候是我应该使用的,因为我总是需要正好2个流。 这是一个不进行分区且不

  • 我有课 给出一个Person类列表,我根据该类的不同属性进行聚合。对于(如)- 现在我需要得到一个结果,这样我就应该根据国家和城市的组合得到总的“totalcountrytoCityCount”,并且根据国家、城市和宠物的组合得到总的“petCount”。我可以使用groupingBy和summingint分别获得它们 它给出了结果 但我想要的实际结果是:- 令人惊讶地删除了计数