我想发布消息到一个发布/子主题与一些属性感谢数据流作业在批处理模式。
它与@ankur解决方案一起工作:https://stackoverflow.com/a/55824287/9455637
但我认为使用共享的pub/sub客户端会更有效:https://stackoverflow.com/a/55833997/9455637
但是发生了一个错误:
import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud.pubsub_v1 import PublisherClient
import json
import argparse
import re
import logging
class PubsubClient(PublisherClient):
def __reduce__(self):
return self.__class__, (self.batch_settings,)
# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
def __init__(self):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = PubsubClient(batch_settings)
super().__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(
topic=element["topic"],
data=json.dumps(element["data"]).encode("utf-8"),
**element["attributes"],
)
return future.result()
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--source_table_id",
dest="source_table_id",
default="",
help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
)
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
# pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
bq_source_table = known_args.source_table_id
bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"
regex_match = re.search(bq_table_regex, bq_source_table)
if not regex_match:
raise ValueError(
f"Bad BigQuery table id : `{bq_source_table}` please match {bq_table_regex}"
)
table_ref = bigquery.TableReference(
projectId=regex_match.group("PROJECT_ID"),
datasetId=regex_match.group("DATASET_ID"),
tableId=regex_match.group("TABLE_ID"),
)
with beam.Pipeline(options=pipeline_options) as p:
(
p
| "ReadFromBqTable" #
>> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
| "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
问题内容: 我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)内累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。 我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。 这是到目前为止我所
我的管道是IoTCore->pub/sub->Dataflow->bigQuery。最初,我得到的数据是Json格式的,管道工作正常。现在我需要转向csv,问题是我使用的Google定义的数据流模板使用Json输入而不是csv。是否有一种简单的方法通过数据流将csv数据从pub/sub转移到bigquery。模板可能会改变,但它是用Java实现的,我从来没有用过,所以需要很长时间来实现。我还考虑过
为了良好地沟通,你必须认识到它的困难。它本身就是一种技能。与你交流的人本身是有瑕疵的,这一事实使得沟通变得更加困难。他们不会努力去理解你。他们不善言辞。他们经常过度工作或者无聊,至少,有时候只关注他们自己的工作而非你要发表的长篇大论。上课,练习写作,公共演讲,聆听,这些东西的一个好处是,如果你擅长它们,你可以更容易看到问题所在以及解决方法。 程序员是一种社会动物,他们的生存依赖于与团队的交流。高级
我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设
我有一个在< code>postgresql数据库上使用< code>typeorm的更新查询,如下所示,该查询频繁地在20个项目的列表上执行(每30秒一次)。大约需要。更新12秒,对我的极限来说已经很多了。 是否有可能在单个查询中执行这样的批量更新,而不是迭代其他项?如果是的话-怎么做? 和对于每个项目都是唯一的。
问题内容: 我已经开始尝试使用Django REST框架。到目前为止,我已经成功地为我的对象创建了一个序列化程序,通过Javascript的$ .post()创建了发布视图,发布对象和返回对象。因此,现在我可以在JSON和Django模型对象之间进行适当的转换。 问题是我有一个对象数组[A1,A2,…,An]。现在,当我需要发布这样的数组时,我逐个对象地进行处理。是否有可能一次发布整个数组,并在D