当前位置: 首页 > 知识库问答 >
问题:

如何通过使用Apache Beam IO流式传输来避免BigQuery中的重复?

郭星文
2023-03-14

我们使用了一个非常简单的流程,从PubSub检索消息,将其JSON内容扁平化为两种类型(对于BigQuery和Postgres),然后插入到两个接收器中。但是,我们在两个接收器中都看到了重复的情况(Postgres有点固定了,有一个唯一的约束和一个“关于冲突……什么都不做”)。

起初,我们信任Apache Beam/BigQuery创建的所谓“插入式”UUId。然后,在将消息排入PubSub之前,使用JSON本身的数据为每条消息添加一个“unique\u label”属性,这赋予了它们唯一性(设备id和读取的时间戳)。并使用该属性和“withIdAttribute”方法订阅主题。最后,我们支付了GCP支持费用,但他们的“解决方案”不起作用。他们告诉我们甚至要使用改组转换(顺便提一下,这一点已被弃用)和一些窗口化(我们不会这样做,因为我们需要接近实时的数据)。

这是主流程,非常基本:[使用最新代码更新]管道

        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OptionArgs::class.java)
        val pipeline = Pipeline.create(options)
        var mappings = ""

        // Value only available at runtime
        if (options.schemaFile.isAccessible){
            mappings = readCloudFile(options.schemaFile.get())
        }

        val tableRowMapper = ReadingToTableRowMapper(mappings)
        val postgresMapper = ReadingToPostgresMapper(mappings)

        val pubsubMessages =
            pipeline
            .apply("ReadPubSubMessages",
                PubsubIO
                    .readMessagesWithAttributes()
                    .withIdAttribute("id_label")
                    .fromTopic(options.pubSubInput))

        pubsubMessages
            .apply("AckPubSubMessages", ParDo.of(object: DoFn<PubsubMessage, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info("Processing readings: " + context.element().attributeMap["id_label"])
                    context.output("")
                }
            }))

        val disarmedMessages =
            pubsubMessages
                .apply("DisarmedPubSubMessages",
                    DisarmPubsubMessage(tableRowMapper, postgresMapper)
                )

        disarmedMessages
            .get(TupleTags.readingErrorTag)
            .apply("LogDisarmedErrors", ParDo.of(object: DoFn<String, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info(context.element())
                    context.output("")
                }
            }))

        disarmedMessages
            .get(TupleTags.tableRowTag)
            .apply("WriteToBigQuery",
                BigQueryIO
                    .writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                    .to(options.bigQueryOutput)
            )

        pipeline.run()

DissarmPubsubMessage是一种PTransforms,它使用FlatMapElements转换来获取TableRow和ReadingInputFlatten(Postgres自己的类)

我们期望零重复或“尽最大努力”(并且我们附加了一些清理cron的工作),我们为这些产品支付了运行统计和bigdata分析的费用。。。

[UPDATE 1]我甚至附加了一个新的简单转换,该转换通过ParDo记录我们的唯一属性,据说应该响应PubsubMessage,但事实并非如此:

带有AckPubSubMessages步骤的新流程

谢谢!!

共有2个答案

越朗
2023-03-14

支持的编程语言是Python和Java,您的代码似乎是Scala,据我所知不支持。我强烈建议使用Java,以避免您使用的编程语言有任何不受支持的功能。

此外,我建议使用以下方法来处理副本,选项2可以满足您的近实时需求:

  1. message\u id。您可能已经阅读了指向已弃用文档的常见问题解答-副本。但是,如果检查PubsubMessage对象,您会注意到messageId仍然可用,如果发布者未设置,则会填充它:

"此消息的ID,由服务器在消息发布时分配...发布者不能在topics.publish调用中填充它"

BigQuery流媒体。要在加载数据期间验证复制,请在插入BQ之前创建UUID。请参阅示例接收器一节:Google BigQuery。

尝试数据流模板PubSubToBigQuery并验证BQ中没有重复项。

濮金鑫
2023-03-14

看起来您正在使用全局窗口。一种技术是将此窗口设置为N分钟窗口。然后处理窗口中的键,并放置带有dup键的项目。

 类似资料:
  • 我不断遇到需要通过映射或集合保存状态的解决方案。e、 g.创建一个返回在输入中找到的重复项的方法 我的Java8流解决方案,不幸的是,我正在使用哈希集进行过滤。我理解这并不“恰当”,因为这取决于州。没有州是建议还是硬性规定?这只是运行并行流时的问题吗?有人能推荐一种不使用哈希集的方法吗?

  • 钱箱类: 商户类: 输入数据: 我的任务 计算每个商家的总金额并返回商家列表 我正在尝试使用Stream API解决这个任务。并编写了以下代码: 结果 但显然,流返回四个对象,而不是所需的两个对象。我意识到,地图(第二行)为每个cashBoxId创建了四个对象。而且我不知道如何通过进行过滤,也不知道如何获得没有重复的结果。

  • 问题内容: 我正在构建一个Web应用程序,该服务器应从服务器 http://lscube.org/projects/feng 播放RTSP / RTP流。 HTML5视频/音频标签是否支持rtsp或rtp?如果没有,最简单的解决方案是什么?也许是VLC插件之类的东西。 问题答案: 从技术上讲“是” (但不是真的) HTML 5的标签与协议无关-不在乎。您将协议作为URL的一部分放在属性中。例如:

  • Spring webflux Json流不适用于转换为Mono的Flux对象

  • 我想编写一个存储过程来将数据插入到表中,并检查是否已经存在相同的数据? 如果是,则异常抛出为已经存在。但我不知道应该在哪里添加异常。请帮忙。

  • 我使用for循环将学生详细信息添加到ArrayList。当我给第二个学生提供详细信息时,它会覆盖第一个数据。螺柱类 将数据添加到ArrayList的主类。 输出:第二个数据[2,2]后的实际输出辊数[1]。预期输出应为卷号: A1姓名: F1 L1性别:男性年龄: 11体育赛事:标枪 报名号:A2姓名:F2 L2性别:女年龄:14体育项目:100米跑 报名号:A3姓名:F3 L3性别:男性年龄:1