当前位置: 首页 > 知识库问答 >
问题:

Apache Beam Go sdk中的合并函数存在问题

寇夜洛
2023-03-14

在谷歌云数据流上运行管道时,我们遇到了ApacheBeamGoSDK(v2.28.0)的合并操作问题。我知道Go SDK是实验性的,但如果有人能帮助我们了解我们的代码是否有任何错误,或者Go SDK或数据流中是否存在错误,那就太好了。只有在使用Google Dataflow和一些大型数据集运行管道时,才会出现此问题。我们正在尝试合并PCollection

type pairedVec struct {
    Vec1 [1048576]uint64
    Vec2 [1048576]uint64
}

PCollection中有10000000个项目。

主要功能:

func main() {
    flag.Parse()

    beam.Init()
    ctx := context.Background()
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    records := textio.ReadSdf(scope, *inputFile)
    rRecords := beam.Reshuffle(scope, records)

    vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)
    histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, vecs)

    lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
    textio.Write(scope, *outputFile, lines)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %s", err)
    }
}

读取输入文件后,Dataflow计划1000个工作人员生成PCollection,并开始进行组合。然后工人人数减少到几乎1人,持续了很长时间。最终作业失败,错误日志如下:

2021-03-02T06:13:40.438112597ZWorkflow failed. Causes: S09:CombinePerKey/CoGBK'1/Read+CombinePerKey/main.combineVecFn+CombinePerKey/main.combineVecFn/Extract+beam.dropKeyFn+main.flattenVecFn+textio.Write/beam.addFixedKeyFn+textio.Write/CoGBK/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: go-job-1-1614659244459204-03012027-u5s6-harness-q8tx Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-44hk Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-05nm Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-l22w Root cause: The worker lost contact with the service.

工人人数的变化

编辑

在将所有记录组合在一起之前,尝试添加一个步骤将记录“预组合”到100000个密钥(CombinedDomain=100000):

主要功能:

func main() {
    flag.Parse()

    beam.Init()
    ctx := context.Background()
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    records := textio.ReadSdf(scope, *inputFile)
    rRecords := beam.Reshuffle(scope, records)

    vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)

    keyVecs := beam.ParDo(scope, &addRandomKeyFn{Domain: *combineDomain}, vecs)
    combinedKeyVecs := beam.CombinePerKey(scope, &combineVecFn{LogN: *logN}, keyVecs)
    combinedVecs := beam.DropKey(scope, combinedKeyVecs)

    histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, combinedVecs)

    lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
    textio.Write(scope, *outputFile, lines)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %s", err)
    }
}

但作业仅为其安排了一个工作人员,并且在很长时间后失败:

Workflow failed. Causes: S06:Reshuffle/e6_gbk/Read+Reshuffle/e6_gbk/GroupByWindow+Reshuffle/e6_unreify+main.genVecFn+main.addRandomKeyFn+CombinePerKey/CoGBK'2+CombinePerKey/main.combineVecFn/Partial+CombinePerKey/CoGBK'2/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: 
  go-job-1-1615178257414007-03072037-mrlo-harness-ppjj
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-czng
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-79n8
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-mj6c
      Root cause: The worker lost contact with the service. 

在CombinePerKey()之前添加了另一次改组后,管道安排了1000名工人进行处理。但是这个作业非常慢,并且使用了大量的随机数据。1小时后,genVecFn完成的数据不到10%,数据容量为8.08TB。这与我们的生产代码基本一致,最终失败是因为它耗尽了40TB的随机数据配额。

我们尝试了另一种方法来减少单个工作人员的工作负载:将向量[1048576]uint64分割成[32768]uint64的32个部分,并组合每个部分。类似的东西:

    totalLength := uint64(1 << *logN)
    segLength := uint64(1 << *segmentBits)
    for i := uint64(0); i < totalLength/segLength; i++ {
        fileName := strings.ReplaceAll(*outputFile, path.Ext(*outputFile), fmt.Sprintf("-%d-%d%s", i+1, totalLength/segLength, path.Ext(*outputFile)))
        pHistogram := beam.Combine(scope, &combineVecRangeFn{StartIndex: i * segLength, Length: segLength}, vecs)
        flattened := beam.ParDo(scope, &flattenVecRangeFn{StartIndex: i * segLength}, pHistogram)
        textio.Write(scope, fileName, flattened)
    }

这项工作最终成功了。


共有1个答案

齐运诚
2023-03-14

鉴于您的管道代码,将作业缩减为1个辅助角色的行为符合Go SDK的预期,因为它缺乏Java和Python SDK的一些优化。发生这种情况的原因是因为您使用了梁。组合,这是一个全局组合,意味着PCollection中的每个元素都被组合成一个值。在Go SDK上,这意味着所有元素都需要本地化到一个单独的辅助角色才能组合,这对于1000万项来说,每个项大约是16兆字节,花费的时间太长,作业很可能会超时(您可能可以通过查看来确认这一点对于数据流日志中的超时消息)。

其他SDK也进行了优化,在整合到单个工作者之前,在工作者之间拆分输入元素以进行组合。例如,在Java SDK中:“组合可以并行进行,将输入PCollection的不同子集单独组合,并以任意树缩减模式进一步组合它们的中间结果,直到生成单个结果值。”

幸运的是,这个解决方案很容易为Go SDK手动实现。只需将元素放入N个桶中(其中N大于理想情况下想要的辅助角色数量),方法是在[0, N)范围内分配随机键。然后执行一个CompinePerKey,并且只需要在一个辅助角色上本地化具有匹配键的元素,从而允许在多个辅助角色中拆分这个联合角色。然后用DropKey和全局组合来跟进,您应该会得到预期的结果。

 类似资料:
  • a=[78,187,30] b=[78,186,185,25,30] c=[78,187,186,185,25,30] //想获得的结果 a=[1,2,3,4,5] b=[1,6,7,8,3,9,5] c=[1,2,6,7,8,3,4,9,5] //想获得的结果 a、b数组里面的值都是唯一的,怎么用js获得想要的值呢? 问了ChatGPT都没解决,它给的方法在控制台输出结果不一致,因为chatGP

  • 问题内容: 对于我正在编写的某些代码,我需要从1到20跳过6进行迭代。 有没有办法更有效地做到这一点? 问题答案: 在python 2中,您没有合并“范围函数”;这些只是列表。您的示例效果很好。但是range总是在内存中创建一个完整列表,因此,如果仅在for循环中需要,一种更好的方法是使用生成器表达式和xrange: 在生成器表达式中,if部分可以包含要跳过数字的复杂逻辑。 组合可迭代对象的另一种

  • 我有一个关于函数的问题。我有一些浮点数,例如160.325和5.325。函数的返回值应分别为160.33和5.33,但160.325返回160.32,5.325返回5.33。 我尝试过不同的方式, 我预计产出为160.33和5.33。

  • 问题内容: 我需要用同一行的一组列(从左到右)中的第一个非空条目填充一个单元格-类似于SQL中的coalesce()。 在以下示例表中 我想在A行的每个单元格中放置一个单元格函数,这样我将得到: 我知道我可以使用一系列IF函数来做到这一点,但是在我的实际工作表中,我有30列可供选择,因此,如果有一种更简单的方法,我将很高兴。 问题答案: 这是一个数组公式。输入公式后,按+ +使Excel将其评估为

  • 本文向大家介绍在Perl中合并数组,包括了在Perl中合并数组的使用技巧和注意事项,需要的朋友参考一下 由于Perl中的数组只是逗号分隔的值序列,因此可以将它们组合在一起,如下所示- 示例 输出结果 这将产生以下结果- 嵌入式数组仅成为主数组的一部分,如下所示- 示例 输出结果 这将产生以下结果-

  • 对于vue-axios auth by api_token,我使用助手文件api.js。