当前位置: 首页 > 知识库问答 >
问题:

将Pubsub的每X条消息写入云存储

毛正浩
2023-03-14

我对云数据流/Apache Beam是个新手,所以概念/编程对我来说还是模糊的。

我想做的是让Dataflow监听Pubsub并以JSON的形式获取这种格式的消息:

{
  "productId": "...",
  "productName": "..."
}

并将其转换为:

{
  "productId": "...",
  "productName": "...",
  "sku": "...",
  "inventory": {
    "revenue": <some Double>,
    "stocks":  <some Integer>
  }
}

因此所需的步骤是:

>

  • (IngestFromPubsub)通过收听主题从Pubsub获取记录(1条Pubsub消息=1条记录)

    (EnrichDataFromAPI)

    a.将有效负载的JSON字符串反序列化为Java对象

    b.通过调用外部API,使用SKU,我可以通过添加inventory属性来丰富每个记录的数据。

    c.再次序列化记录。

    (WriteToGCS)那么每个x号(可以参数化)记录,我需要在云存储中写入这些。还请考虑x=1这样一个简单的情况。(x=1,好主意吗?恐怕云存储写得太多了)

    尽管我是一个Python的人,但我已经很难用Python实现这一点了,所以我需要用Java写。读Beam在Java的例子让我头疼,它太冗长了,很难遵循。我所理解的是,每个步骤都是一个。将应用到pCollection。

    到目前为止,这是我微不足道的努力的结果:

    public static void main(String[] args) {
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setStreaming(true);
    
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
            // I don't really understand the next part, I just copied from official documentation and filled in some values
            .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
                .withAllowedLateness(Duration.millis(5000))
                .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
                .discardingFiredPanes()
            )
            .apply("EnrichDataFromAPI", ParDo.of(
                new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.element();
                        // help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
                        // ... deserialize, call API, serialize again ...
                        c.output(enrichedJSONString);
                    }
                }
            ))
            .apply("WriteToGCS", 
                TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
        ;
    
    
        PipelineResult result = pipeline.run();
    }
    

    请填上缺少的部分,并给我一个关于窗口的提示(例如,什么是适当的配置等),以及我应该在哪些步骤中插入/应用它。

  • 共有1个答案

    养焱
    2023-03-14

    >

  • 我认为您不需要在IngestFromPubSubEnrich DataFromapi中进行任何窗口化。窗口化的目的是将在时间上附近的记录分组到窗口中,这样您就可以对它们进行聚合计算。但由于您不是在进行任何聚合计算,而是对独立处理每个记录感兴趣,因此不需要Windows。

    由于您总是将一个输入记录转换为一个输出记录,因此您的EnrichDataFromapi应该是MapElements。这应该会使代码更容易。

    在Apache Bean Java中有处理JSON的资源:Apache Beam stream processing of JSON data

    您不一定需要使用jackson将JSON映射到Java对象。您可能可以直接操作JSON。您可以使用Java的原生JSON API来解析/操纵/序列化。

  •  类似资料:
    • 保存/记录在AWS SNS主题上发布的每条消息的最简单方法是什么?我想可能有一个神奇的设置可以自动将它们推送到S3或数据库,或者可能是一个自动支持HTTP目标的数据库服务,但似乎并非如此。也许需要通过Lambda函数来完成? 目的只是为了在设置一些SNS发布时进行基本的诊断和调试。我并不真正关心大规模或快速查询,只想一次记录和执行几分钟对所有活动的基本查询。

    • 我正在使用一些Google云平台服务(数据流、云存储、PubSub),并有以下场景: 许多应用程序将GCS上CSV文件的路径发布到PubSub主题 在用Python编写的流式Beam管道中,我们使用Beam。io。从PubSub读取并在DoFn中将每个输入文件作为一个整体进行处理(每个调用都会获得一条PubSub消息) 这在很大程度上可以正常工作,但随着文件的增长,我们希望通过使用Datafram

    • 问题内容: 我的总体问题是: 使用Redis for PubSub,当发布者将消息推送到频道中的速度比订阅者能够阅读它们的速度快时,消息会如何处理? 例如,假设我有: 一个简单的发布者以2 msg / sec的速度发布消息。 一个简单的订户以1 msg / sec的速率读取消息。 我天真的假设是订户只会看到发布到Redis上的消息的50%。为了验证这一理论,我编写了两个脚本: pub.py 子py

    • 如何删除文本频道中的每条消息?这起作用了,但速度很慢。(删除所有内容可能需要15秒,具体取决于数量)我无法使用该选项,因为速度在我的实现中很重要。 我还尝试删除整个通道并创建一个具有相同属性的新通道,这也可以工作,但当机器人此后试图向通道发送消息时出现了一些问题。(未知通道错误) 进一步说明:这必须能够删除多达数千条消息。 如果您对我的问题有任何疑问,请询问,我会尝试相应地编辑它。

    • 我用的是Kafka0.8.2。正如文件所说: batch.num.messages指定: 使用异步模式时要在一批中发送的消息数。生产者将等待该数量的消息准备好发送或排队。缓冲器已达到最大毫秒。 和请求。必修的。acks控制代理对请求的确认。 我想知道Kafka经纪人如何发送这个确认,它是否发送批次确认字符,还是每个单独的消息?

    • FCM服务未向我的iOS应用程序发送消息。 > App CAN成功接收APNs令牌和实例ID令牌 App CAN使用推送通知实用程序利用. p8令牌在后台成功接收来自APN的推送 #2中使用的相同APNs密钥上传到Firebase控制台 应用程序无法接收Firebase控制台中Notification Composer发送的消息,也无法使用CURL请求接收消息。 应用程序在通过FCM发送时不显示任