查找缓存的引用表位于BigQuery中,我们可以读取它并将其作为ParDo操作的侧输入传入,但无论我们如何设置触发器/窗口,它都不会刷新。
class FilterAlertDoFn(beam.DoFn):
def process(self, element, alertlist):
print len(alertlist)
print alertlist
… # function logic
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
| ‘alert_side_input’ >> beam.WindowInto(
beam.window.GlobalWindows(),
trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(
late=trigger.AfterCount(1)
)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| beam.Map(lambda elem: elem[‘SOMEKEY’])
)
...
main_input | ‘alerts’ >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))
根据这里的I/O页面(https://beam.apache.org/documentation/io/build-in/),它说Python SDK只支持BigQuery接收器的流,这是否意味着BQ读取是一个有界源,因此不能在此方法中刷新?
试图在源上设置非全局窗口会导致侧输入中出现空的PCollection。
更新:当试图实现Pablo的答案建议的策略时,使用side输入的ParDo操作不会运行。
有一个单一的输入源到两个输出,其中一个使用侧输入。非SideInput仍然会到达它的目的地,SideInput管道不会进入FilterAlertDoFn()。
通过将side输入替换为虚拟值,管道将进入函数。它可能在等待一个不存在的合适窗口吗?
使用与上面相同的FilterAlertDoFn(),我的side_input和调用现在如下所示:
def refresh_side_input(_):
query = 'select col from table'
client = bigquery.Client(project='gcp-project')
query_job = client.query(query)
return query_job.result()
trigger_input = ( p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
subscription=known_args.trigger_subscription))
bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
| beam.WindowInto(beam.window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.Map(refresh_side_input)
))
...
# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)
# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])
我尝试了refresh_side_input()的几个不同版本,它们在检查函数内部的返回时报告预期结果。
更新2:
我对Pablo的代码做了一些小的修改,我得到了相同的行为--DoFn从不执行。
在下面的示例中,每当我发帖到某个Other_Topic时,我都会看到'in_load_conversion_data',但当发帖到某个Other_Topic时,我永远不会看到'in_dofn'
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def load_my_conversion_data():
return {'EURUSD': 1.1, 'USDMXN': 4.4}
def load_conversion_data(_):
# I will suppose that these are currency conversions. E.g.
# {'EURUSD': 1.1, 'USDMXN' 20,}
print 'in_load_conversion_data'
return load_my_conversion_data()
class ConvertTo(beam.DoFn):
def __init__(self, target_currency):
self.target_currency = target_currency
def process(self, elm, rates):
print 'in_DoFn'
elm = elm.attributes
if elm['currency'] == self.target_currency:
yield elm
elif ' % s % s' % (elm['currency'], self.target_currency) in rates:
rate = rates[' % s % s' % (elm['currency'], self.target_currency)]
result = {}.update(elm).update({'currency': self.target_currency,
'value': elm['value']*rate})
yield result
else:
return # We drop that value
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'
with beam.Pipeline(options=pipeline_options) as p:
table_pcv = beam.pvalue.AsSingleton((
p
| 'some_other_topic' >> beam.io.ReadFromPubSub(topic=some_other_topic, with_attributes=True)
| 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.Map(load_conversion_data)))
_ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
| 'some_window' >> beam.WindowInto(window.FixedWindows(1))
| beam.ParDo(ConvertTo('USD'), rates=table_pcv))
正如您所指出的,Java SDK允许您使用更多的流功能,如定时器和状态。这些实用程序有助于实现类似这样的管道。
Python SDK缺少其中一些实用程序,特别是计时器。出于这个原因,我们需要使用一个hack,通过在PubSub中的some_other_topic
中插入消息,可以触发侧输入的重新加载。
这也意味着您必须手动执行对BigQuery的查找。您可能可以使用apache_beam.io.gcp.bigquery_tools.bigqueryWrapper
类直接执行对BigQuery的查找。
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
def load_conversion_data(_):
# I will suppose that these are currency conversions. E.g.
# {‘EURUSD’: 1.1, ‘USDMXN’ 20, …}
return external_service.load_my_conversion_data()
table_pcv = beam.pvalue.AsSingleton((
p
| beam.io.gcp.ReadFromPubSub(topic=some_other_topic)
| WindowInto(window.GlobalWindow(),
trigger=trigger.Repeatedly(trigger.AfterCount(1),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.Map(load_conversion_data)))
class ConvertTo(beam.DoFn):
def __init__(self, target_currency):
self.target_currenct = target_currency
def process(self, elm, rates):
if elm[‘currency’] == self.target_currency:
yield elm
elif ‘%s%s’ % (elm[‘currency’], self.target_currency) in rates:
rate = rates[‘%s%s’ % (elm[‘currency’], self.target_currency)]
result = {}.update(elm).update({‘currency’: self.target_currency,
‘value’: elm[‘value’]*rate})
yield result
else:
return # We drop that value
_ = (p
| beam.io.gcp.ReadFromPubSub(topic=some_topic)
| beam.WindowInto(window.FixedWindows(1))
| beam.ParDo(ConvertTo(‘USD’), rates=table_pcv))
从select语句更新表时遇到问题...下面是命令: 我要传递给此查询的参数将是sub sub查询中的report_id...select单独执行需要0.113秒,而update查询总共需要4.868秒。是因为update查询将对表的每一行执行select语句吗?我怎么能让这更快? 谢谢你
问题内容: 我有一个Java程序,可以对sql服务器数据库运行一堆查询。其中第一个查询视图返回大约750k条记录。我可以通过sql server management studio运行查询,大约30秒后即可得到结果。但是,我从昨晚开始运行该程序。当我今天早上检查它时,大约15小时后,该查询仍未将结果返回给Java程序。 我可以访问数据库以执行几乎任何我想做的事情,但是我真的不确定如何开始调试它。人
问题内容: 我们从JBoss 4(和JDK 5)升级到JBoss 5(和JDK 6)。问题是开始时间已经从1.5分钟(在JBoss 4上)增加到4分钟以上。 似乎花费JBoss最长时间来初始化的组件是JMX 从调试服务器日志中,我在有问题的时间得到以下行: 项目中没有EJB。 内存设置为: 您是否知道如何改善JBoss的启动时间? 更新: 到目前为止没有运气,我尝试了shreeni的建议(更改了扫
在我的VPS服务器上的PHP文件中考虑这个代码: 还这个 每天多次测试后,“连接时间”始终保持在50秒标记。我相信如果不是在1秒及以下,速度应该会快10倍。 我不知道服务器配置,但我被告知,我的服务器的CPU或RAM可能在故障。我使用顶部命令行显示以下内容,这对我来说似乎很好: 任务:共80项,1项运行,79项睡眠,0项停止,0项僵尸 Cpu:0.0%us、0.0%sy、0.0%ni、100.0%
问题内容: 我正在构建一个测试爬网程序,并且想知道Go(golang)是否缓存DNS查询。我没有看到有关dnsclient中缓存的任何信息。添加到任何搜寻器以防止大量额外的DNS查询似乎很重要。 Go(1.4+)是否缓存DNS查找? 如果不是,那么debian / ubuntu / linux,windows或darwin / OSX是否可以在网络级别进行任何缓存? 问题答案: 您的问题的答案是否
问题内容: 我有以下只需1秒即可执行的sql查询: 但是我需要一个结果集来获取比率大于0的结果。因此,当我将查询更改为此时,需要7分钟的时间来执行: 为什么这会使查询时间从1秒增加到7分钟?由于b表很大,因此我什至尝试使用CTE,但这也没有提高性能。我认为使用CTE可以从中筛选出较小的一组值,因此应该更快一些,但这无济于事: 我不能包括执行计划,因为除了查询之外,我没有对数据库的权限。 问题答案: