我使用的是Apache Beam2.14和Java。
给定如下所示的数据集:
| countryID | sessionID | pageID | count |
| --------- | --------- | --------- | -------- |
| a | a | a | 1 |
| a | b | c | 2 |
| b | c | a | 4 |
| c | d | a | 6 |
我想返回一个数据集,它只包含计数之和位于前N个countryDs的行,对于每个countryID是前N个sessionID,对于每个sessionID是前N个pageID。
数据集的大小以数十亿行为单位--它将无法容纳在内存中。顺便说一句--数据集驻留在BigQuery中,并尝试在BigQuery中使用DENSE_RANK()或ROW_NUMBER()函数错误直接执行此操作,但由于该大小导致“内存限制超出”错误,因此尝试使用Dataflow。
我目前的策略是:
top.of
获取顶级countryidstop.perkey
获取每个国家的顶级会话。棘手的部分是,需要在每个“group by”级别保留行,以便在最后发出它们。我尝试创建一个树形结构,其中每个节点保存“group by”步骤的结果--这样我就可以只计算一次它的子节点的总和,以便在后续步骤中进行比较。也就是说,在每个“group by”步骤中,结果是一个kV
,节点具有如下字段:
@DefaultCoder(SerializableCoder.class)
public static class TreeNode implements Node, Serializable {
private Long total = 0L;
private KV<String, Iterable<LeafNode>> kv;
...
虽然这几乎适用于direct runner和一个小样本数据集,但在dataflow上运行时,我会遇到与节点
类相关的序列化错误,因为Iterable
是输入pCollection的窗口:
由:java.io.notSerializableException:org.apache.beam.runners.dataflow.worker.util.BatchGroupSobyWindowviaiteratorsFn$Windowreterable
(参见https://beam.apache.org/releases/javadoc/2.15.0/index.html?org/apache/beam/sdk/transforms/groupbykey.html)
考虑到我需要处理的数据集大小,将数据复制到内存中的不同集合以实现可序列化不是一个可行的选择。
以下是到目前为止管道的一个示例--仅使用2个分组级别作为示例:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode()))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
似乎这应该是一个共同的目标,所以我想知道是否有更简单的方法,或者任何使用Beam在Java中实现同样事情的例子。
您可以尝试使用setcoder
设置代码,如下所示。
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
.apply("Set first level key", WithKeys.of(new GroupKey(key1)))
.apply("Group by", GroupByKey.create())
.apply("to leaf nodes", ParDo.of(new ToLeafNode()))
.apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
.apply("Group by 2nd level", GroupByKey.create())
.apply("To tree nodes", ParDo.of(new ToTreeNode())).setCoder(SerializableCoder.of(TreeNode.class))
.apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
.apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
.apply("Expand", ParDo.of(new ExpandTreeNode()))
.apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
.apply("Values", Values.create())
.apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
.apply("Expand", ParDo.of(new ExpandLeafNode()))
.apply("Values", Values.create())
.apply("Write to bq",
BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withFormatFunction(BigQueryUtils.toTableRow()));
pipeline.run();
但是,对于需要确定前N个国家、前N个会话和前N个页面的情况,我建议将管道简化为只对右侧字段单独分组,然后应用sum
和Top
,如下所示。
Pipeline pipeline = Pipeline.create(options);
rows = pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
.apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA);
sumByCountry =rows.apply("Set Country key", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via((Row row) -> KV.of(row.getCountry(), row.getCount()))))
.apply("Country Scores", Sum.<String>integersPerKey());
.apply("Top Countries", Top.of(N, new CompareValues()))
// Do the same for Session and page
sumBySession = rows....
sumByPage = rows....
我不确定您是否想要获得前N个国家的所有行,但如果您想要获得行,那么您可以在行PCollection上使用前N个国家的侧输入并过滤出结果。您可以对会话和页面执行相同的操作。
Dataflow应该根据这个用例的需要进行伸缩,所以您不需要为这个用例手动执行中间groupby。
我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。
如何聚合一个值在嵌套在Elasticsearch嵌套位置?我对一个嵌套对象没有问题,但在嵌套对象内的嵌套我感到困惑... 样本数据: 欲望结果: 在索引映射中,我将cat_a和条目字段的类型设置为嵌套,当我从工具字段查询聚合时,在cat_a的根(级别1)中没有问题,并且可以工作,但是在聚合中在rx_a(这是在第2级)我不能检索结果,它或空或显示错误,因为我的错误查询。 查询级别1 agg: 如何处
在Elasticsearch中,是否可以在嵌套筛选器中引用顶级(非嵌套)属性? 我有一种情况,我需要一个条件在全局级别或在任何数量的关联嵌套对象中的一个中为真。在嵌套筛选器内部,我有一个或筛选器来检查其中一个或另一个,但似乎忽略了外部属性。这里有一个例子。 我有一种感觉,我需要的东西不受支持,嵌套筛选器内部的所有内容都必须在指定的路径上或以下应用(从文档开始,“对嵌套对象/文档执行查询,就像它们被
我有一门java课 在上面的场景中,示例具有子示例,这又是示例列表。此嵌套可以是 n 级。我想实现的是有一个示例列表,即扁平化上面的对象并将所有示例收集到最终列表中(收集所有n级示例)。一个明显的方法是递归。在Java中有什么方法可以更有效地实现它。我尝试了一些java 8概念,但它们不符合要求。
我试图从API调用中检索数据并将其传递给另一个服务。我收到的数据是在一个特定的JSON结构,我想把它映射到一个结构,但没有多层次的数据。我尝试了点符号来访问更深的值,但它不起作用。 基本上,我试图得到一个包含一系列“问题”(key、self、description)的结构,但没有“fields.description”结构。 JSON: 结构: 预期/期望的结构: 有可能吗?如果是,怎么做?使用嵌
问题内容: 我在文件中有此代码 当我运行时,我得到了回溯: 我真的没有在这里看到问题。理解中没有定义吗? 奇怪的是,当直接粘贴到python解释器中时,这似乎如何执行而没有错误… 编辑 :如果我使用列表理解而不是集合理解,则此方法有效 问题答案: 我在这里提出了一个错误。这是 还是坏了 通过python 2.7.5设计。 从错误报告: 在Python 2中,列表推导式没有自己的作用域,因此在您的第