我们使用了一个非常简单的流程,从PubSub检索消息,将其JSON内容扁平化为两种类型(对于BigQuery和Postgres),然后插入到两个接收器中。但是,我们在两个接收器中都看到了重复的情况(Postgres有点固定了,有一个唯一的约束和一个“关于冲突……什么都不做”)。
起初,我们信任Apache Beam/BigQuery创建的所谓“插入式”UUId。然后,在将消息排入PubSub之前,使用JSON本身的数据为每条消息添加一个“unique\u label”属性,这赋予了它们唯一性(设备id和读取的时间戳)。并使用该属性和“withIdAttribute”方法订阅主题。最后,我们支付了GCP支持费用,但他们的“解决方案”不起作用。他们告诉我们甚至要使用改组转换(顺便提一下,这一点已被弃用)和一些窗口化(我们不会这样做,因为我们需要接近实时的数据)。
这是主流程,非常基本:[使用最新代码更新]管道
val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OptionArgs::class.java)
val pipeline = Pipeline.create(options)
var mappings = ""
// Value only available at runtime
if (options.schemaFile.isAccessible){
mappings = readCloudFile(options.schemaFile.get())
}
val tableRowMapper = ReadingToTableRowMapper(mappings)
val postgresMapper = ReadingToPostgresMapper(mappings)
val pubsubMessages =
pipeline
.apply("ReadPubSubMessages",
PubsubIO
.readMessagesWithAttributes()
.withIdAttribute("id_label")
.fromTopic(options.pubSubInput))
pubsubMessages
.apply("AckPubSubMessages", ParDo.of(object: DoFn<PubsubMessage, String>() {
@ProcessElement
fun processElement(context: ProcessContext) {
LOG.info("Processing readings: " + context.element().attributeMap["id_label"])
context.output("")
}
}))
val disarmedMessages =
pubsubMessages
.apply("DisarmedPubSubMessages",
DisarmPubsubMessage(tableRowMapper, postgresMapper)
)
disarmedMessages
.get(TupleTags.readingErrorTag)
.apply("LogDisarmedErrors", ParDo.of(object: DoFn<String, String>() {
@ProcessElement
fun processElement(context: ProcessContext) {
LOG.info(context.element())
context.output("")
}
}))
disarmedMessages
.get(TupleTags.tableRowTag)
.apply("WriteToBigQuery",
BigQueryIO
.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.to(options.bigQueryOutput)
)
pipeline.run()
DissarmPubsubMessage是一种PTransforms,它使用FlatMapElements转换来获取TableRow和ReadingInputFlatten(Postgres自己的类)
我们期望零重复或“尽最大努力”(并且我们附加了一些清理cron的工作),我们为这些产品支付了运行统计和bigdata分析的费用。。。
[UPDATE 1]我甚至附加了一个新的简单转换,该转换通过ParDo记录我们的唯一属性,据说应该响应PubsubMessage,但事实并非如此:
带有AckPubSubMessages步骤的新流程
谢谢!!
支持的编程语言是Python和Java,您的代码似乎是Scala,据我所知不支持。我强烈建议使用Java,以避免您使用的编程语言有任何不受支持的功能。
此外,我建议使用以下方法来处理副本,选项2可以满足您的近实时需求:
"此消息的ID,由服务器在消息发布时分配...发布者不能在topics.publish调用中填充它"
BigQuery流媒体。要在加载数据期间验证复制,请在插入BQ之前创建UUID。请参阅示例接收器一节:Google BigQuery。
尝试数据流模板PubSubToBigQuery并验证BQ中没有重复项。
看起来您正在使用全局窗口。一种技术是将此窗口设置为N分钟窗口。然后处理窗口中的键,并放置带有dup键的项目。
我不断遇到需要通过映射或集合保存状态的解决方案。e、 g.创建一个返回在输入中找到的重复项的方法 我的Java8流解决方案,不幸的是,我正在使用哈希集进行过滤。我理解这并不“恰当”,因为这取决于州。没有州是建议还是硬性规定?这只是运行并行流时的问题吗?有人能推荐一种不使用哈希集的方法吗?
钱箱类: 商户类: 输入数据: 我的任务 计算每个商家的总金额并返回商家列表 我正在尝试使用Stream API解决这个任务。并编写了以下代码: 结果 但显然,流返回四个对象,而不是所需的两个对象。我意识到,地图(第二行)为每个cashBoxId创建了四个对象。而且我不知道如何通过进行过滤,也不知道如何获得没有重复的结果。
问题内容: 我正在构建一个Web应用程序,该服务器应从服务器 http://lscube.org/projects/feng 播放RTSP / RTP流。 HTML5视频/音频标签是否支持rtsp或rtp?如果没有,最简单的解决方案是什么?也许是VLC插件之类的东西。 问题答案: 从技术上讲“是” (但不是真的) HTML 5的标签与协议无关-不在乎。您将协议作为URL的一部分放在属性中。例如:
Spring webflux Json流不适用于转换为Mono的Flux对象
我想编写一个存储过程来将数据插入到表中,并检查是否已经存在相同的数据? 如果是,则异常抛出为已经存在。但我不知道应该在哪里添加异常。请帮忙。
我使用for循环将学生详细信息添加到ArrayList。当我给第二个学生提供详细信息时,它会覆盖第一个数据。螺柱类 将数据添加到ArrayList的主类。 输出:第二个数据[2,2]后的实际输出辊数[1]。预期输出应为卷号: A1姓名: F1 L1性别:男性年龄: 11体育赛事:标枪 报名号:A2姓名:F2 L2性别:女年龄:14体育项目:100米跑 报名号:A3姓名:F3 L3性别:男性年龄:1