我正在尝试使用Google Dataflow和Apache Beam SDK 2.6.0的PubSub实现一次性交付。
用例非常简单:
'Generator'数据流作业将1M消息发送到PubSub主题。
GenerateSequence
.from(0)
.to(1000000)
.withRate(100000, Duration.standardSeconds(1L));
“存档”数据流作业从PubSub订阅中读取消息,并保存到Google云存储中。
pipeline
.apply("Read events",
PubsubIO.readMessagesWithAttributes()
// this is to achieve exactly-once delivery
.withIdAttribute(ATTRIBUTE_ID)
.fromSubscription('subscription')
.withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
.apply("Window events",
Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.standardMinutes(15))
.discardingFiredPanes())
.apply("Events count metric", ParDo.of(new CountMessagesMetric()))
.apply("Write files to archive",
FileIO.<String, Dto>writeDynamic()
.by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
.to(archiveDir)
.withTempDirectory(archiveDir)
.withNumShards(options.getNumShards())
.withNaming(dataSource ->
new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
));
我在 Pubsub.IO.Write(“生成器”作业)和 PubsubIO.Read(“存档”作业)中都添加了“带 Id 属性”,并期望它能保证精确一次的语义。
我想测试“负面”情景:
事实上,我得到的是——所有消息都被传递了(至少实现了一次),但除此之外,还有很多重复的消息——大约每1M3050K。
是否有任何解决方案可以实现一次交付?
同时,Pub/Sub支持“恰好一次交付”。
它目前处于正式发布前的状态,因此遗憾的是尚未准备好投入生产使用。
所以,我自己从来没有做过,但是关于你的问题的推理是我处理它的方式…
我的解决方案有点复杂,但我无法在不涉及其他外部服务的情况下确定其他方法来实现这一目标。所以,这里什么都没有。
您可以让您的管道同时从pubsub和GCS读取数据,然后将它们组合起来对数据进行重复删除。这里棘手的部分是一个是有界pCollection(GCS),另一个是无界pCollection(pubsub)。您可以将时间戳添加到有界集合中,然后对数据进行窗口处理。在此阶段,您可能会删除大于约15分钟(您的先例实现中窗口的持续时间)的GCS数据。这两个步骤(即正确添加时间戳和删除可能旧到无法创建重复的数据)是迄今为止最棘手的部分。
解决此问题后,请附加两个 p 集合,然后在两组数据通用的 Id 上使用 GroupByKey。这将产生一个P收集
最后,只有在重新启动管道时,第一个pubsub窗口才需要这项额外的工作。之后,您应该将GCS pCollection重新分配给一个空pCollection,这样按键分组就不会做太多额外的工作。
让我知道你的想法,如果这可行。此外,如果你决定采用这个策略,请公布你的里程数:)。
数据流不能让你在运行中保持状态。如果您使用Java,您可以以不导致其丢失现有状态的方式更新正在运行的管道,从而允许您跨管道版本进行重复数据删除。
如果这对您不起作用,您可能希望以ATTRIBUTE_ID键控的方式存档邮件,例如。使用扳手或GCS作为文件名。
提供的检索/更新API是否有任何限制? 我找到了这些文件: https://cloud.google.com/pubsub/quotas:包含有关吞吐量的信息以及主题、订阅和消息创建数量的限制 检索/更新API的限制呢? 例如:检索具有名称的主题,更新订阅。
我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。
本文向大家介绍Redis 订阅发布_Jedis实现方法,包括了Redis 订阅发布_Jedis实现方法的使用技巧和注意事项,需要的朋友参考一下 我想到使用Redis的订阅发布模式是用来解决推送问题的~。 对于概念性的叙述,多多少少还是要提一下的: 什么是Redis发布订阅?Redis发布订阅是一种消息通信模式,发送者通过通道A发送消息message,订阅过通道A的客户端就可以接收到消息messag
发布和订阅 Meteor 服务端可以通过Meteor.publish发布文档集,同时客户端可以通过Meteor.subscribe订阅这些发布。 任何客户端订阅的文档都可以通过find方法进行查询使用。 默认情况下,每个新创建的 Meteor 应用包含有 autopublish 包,它会自动为每个客户端发布所有可用的文档。 为了可以更细化的控制不同客户端所接收的数据文档,首先应该在终端移除 aut
我是GCP的新手,目前有一个使用GKE和gRPC的微服务架构。微服务正在向Google Cloud Pub/Sub发布事件。我的Web-UI正在使用Google Cloud Endpoint向微服务发送请求。我想在网站上有很多实时/推送更新(例如实时更新用户统计数据等),现在想知道如何最好地做到这一点。让Web-UI订阅Google Cloud Pub/Sub中的主题是不是一种不好的做法?GCP中
问题内容: 我一直在研究不同的nodeJS发布/订阅实现,并想知道哪种方法最适合特定的应用程序。该应用程序的要求涉及多通道,多用户3D环境中对象的实时同步。 我从使用socket.io开始,创建了一个基本的通道数组,当用户发送消息时,它遍历该通道中的用户并将消息发送到用户的客户端。这很好用,我对此没有任何问题。 为了保持对象的持久性,我使用node_redis添加了Redis支持。然后,我将通道数