当前位置: 首页 > 知识库问答 >
问题:

如何根据已处理元素的数量动态触发窗口?

濮阳霄
2023-03-14

我有一个Apache Beam管道,它运行在Google Cloud Dataflow上。这是一个流式管道,接收来自Google Cloud PubSub的输入消息,这些消息基本上是要处理的JSON元素数组。

大致来说,管道有这几个步骤:

  1. 将消息反序列化为pCollecttion >
  2. 将数组拆分(或分解)为pCollection
  3. 少量处理步骤:某些元素将在其他元素之前完成,而某些元素将被缓存,因此它们只需跳到最后,而不需要进行任何处理。
  4. 平整所有输出并应用GroupByKey(这是问题所在的步骤):它将pCollection转换回pCollection > 但不等待所有元素。
  5. 序列化以发布PubSub消息。

我无法获取最后的groupbykey来将接收到的所有元素分组在一起。发布的消息不包含必须处理的元素,并且所花费的时间比跳到最后的元素要长。

我认为如果我可以编写一个html" target="_blank">自定义的数据驱动触发器,这将是直接解决的。或者即使我可以从自定义的windowfn动态设置触发器afterpane.ElementCountatLest()

我似乎无法制作自定义触发器。但是有没有可能以某种方式动态设置每个窗口的触发器呢?

--

这里是我正在研究的管道的简化版本。

我已经将对象数组t的输入简化为一个简单的整数数组。我已经模拟了这些整数的键(或ID)。通常它们是物体的一部分。

我还将缓慢的处理步骤(实际上是几个步骤)简化为一个带有人为延迟的单一步骤。

(完整示例gist https://gist.github.com/naringas/bfc25bcf8e7aca69f74de719d75525f2)

PCollection<String> queue = pipeline
    .apply("ReadQueue", PubsubIO.readStrings().fromTopic(topic))
    .apply(Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.standardSeconds(3))
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
        .discardingFiredPanes());

TupleTag<List<KV<Integer, Integer>>> tagDeserialized = new TupleTag<List<KV<Integer, Integer>>>() {};
TupleTag<Integer> tagDeserializeError = new TupleTag<Integer>() {};
PCollectionTuple imagesInputTuple = queue
    .apply("DeserializeJSON", ParDo.of(new DeserializingFn()).withOutputTags(tagDeserialized, TupleTagList.of(tagDeserializeError)));

/*  
This is where I think that I must adjust the custom window strategy, set the customized dynamic-trigger
*/
PCollection<KV<Integer, Integer>> images = imagesInputTuple.get(tagDeserialized)
    /* I have tried many things
    .apply(Window.<List<KV<Integer, Integer>>>into(new GlobalWindows()))
    */
    .apply("Flatten into timestamp", ParDo.of(new DoFn<List<KV<Integer, Integer>>, KV<Integer, Integer>>() {
        // Flatten and output into same ts
        // like Flatten.Iterables() but I set the output window
        @ProcessElement
        public void processElement(@Element List<KV<Integer, Integer>> input, OutputReceiver<KV<Integer, Integer>> out, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
            Instant timestamp = w.maxTimestamp();
            for (KV<Integer, Integer> el : input) {
                out.outputWithTimestamp(el, timestamp);
            }
        }
    }))
    .apply(Window.<KV<Integer, Integer>>into(new GlobalWindows()));

TupleTag<KV<Integer, Integer>> tagProcess = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagSkip = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple preproc = images
    .apply("PreProcessingStep", ParDo.of(new SkipOrNotDoFn()).withOutputTags(tagProcess, TupleTagList.of(tagSkip)));

TupleTag<KV<Integer, Integer>> tagProcessed = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagError = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple processed = preproc.get(tagProcess)
    .apply("ProcessingStep", ParDo.of(new DummyDelasyDoFn).withOutputTags(tagProcessed, TupleTagList.of(tagError)));

/* Here, at the "end"
the elements get grouped back
first: join into a PcollectionList and flatten it
second: GroupByKey which should but doesn't way for all elements
lastly: serilize and publish (in this case just print out)
*/
PCollection end = PCollectionList.of(preproc.get(tagSkip)).and(processed.get(tagProcessed))
    .apply("FlattenUpsert", Flatten.pCollections())
    //
    .apply("GroupByParentId", GroupByKey.create())
    .apply("GroupedValues", Values.create())
    .apply("PublishSerialize", ParDo.of(
        new DoFn<Object, String>() {
            @ProcessElement
            public void processElement(ProcessContext pc) {
                String output = GSON.toJson(pc.element());
                LOG.info("DONE: {}", output);
                pc.output(output);
            }
        }));
// "send the string to pubsub" goes here

共有1个答案

笪烨
2023-03-14

我玩了一点有状态的管道。由于您希望使用数据驱动触发器或afterpane.ElementCountatLest(),我假设您知道符合消息的元素的数量(或者,至少它不因键而改变),因此我在本例中定义了num_elements=10

我的方法的主要思想是跟踪到目前为止我看到的特定键的元素的数量。请注意,我必须将preprocessingstepprocessingstep合并为一个单独的代码,以获得准确的计数。我知道这只是一个简化的例子,所以我不知道这将如何转化为真实的场景。

在有状态的ParDo中,我定义了两个状态变量,一个bagstate,其中包含所有的整数,另一个valueState用于计算错误数:

// A state bag holding all elements seen for that key
@StateId("elements_seen")
private final StateSpec<BagState<Integer>> elementSpec =
      StateSpecs.bag();

// A state cell holding error count
@StateId("errors")
private final StateSpec<ValueState<Integer>> errorSpec =
      StateSpecs.value(VarIntCoder.of());

然后,我们像往常一样处理每个元素,但我们不会输出任何东西,除非它是一个错误。在这种情况下,我们在向tagerror端输出发出元素之前更新错误计数器:

errors.write(firstNonNull(errors.read(), 0) + 1);
is_error = true;
output.get(tagError).output(input);

我们更新计数,对于成功处理或跳过的元素(即!is_error),将新的观察到的元素写入bagstate:

int count = firstNonNull(Iterables.size(state.read()), 0) + firstNonNull(errors.read(), 0);

if (!is_error) {
   state.add(input.getValue());
   count += 1;
}

然后,如果成功处理的元素和错误之和等于num_elements(我们在这里模拟的是数据驱动触发器),则从bagstate中刷新所有项:

if (count >= NUM_ELEMENTS) {
   Iterable<Integer> all_elements = state.read();
   Integer key = input.getKey();

   for (Integer value : all_elements) {
      output.get(tagProcessed).output(KV.of(key, value));
   }
}

注意,这里我们已经可以对值进行分组,并且只发出一个kv > 。我只是做了一个for循环,以避免更改下游的其他步骤。

有了这个,我发布了这样一条消息:

gcloud pubsub topics publish streamdemo --message "[1,2,3,4,5,6,7,8,9,10]"

在我得到之前:

INFO: DONE: [4,8]

现在我得到:

INFO: DONE: [1,2,3,4,5,6,8,9,10]

元素7不像模拟错误的元素那样存在。

使用directrunner和2.16.0 SDK进行测试。这里有完整的代码。

如果这适用于您的用例,请告诉我,请记住我只做了一些次要的测试

 类似资料:
  • 我不能做的是找到一种方法来改变Vbox的尺寸(绿色段),然后根据窗口的大小改变按钮(橙色段)。(当用户使用窗口大小时) 我更喜欢找到一种方法将参数设置到我的css文件中,或者作为最后的手段在我的FXML中。 .css文件:

  • 我有两个(最终更多)字符向量,它们由几个(有序的)名称组成。这类向量的两个例子是: 和 (可通过以下方式获得): 和 现在我想根据元素的位置计算这两个字符向量之间的相关性。例如,在第一个向量中,元素“原始”的位置为1,但在第二个向量中,它的位置为14。 我该怎么办? 提前谢谢!

  • 全部的 我是骆驼的开胃菜。我有一些问题。这里有一个场景。 文件正文中有2个字段。(seq, date, OR no.)所以,我为这个数据格式生成了pojo类。 我申请了骆驼队。几天后,关于添加名称字段,dataformat发生了更改。 所以,我们需要修改pojo类并再次重建应用程序。但我们不想再次构建应用程序并为dataformat生成pojo类。 如果我们可以在运行时使用xsd动态生成Pojo类

  • 本文向大家介绍iframe里面的元素触发父窗口元素事件的jquery代码,包括了iframe里面的元素触发父窗口元素事件的jquery代码的使用技巧和注意事项,需要的朋友参考一下 例如父窗口定义了一个事件。 top: $(dom1).bind('topEvent', function(){}); 那么iframe里面的元素怎样触发父窗口dom1的事件呢?这样吗? $(dom1, parent.do

  • 问题内容: 总览 我具有以下HTML结构,并且将和事件附加到了元素上。 问题 当我将文件拖到时,事件将按预期触发。但是,当我将鼠标移到子元素(例如)上时,会为该元素触发该事件,然后为该元素触发该事件。 如果我再次将鼠标悬停在该元素上,则再次触发该事件,这很酷,但是随后为刚刚剩下的子元素触发了该事件,因此执行了该指令,这并不酷。 此行为有问题的原因有两个: 我只附加&,所以我不明白为什么子元素也要附

  • 问题内容: 我有一个用jQuery 方法动态生成的with : 包含一些输入元素,它们被加载到modal中。 使用jQuery的方法,我可以在事件触发后捕获输入值,但是当将元素动态添加到模式div时,当用户输入文本时事件不会触发。 哪种jQuery方法支持处理由动态创建的元素触发的事件? 用于创建新输入元素的代码为: 捕获用户值的代码是: 第二个代码块似乎适用于原始元素,但不会由新的动态生成的元素