with beam.Pipeline(options=pipeline_args) as p:
#read the dataset from bigquery
query_top_30_items = (
p
| 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, COUNT(item_number) AS freq_count FROM
[bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number
ORDER BY freq_count DESC
LIMIT 30"""
)
| 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
| 'ItemNumberAsList' >> beam.combiners.ToList()
)
query_top_30_stores = (
p
|
'GetTopStores' >> beam.io.ReadFromBigQuery(
query = """SELECT store_number, COUNT(store_number) AS store_count
FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
store_number ORDER BY store_count DESC LIMIT 30"""
)
| 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
| 'StoreValuesAsList' >> beam.combiners.ToList()
)
query_whole_table = (
(query_top_30_items, query_top_30_stores)
|'ReadTable' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
| 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
| 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
)
temp_location=pcoll.pipeline.options.view_as(Traceback(最近的调用最后):文件“run.py”,第113行,在run()文件“run.py”中,第100行,在run“filterbystore”>>beam.filter(lambda row:row['store_number'],在query_top_30_stores中)文件“lib/python3.7/site-packages/apache_beam/transforms/ptransform.py”,第573行,在ror result=p.applice(self,pvalueish,label)文件“/library/frameworks/python.framework/versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py”中,第646行,在apply return self.applice(transform,pvalueish,label)文件“ish,self._options)文件”/Library/frameworks/python.framework/versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py“,第188行,在apply return m(转换,输入,选项)文件”/library/frameworks/python.framework/versions/3.7/lib/python3.7/site-packon.framework/versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py“中,第218行,在=pcoll.pipeline.options.view_as(attributeError:”tuple“对象没有属性”pipeline“
因为我是Beam的新手,所以代码没有那么优化。请让我知道如果我可以进一步优化这个代码。
感谢您的时间和帮助!
对函数应用筛选条件在管道中不起作用。您有两个相同的选择:-
函数上的筛选条件对于函数返回给调用函数的内容将是不明确的。因此,修改您的代码,将筛选条件应用到上面突出显示的两个位置中的任何一个。
我计划每天向BigQuery表添加增量数据。每次向现有表添加增量数据时,我都希望从表中现有数据中消除重复记录(基于主键列)。一种方法是:- 从增量数据中收集密钥集(让我们称之为) 在 - 行上运行查询,并将结果存储在新表中。 将增量数据追加到新表中。 我对这种方法的担忧是,它会创建一个大桌子的副本,并添加到我的账单中。 有没有更好的方法可以在不创建重复表的情况下实现相同的目标?
问题内容: (PostgreSQL 8.4) 表“ trackingMessages”存储移动设备(tm_nl_mobileid)和固定设备(tm_nl_fixedId)之间的跟踪事件。 这里的问题是,同一台移动设备随后可能会连接到同一固定设备两次(或多次)。我不希望看到后续的固定装置,但是如果以后有连接到另一个固定装置的移动设备,可以稍后再将其连接到相同的固定装置。 我想我很亲密,但还不完全。我
我试图在bigquery中创建一个表,从google存储中传输csv。这个csv有3000万行,我得到了这些错误,例如: 读取数据时出错,错误消息:行中从位置2543333656开始的值太多。找到4列,而预期为3列 读取数据时出错,错误消息:行中从位置254312106开始的值太多。找到4列,而预期为3列 这很尴尬,我的csv中没有所有这些行。它在2.5亿行中发现了一个错误。怎么可能?? 非常感谢
我希望从ParDo函数中调用操作,为中的每个键生成单独的BigQuery表(我使用的是python SDK)。这里有两个类似的线程,不幸的是没有帮助: 1)https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 当我执行以下代码时,第一个键的行被插入到BigQuery,然后管道失败,出现以下错误
问题内容: 我在ElasticSearch中具有以下结构的文档: 基本上,详细信息记录分为5个数组,同一记录的字段在5个数组中具有相同的索引位置。在示例数据中可以看到,有5个数组(价格,最大占用率,类型,可用性,大小),其中包含与同一元素相关的值。我要提取具有max_occupancy字段大于或等于2的元素(如果没有2的记录,则抢3;如果没有3的记录,则抢4,…),价格较低,在这种情况下为记录并将
我在Google BigQuery中有一个表,它由几个字段组成,然后是一个可能包含一个或多个对象的重复记录。我想在重复数据中创建一个新表,其中包含一个额外字段,并将原始数据复制到新表中,用GENERATE_UUID()的输出填充新字段,以便每个重复数据行都有一个唯一标识符。 我有一个类似的问题,当目标包含重复字段时,如何从一个BigQuery表复制到另一个?但我不知道如何调整它以适应我当前的用例。