当前位置: 首页 > 知识库问答 >
问题:

DirectRunner不像我在Beam Java SDK中用FixedWindows指定的那样从pub/sub读取

丁文轩
2023-03-14

我目前正在使用Apache Beam Java SDK 2.8.0开发一个数据流管道,该管道从pub/sub读取流数据。管道只是来自Google的PubSubtotext.java模板。

https://github.com/googlecloudplatform/dataflowtemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/pubsubtotext.java

虽然使用DataflowRunner部署到云中的工作符合预期,但使用DirectRunner并不能正确运行,也就是说,当我在本地环境中工作时,这使得开发管道变得非常困难。

例如,当我将FixedWindows速率设置为30s时,云上的Dataflow Runner每30秒生成一次文件,这是预期的。

然而,当我在本地环境中为DirectRunner设置相同的速率时,它不会每30秒发出一次文件。相反,它以不稳定的方式生成文件。

例如,它在4分钟后发出第一个数据,并创建8个文件,实际上是一次生成的,然后在5分钟后生成,然后在3分钟后生成,等等,这使得本地开发过程极其耗时和令人沮丧。

我为什么要观察这个?

以下是再现问题的全部内容:

  1. Git克隆https://github.com/googleCloudplatform/dataflowtemplates
  2. 从pom.xml中删除beam-runners-direct-java的 test 行,使其在运行时支持DirectRunner。
  3. 按照https://github.com/googlecloudplatform/dataflowtemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/pubsubtotext.java上的建议编译和运行程序,但将runner更改为DirectRunner,并添加--outputshardtemplate=w-p-ss-of-nn,这是省略的选项,在本地运行时是必需的。
  4. 同时删除--project--staginglocationtemplocation行,因为它不会部署到云中。
  5. 发出文件需要非常长的时间,尽管我设置了Windowduration=30s

我怀疑这是一个与pub/sub相关的问题,但当我运行tcpdump时,它开始连接到pub/sub并立即提取数据。这可能是一个DirectRunner特定的问题。

共有1个答案

梁昊天
2023-03-14

虽然我不知道为什么会这样,但我找到了解决这个问题的方法。虽然dataflowrunner不需要设置触发器以使其正常工作,但必须为directrunner指定显式触发器。

.trrigering追加到window.into中,问题就消失了。

 类似资料:
  • 发布订阅模式 核心点 Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。 主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscriber) 从主题订阅消息。 主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。 pub/sub解决了什么样的问题? 耗时的问题,比如上传,格式转换、计算等其他耗

  • 我正在运行一个内存密集型应用程序。一些关于环境的信息: 64位debian 13 GB RAM 64位JVM(我的程序运行时输出System.getProperty("sun.arch.data.model"),它说"64") 下面是我发出的确切命令: Java-xmx 9000m-jar " ale . jar " test config 我已经用同样精确的数据、配置等运行了程序。在其他几个系统

  • 如果我想发送消息到谷歌PubSub并使用它的消息。您建议使用Spring cloud GCP库还是只使用Google cloud Java API。 有人能区分这两者吗?或者与谷歌云pubsub库相比,Spring Cloud gcp提供了哪些功能。

  • 我想设置订阅请求的读取超时。现在唯一的选择是设置,或者只是等到pubsub返回,如果没有发布消息,这似乎是90秒。 我正在使用gcloud-node模块调用PubSub。它使用底层的请求模块进行gcloud api调用。我已经更新了gcloud-node/lib/pubsub/subscription.js的本地副本,将请求超时设置为30秒 更新:我可能需要澄清一下我的例子。我有两个客户端连接在同

  • 问题内容: 我有以下状态: 然后我更新状态: 由于setState是假设要合并的,所以我希望它是: 但是它吃掉了id,状态为: 这是预期的行为吗?仅更新嵌套状态对象的一个​​属性的解决方案是什么? 问题答案: 我认为不做递归合并。 您可以使用当前状态的值构造一个新状态,然后调用该状态: 我在这里使用过函数function(来自underscore.js库),通过创建状态的浅表副本来防止对该状态的现

  • 我想使用查找从一个集合中获取一些数据并将其放入另一个集合中。 在localfield或foreignfield中写什么都不重要,因为它从player_game_stats中获取所有数据并将其插入player集合中的每个文档中。我想检查localfield和foreignField是否相等,但lookup不检查这一点。我对mongodb使用NoSqlBooster