我正在以字典的形式将(无界)数据从谷歌云Pubsub流进一个PCollection。当流数据进来时,我想通过在一个静态(有界)查找表上按键连接它来丰富它。这个表足够小,可以保存在内存中。
我已经使用beam.io.ReadFromText
函数从csv中读取了有界查找表,并将值解析到字典中。然后,我创建了一个pardo
函数,该函数将我的无界pcollection
和查找字典作为侧输入。在pardo
中,它使用生成器“联接”查找表的正确行,并充实输入元素。
以下是主要部分…
# Get bounded lookup table
lookup_dict = (pcoll | 'Read PS Table' >> beam.io.ReadFromText(...)
| 'Split CSV to Dict' >> beam.ParDo(SplitCSVtoDict()))
# Use lookup table as side input in ParDo func to enrich unbounded pcoll
# I found that it only worked on my local machine when decorating it with AsList
enriched = pcoll | 'join pcoll on lkup' >> beam.ParDo(JoinLkupData(), data=beam.pvalue.AsList(lookup_dict)
class JoinLkupData(beam.DoFn):
def process(self, element, lookup_data):
# I used a generator here
lkup = next((row for row in lookup_data if row[<JOIN_FIELD>]) == element[<JOIN_FIELD>]), None)
if lkup:
# If there is a join, add new fields to the pcoll
element['field1'] = lkup['field1']
element['field2'] = lkup['field2']
yield element
在本地使用DirectRunner运行时,我能够获得正确的结果,但在DataFlow运行器上运行时,我收到以下错误:
apache_beam.runners.Dataflow.dataflow_runner.dataflowRuntimeException:数据流管道失败。状态:失败,错误:工作流失败。原因:预期自定义源的拆分数为非零。
这篇文章:“在Dataflow runner上拆分pcollections时出错”,让我想到这个错误的原因与多个工作人员在拆分工作时无法访问相同的查找表有关。
将来,如果可以的话,请分享Beam的版本和堆栈跟踪。
在这种情况下,错误消息不是很好是一个已知的问题。在撰写本文时,Python流数据流仅限于Pubsub用于读写,BigQuery用于写。在管道中使用文本源会导致此错误。
我有3个不同类型的键控数据流。 我不能使用联合(允许多个数据流),因为类型不同。我希望避免创建包装器,并将所有流转换为相同的类型。
我有一张地图清单
那么,问题是如何通过使用关键字列表在某些站点的联系人页面上动态地找到所有的文本字段和提交按钮?
问题内容: 我有两个域,是一对多关系中的一部分。我想知道如何查询孩子的父母FK吗?贝娄是父母/孩子的伪代码 上级: 儿童: 尽管我没有明确创建FK,但是grails会自行创建MySQL数据库。但是,当我想像这样通过FK查询孩子时: 我收到一个错误:找不到类[class mgr.AlumLanguage]的名称[alumProfileId]的属性 关于如何做到这一点的任何建议? 谢谢杰森 问题答
问题内容: 有一个像这样的json: 如何在不迭代所有json的情况下找到all 的值? PS:可以在json中的任何位置。 如果没有方法可以做到这一点,您能告诉我如何遍历json吗? 问题答案: 我对这个问题的处理方式会有所不同。 由于JSON不允许深度优先搜索,因此将json转换为Python对象,将其提供给XML解码器,然后提取要搜索的Node
问题内容: 我的目标是使用的Android 4.0中的REST Web服务。除非我尝试执行某些操作,否则此方法效果很好。这是相关的代码部分: 这将引发以下异常: 通过调试,我意识到通过的输出流是类型的,并且通过挖掘Android源代码,我发现如果需要重试请求(无论出于何种原因),它就会抛出上述异常,因为它指出了这一点。是 不是 用它想在那里。 现在的问题是:如何使HttpsURLConnectio