当前位置: 首页 > 知识库问答 >
问题:

Apache Beam GroupByKey()在Python中的Google DataFlow上运行时失败

景哲
2023-03-14
p_collection
            | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
            | "GroupByKey" >> beam.GroupByKey()
            | "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
            | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

但是当我将它部署到Google Cloud DataFlow时,我得到了以下错误:

查看出现此错误的源代码,我认为这可能是由于某些名称包含一些奇怪的编码字符,所以我不顾一切地尝试使用代码上看到的.encode(“ascii”,errors=“ignore”).decode(),但没有成功。

关于为什么这个管道在本地成功执行,但在数据流运行器上失败,有什么想法吗?

共有1个答案

欧阳玺
2023-03-14

这与其说是修复了我的问题,不如说是从一开始就避免了问题,但它确实让我的代码运行了,这要归功于注释中user1093967的建议。

我只是将groupbykeyaggregategroups替换为combinePerkey(sum)步骤,这样问题就不再发生了。

p_collection
        | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
        | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

不过,我很高兴知道它为什么有效。

 类似资料:
  • 下面是我在虚拟机中使用的缩短代码。 客户端的连接部分如下: openshift配置如下: openshift中的应用程序日志显示如下: 如有任何协助,我们将不胜感激!

  • 下面是我的discord music player bot,我用Maven和Java8编译并安装了它,使用了以下步骤: 下面是我的pom.xml 发送播放音乐的命令后出错... 我曾尝试将此方法改为Cp1252编码,但仍然失败。

  • Update:原来您不需要任何技巧来实现这一点,只需相应地使用或即可。或者,您可以在中配置自己的别名,如下所述。

  • 我有大约30个迁移文件。 当它到达包含重命名列的迁移时: 它只会在迁移时出错(比如说,第15个)。它可以正常运行所有其他迁移。如果我对此进行评论,那么它将按预期完成。 我运行或 我得到以下错误: PHP-v给了我这个: MySQL-v给了我这个: 我使用的是“条令/dbal”:“~2.3” 此错误仅在本地计算机上运行迁移时发生。在我的docker-compose堆栈和Vagant框中按预期完成。所

  • 所有测试返回“NosuchBeanDefinitionException:没有'com.example.networkService'类型的合格bean可用:预期至少有1个bean符合autowire候选。依赖项注释:{@org.springframework.beans.factory.annotation.autowired(required=true)}” 这是配置文件application

  • 我正在尝试在我的虚拟机上安装kibana。当我启动kibana时,错误如下: vagrant@vagrant-ubuntu-trusty-64:~/elasticsearch-2.1.1/plugins/kibana-4.1.1-linux-x64$./bin/kibana{"name":"Kibana","host name":"流浪者-ubuntu-Trust y-64","pid": 136