我有一个Pub/Sub主题,它会定期(通常每隔几天或几周一次,但有时更频繁)接收批量消息。我想启动一个批处理数据流作业来读取这些消息,执行一些转换,将结果写入Datastore,然后停止运行。当新一批消息发出时,我想启动一项新工作。我已经阅读了Apache Beam PythonSDK文档和许多SO问题,但仍不确定一些事情。
Pub/Sub IO可以作为非流作业的一部分读取吗?然后同一作业可以使用Datastore IO(目前不支持流)进行写入吗?我是否可以假设默认的全局窗口和触发器会正确地告诉作业何时停止从Pub/Sub读取(当不再写入一批消息时)?或者我是否需要添加某种触发/窗口方案,例如最大时间或最大元素数?触发时,触发器会告诉全局窗口关闭并因此结束作业吗?
编辑:假设这是针对具有数据流的Java Beam,回答不正确。
抱歉,我没想到这是给Python的。
根据此拉取请求中添加的留档,Python中的流媒体模式显式不支持Datastore。留档中存在不一致之处,它声称支持Pub/Sub的Python批处理模式,而链接代码说它仅在流媒体管道中受支持。我已经提交了一个Jira错误来尝试解决这个问题。
对于python流模式下的数据流,这似乎不是当前支持的用例。我建议您考虑改用Apache Beam的Java版本,它支持向数据存储中进行流式写入。
考虑以下设置: 发布/订阅 数据流:用于验证发布/订阅、解包和写入BigQuery的事件的流作业 BigQuery 我们在通过Datafow管道的有效事件上有计数器,并观察到计数器高于发布/订阅中可用的事件量。 注意:我们似乎在BigQuery中也看到了重复项,但我们仍在调查中。 在数据流日志中可以观察到以下错误: 请注意,数据流作业是在发布/订阅中已有数百万条消息等待时启动的。 问题: 这是否会
问题内容: 我目前正在从数据存储区中请求20个条目,使用游标将其返回给用户,以防用户要求更多条目,请将游标用作新起点,并询问下一个20个条目。 该代码看起来像 万一重要的是这里的完整代码:https : //github.com/koffeinsource/kaffeeshare/blob/master/data/appengine.go#L23 使用带有的循环看起来像是反模式,但是使用/ 时我看
我正在尝试找出是否有任何GCP数据流模板可用于使用“Pub/Sub to Cloud Spanner”进行数据摄取。我发现已经有一个默认的GCP数据流模板可用于示例-“Cloud Pub/Sub to BigQuery”。所以,我有兴趣看看我是否可以在流或批处理模式下对扳手进行数据摄取,以及行为会如何
我尝试使用简单的select查询读取数据,并使用resultset数据创建csv文件。 到目前为止,在application.properties文件中已经有了select查询,并且能够生成csv文件。 现在,我希望将查询移动到一个静态表中,并在批处理作业开始之前(类似于before作业)将其作为初始化步骤获取。 你能告诉我做这件事最好的策略是什么吗。在读取数据和创建CSV文件的实际批处理作业开始
H全部, 如果有人有任何经验的kafka-spark流对处理各种数据,请给我一个简短的细节,如果这是一个可行的解决方案,并比有两个不同的管道更好。 提前道谢!
我试图弄清楚GCP上是否有一项服务,允许使用发布/订阅的流,并将累积的数据转储/批处理到云存储中的文件中(例如,每X分钟一次)。我知道这可以通过Dataflow实现,但如果有现成的解决方案,我会寻找更多的解决方案。 例如,这是可以使用AWS Kinesis Firehose进行的操作—纯粹在配置级别—可以告诉AWS定期或在累积数据达到一定大小时将流中累积的任何内容转储到S3上的文件。 这样做的原因