当前位置: 首页 > 知识库问答 >
问题:

创建多个标记输出时Google数据流超出堆

洪高扬
2023-03-14

我有许多未分区的大型BigQuery表和文件,我希望以各种方式对它们进行分区。因此,我决定尝试编写一个数据流作业来实现这一点。我认为这工作很简单。我尝试使用泛型编写,以便轻松地应用TextIO和BigQueryIO源代码。它在小型表上工作得很好,但在大型表上运行时,我总是得到Java.lang.outofMemoryError:Java堆空间

在我的主类中,我要么读取一个带有目标键的文件(由另一个DF作业生成),要么对一个BigQuery表运行一个查询,以获得要根据其进行分解的键列表。我的主类如下所示:

Pipeline sharder = Pipeline.create(opts);

// a functional interface that shows the tag map how to get a tuple tag
KeySelector<String, TableRow> bqSelector = (TableRow row) -> (String) row.get("COLUMN") != null ? (String) row.get("COLUMN") : "null";

// a utility class to store a tuple tag list and hash map of String TupleTag
TupleTagMap<String, TableRow> bqTags = new TupleTagMap<>(new ArrayList<>(inputKeys),bqSelector);

// custom transorm
ShardedTransform<String, TableRow> bqShard = new ShardedTransform<String, TableRow>(bqTags, TableRowJsonCoder.of());

String source = "PROJECTID:ADATASET.A_BIG_TABLE";
String destBase = "projectid:dataset.a_big_table_sharded_";

TableSchema schema = bq.tables().get("PROJECTID","ADATASET","A_BIG_TABLE").execute().getSchema();


PCollectionList<TableRow> shards = sharder.apply(BigQueryIO.Read.from(source)).apply(bqShard);
for (PCollection<TableRow> shard : shards.getAll()) {
    String shardName = StringUtils.isNotEmpty(shard.getName()) ? shard.getName() : "NULL";
    shard.apply(BigQueryIO.Write.to(destBase + shardName)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema));
    System.out.println(destBase+shardName);
} 
sharder.run();

我生成一组tupleTags用于自定义转换。我创建了一个实用工具类,它存储TupleTagListHashMap,以便通过键引用元组标记:

public class TupleTagMap<Key, Type> implements Serializable {

private static final long serialVersionUID = -8762959703864266959L;
final private TupleTagList tList;
final private Map<Key, TupleTag<Type>> map;
final private KeySelector<Key, Type> selector;

public TupleTagMap(List<Key> t, KeySelector<Key, Type> selector) {
    map = new HashMap<>();
    for (Key key : t)
        map.put(key, new TupleTag<Type>());
    this.tList = TupleTagList.of(new ArrayList<>(map.values()));
    this.selector = selector;

}

public Map<Key, TupleTag<Type>> getMap() {
    return map;
}

public TupleTagList getTagList() {
    return tList;
}

public TupleTag<Type> getTag(Type t){
    return map.get(selector.getKey(t));
}

然后我有一个自定义转换,它基本上有一个函数,使用元组映射输出pCollectionTuple,然后将其移动到pCollectionList以返回到主类:

public class ShardedTransform<Key, Type> extends
    PTransform<PCollection<Type>, PCollectionList<Type>> {


private static final long serialVersionUID = 3320626732803297323L;
private final TupleTagMap<Key, Type> tags;
private final Coder<Type> coder;


public ShardedTransform(TupleTagMap<Key, Type> tags, Coder<Type> coder) {
    this.tags = tags;
    this.coder = coder;
}

@Override
public PCollectionList<Type> apply(PCollection<Type> in) {

    PCollectionTuple shards = in.apply(ParDo.of(
            new ShardFn<Key, Type>(tags)).withOutputTags(
            new TupleTag<Type>(), tags.getTagList()));

    List<PCollection<Type>> shardList = new ArrayList<>(tags.getMap().size());

    for (Entry<Key, TupleTag<Type>> e : tags.getMap().entrySet()){
        PCollection<Type> shard = shards.get(e.getValue()).setName(e.getKey().toString()).setCoder(coder);
        shardList.add(shard);
    }
        return PCollectionList.of(shardList);
    } 
}

实际的DoFn非常简单,它只使用主类中提供的lambda,在hash映射中找到匹配的元组标记,用于侧输出:

public class ShardFn<Key, Type> extends DoFn<Type, Type> {

private static final long serialVersionUID = 961325260858465105L;

private final TupleTagMap<Key, Type> tags;

ShardFn(TupleTagMap<Key, Type> tags) {

    this.tags = tags;
}

@Override
public void processElement(DoFn<Type, Type>.ProcessContext c)
        throws Exception {
    Type element = c.element();
    TupleTag<Type> tag = tags.getTag(element);

    if (tag != null)
        c.sideOutput(tags.getTag(element), element);
    } 
}

共有1个答案

松洛华
2023-03-14

Beam模型目前对动态分区/大量分区没有很好的支持。您的方法在图构建时选择碎片的数量,然后产生的pardo可能会全部融合在一起,因此每个工作人员都试图同时写入80个不同的BQ表。每次写入都需要一些本地缓冲,所以可能太多了。

还有一种替代方法可以跨表(但不跨元素)进行并行化。如果您有大量相对较小的输出表,这将很好地工作。使用ParDo标记每个元素的表,然后执行GroupByKey。这为您提供了pcollection >> 。然后通过将元素写入表来处理每个kv >

不幸的是,现在您必须用BQ手写才能使用此选项。我们正在考虑用内置的支持来扩展接收器API。由于Dataflow SDK作为Apache Beam的一部分正在进一步开发,我们在这里跟踪该请求:https://issues.Apache.org/jira/browse/Beam-92

 类似资料:
  • 问题内容: Go语言中有没有办法记录到不同级别的多个输出? 我希望有一个程序可以同时在Info级别记录到stdout并在带有时间戳的调试级别记录一个文件。 就像我每次编写代码一样: 我可以看到控制台打印: 和一个文件: 我使用logrus和glog,但是找不到此功能。还有其他包装或我可以编码的东西吗? 问题答案: Go-logging支持不同的日志记录后端,例如文件,syslog等。可以设置多个后

  • 使用fromElements函数创建数据流时出错 下面是探险- 原因:java.io.IOException:无法从源反序列化元素。如果您使用的是用户定义的序列化(值和可写类型),请检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.kryoSerializer@599fcdda在org.apache.flink.strea

  • 我正在尝试创建一个新的JavaDB。我已经将Java DB驱动程序添加到库中,但在服务下创建新数据库时,它仍然会抛出一个错误。 我下载并定义了db-derby-10.15.2.0-bin 我在这里定义了drover文件 我在这里单击了创建数据库 填了这张表 点击om后显示如下 谁来帮帮我

  • 我目前正试图根据数据中包含的特定键,将运行在Google Dataflow上的Beam管道分叉到多个目的地。当使用TaggedOutput标记对“fork”的每个endpoint进行硬编码时,我能够实现这一点。但是,在将来,我并不总是知道底层数据中存在哪些键,因此我希望使用类似于以下的for循环动态创建流程中的后续步骤: 我的理解是,<代码>的结果。with\u outputs()应该是可编辑的,

  • 问题内容: 我知道Java中有一个函数可以使用method 将标准输出流设置为任何用户定义的值。 但是,是否有任何方法可以将标准输出重置为先前存储的标准输出还是标准输出? 问题答案: 您可以通过持有标准的文件描述符。要重置标准以打印到控制台,您可以 另一种方法是仅保留原始对象,如下所示:

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。