当前位置: 首页 > 知识库问答 >
问题:

发布/订阅数据流BigQuery管道中存在重复项

黄昊英
2023-03-14

考虑以下设置:

  • 发布/订阅
  • 数据流:用于验证发布/订阅、解包和写入BigQuery的事件的流作业
  • BigQuery

我们在通过Datafow管道的有效事件上有计数器,并观察到计数器高于发布/订阅中可用的事件量。

注意:我们似乎在BigQuery中也看到了重复项,但我们仍在调查中。

在数据流日志中可以观察到以下错误:

Pipeline stage consuming pubsub took 1h35m7.83313078s and default ack deadline is 5m. 
Consider increasing ack deadline for subscription projects/<redacted>/subscriptions/<redacted >

请注意,数据流作业是在发布/订阅中已有数百万条消息等待时启动的。

问题:

  • 这是否会导致管道拾取重复事件
  • 我们能做些什么来缓解这个问题吗

共有1个答案

司徒炎彬
2023-03-14

我的建议是通过以批处理模式启动数据流作业来清除PubSub订阅消息队列。然后以流模式运行它,以进行常规操作。这样,您就可以从一个干净的基础上开始您的流式处理作业,而不会有一长串排队的消息。

此外,数据流(和beam)的强大功能使其能够在流式处理和批处理相同的管道中运行。

 类似资料:
  • 我们有一个托管在Google Kubernetes引擎上的NodeJS API,我们想开始将事件记录到BigQuery中。 我可以看到三种不同的方法: 使用API中的节点BigQuery SDK将每个事件直接插入BigQuery(如此处“流式插入示例”下所述):https://cloud.google.com/bigquery/streaming-data-into-bigquery或此处:htt

  • 问题内容: 跟随Redis Pub / Sub 这工作正常,我可以使用以下任何语言发布消息 使用,我可以验证此请求是否已正确发布 当我将订阅者 块 添加到 其他类(侦听器类)中的 该频道时,问题就开始了,如下所示 中的,还表明侦听器已正确订阅 问题是,当我将订户侦听器类添加到相同的Rails应用程序时…它停止工作,导致侦听Redis服务器并停止执行任何其他代码…它只是坐在那里侦听。 因此,有一种方

  • 简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息

  • 我的管道是IoTCore->pub/sub->Dataflow->bigQuery。最初,我得到的数据是Json格式的,管道工作正常。现在我需要转向csv,问题是我使用的Google定义的数据流模板使用Json输入而不是csv。是否有一种简单的方法通过数据流将csv数据从pub/sub转移到bigquery。模板可能会改变,但它是用Java实现的,我从来没有用过,所以需要很长时间来实现。我还考虑过

  • 我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。

  • 想象一个简单的Google数据流管道。在这个管道中,您使用apache beam函数从BQ读取数据,并根据返回的pcollection更新这些行 该管道的问题是,当您读取表(beam.map)时,将对返回的pcollection中的每个项执行UpdateBQ 可能的解决办法