当前位置: 首页 > 知识库问答 >
问题:

聚集步骤在本地发出结果,但在GCP数据流中部署时不发出结果

呼延辰龙
2023-03-14
(
    p
    | 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
    | 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
    | 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
    | "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
    | "groupby_dummy_key" >> beam.GroupByKey()
    | "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
    | "aggregate" >> beam.ParDo(Aggregator())
    | "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)

这是一个在15分钟的窗口中聚合包含所有消息的列表的类:

class Aggregator(beam.DoFn):
    def process(self, elements):
        # parsing the message list into a pandas DataFrame
        # some preprocessing and agregation steps
        # returns a list with json messages
        return [aggregated_values]

我们使用GCP pub/sub模拟器在本地测试这段代码,它工作得很好。然而,当我们部署到GCP Dataflow时,它不会发出任何结果,也不会在日志中发现错误。此外,我们看到数据新鲜度无限增长。

我们认为我们缺少一些触发函数,但我们不确定这是否是进行此类聚合的正确方法,因为它在本地发出结果,但在部署时不发出结果。当我们使用与默认触发器不同的触发器时,本地我们没有发射。

共有1个答案

邬友樵
2023-03-14

对于所需的包,使用setup.py文件而不是requirements.txt文件解决了这个问题。此外,我们将python Apache Beam SDK版本从2.29.0更新到2.32.0,现在可以正确地聚合和发出结果。

我们在日志GCP模块中发现了这个错误:

同步pod...(“pipeline_name”)时出错,跳过:[用crashloopbackoff为“sdk0”“startcontainer”失败:“back-off 5m0s restarting failed container=sdk0 pod=pipeline_name)”

 类似资料:
  • 比对数据后,窗口显示将在目标中插入、更新或删除的记录数。取消勾选“显示相同的表和其他”或“显示相同的集合和其他”选项来隐藏具有相同数据的表或集合和具有不同结构的表或集合。在默认情况下,所有具有不同数据的表或集合和所有动作均已勾选。你可以取消勾选你不想应用的复选框。 当你在列表中选择一个表或集合时,底部窗格将显示源和目标中的数据。源和目标之间不同的值会高亮显示。若要查看多行的数据,请右击网格并选择“

  • 比对数据后,窗口显示将在目标中插入、更新或删除的记录数。取消勾选“显示相同的表和其他”或“显示相同的集合和其他”选项来隐藏具有相同数据的表或集合和具有不同结构的表或集合。在默认情况下,所有具有不同数据的表或集合和所有动作均已勾选。你可以取消勾选你不想应用的复选框。 当你在列表中选择一个表或集合时,底部窗格将显示源和目标中的数据。源和目标之间不同的值会突出显示。若要查看多行的数据,请按住 Contr

  • 比对数据后,窗口显示将在目标中插入、更新或删除的记录数。取消勾选“显示相同的表和其他”或“显示相同的集合和其他”选项来隐藏具有相同数据的表或集合和具有不同结构的表或集合。在默认情况下,所有具有不同数据的表或集合和所有动作均已勾选。你可以取消勾选你不想应用的复选框。 当你在列表中选择一个表或集合时,底部窗格将显示源和目标中的数据。源和目标之间不同的值会高亮显示。若要查看多行的数据,请右击网格并选择“

  • 我每天都在python dataflow工作中得到这个错误。 我使用的是Apache Beam2.15(与2.17一样)Python 3.7。 2020-01-28 17:08:53.801来自工作人员的GM恐怖消息:处理在步骤s03中停留了至少10m00s,没有在sun.misc.unsafe.park(本机方法)在java.util.concurrent.locks.locksupport.p

  • 我正在尝试部署一个现有的。Net核心应用程序,通过创建构建和发布管道来使用Azure Devops。构建管道工作正常,但我在运行发布管道(在部署Azure App Service下)时收到以下错误。 错误:未找到具有指定模式的包:D:\a\r1\a***. zip 检查任务中提到的包是作为构建中的工件发布还是作为前一阶段发布并在当前作业中下载。 应该做些什么来解决这个问题?

  • 问题内容: 我想将结果集转换为JSP页面中的列表。并希望显示所有值。这是我的查询: 我已经执行,使用 PreparedStatement的 ,并得到了 结果集 。但是如何将其转换为 列表 并希望显示如下结果: 问题答案: 您需要在循环中逐行遍历 ResultSet 对象,以提取每个列值: