当前位置: 首页 > 知识库问答 >
问题:

Apache beam-fixed窗口,具有产生早期结果的默认触发器

缪坚诚
2023-03-14
    null
Counts, Unix_Timestamp
1, 1553578200
2, 1553578201
3, 1553578202
4, 1553578203
...
def encode_byte_string(element):
  #element = ', '.join(element)
  #count = str(count)
  element = str(element)
  print element
  return element.encode('utf-8')

def custom_timestamp(message):
  data, time_stamp = message.split(',')
  # assuming that message is already parsed JSON (dict)
  return beam.window.TimestampedValue(data, int(time_stamp))

class BuildRecordFn(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    window_start = window.start
    window_end = window.end
    return [element + (window_start,) + (window_end,)]


pubsub_data = (
                p 
                | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription= input_subscription)
                | 'Remove extra chars' >> beam.Map(lambda data: (data.rstrip().lstrip()))
                | 'CustomTimestamp' >> beam.Map(custom_timestamp)
                | 'Window' >> beam.WindowInto(window.FixedWindows(100))
                | 'Form Key Value pair' >> beam.Map(lambda x: (1, int(x)))
                | 'Sum values' >> beam.GroupByKey()
                | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
                | 'Encode to byte string' >> beam.Map(encode_byte_string)
                | 'Write to pub sub' >> beam.io.WriteToPubSub(output_topic)
              )


result = p.run()
result.wait_until_finish()
(1, [1,2,3,4, ..., 99], Timestamp(1553578200), Timestamp(1553578300))
(1, [99,100,101,102, ..., 199], Timestamp(1553578300), Timestamp(1553578400))

然而,当我运行这个管道时,我甚至在通过窗口结束之前就得到了早期结果

(1, [11], Timestamp(1553578200), Timestamp(1553578300))
(1, [12, 16, 15], Timestamp(1553578200), Timestamp(1553578300))
(1, [19, 18, 8, 10, 23, 21, 1, 7, 9, 13], Timestamp(1553578200), Timestamp(1553578300))
(1, [5, 6, 14, 17, 20, 22], Timestamp(1553578200), Timestamp(1553578300))
(1, [33], Timestamp(1553578200), Timestamp(1553578300))
(1, [3], Timestamp(1553578200), Timestamp(1553578300))
(1, [24, 28, 29, 37, 39, 43], Timestamp(1553578200), Timestamp(1553578300))
(1, [2, 4], Timestamp(1553578200), Timestamp(1553578300))
(1, [48], Timestamp(1553578200), Timestamp(1553578300))
(1, [25, 31, 34, 36, 38, 40, 46, 49, 51], Timestamp(1553578200), Timestamp(1553578300))
(1, [26, 27, 30, 32, 41, 42, 45, 47], Timestamp(1553578200), Timestamp(1553578300))
(1, [44, 52], Timestamp(1553578200), Timestamp(1553578300))
(1, [35, 50], Timestamp(1553578200), Timestamp(1553578300))

这可能是什么原因?

共有1个答案

方季同
2023-03-14

似乎是这里描述的晚期数据的情况。https://beam.apache.org/documentation/programming-guide/#水印和后期数据

缓解此类问题的一种方法是使用WithAllowedlateness添加某种“延迟”。但是,这个函数现在似乎只在Java SDK中可用。

另一种方法可能是使用afterwatermark触发器。但是,我还没试过。所以,带着点盐去吧。

 类似资料:
  • 我正在尝试测量具有窗口操作的 Flink 应用程序的延迟,如下所示: 我正在考虑事件时间并提取时间戳,我使用这个水印策略: 聚合函数将特定对象保存为累加器,其中还包含提取的时间戳;这些时间戳写在 kafka 主题中。问题是返回的时间戳如下: 返回的时间戳并不像我预期的那样间隔相等,第四个和第五个是相等的,但它们的返回间隔为15秒,这是不可能的,因为应用程序记录的输入每秒(每秒10个)连续生成。在其

  • 片段着色器 结果是一个绿色屏幕,里面有一个黑色矩形。但我希望长方形改为蓝色。我的代码有问题吗?

  • 我有一个由两个字段“键控”的记录流,然后分配一个间隔为30秒的会话窗口。我使用附加在记录上的“时间戳”作为事件时间。我正在使用“Assign AscendingTimeStamps”水印。 以下面的记录为例。该流由(用户,place)键控。 Record1:user1,place1,timestamp t1 Record2:user2,place1,timestamp在t1之后30秒 桶1 Rec

  • 我正在使用OpenNLP处理诸如“在洛杉矶工作的医生”和“住在好莱坞并在圣莫尼卡工作的女性”之类的查询。对于理解人类的英语来说,这些句子很明显,主题是“医生”和“女性”。然而,当我使用opennlp时,它将句子标记为 [女性生活][好莱坞] 这是另一个句子“住在圣莫尼卡、在马里布工作和踢足球的人”被处理为 为什么OpenNLP的POS标记器错误地标记了它们?这些句子有最简单的语法结构。如果最先进的

  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • 问题内容: 运行以下代码: 结果是: 上面的代码中没有定义任何窗口框架,它看起来默认的窗口框架是 不确定我对默认窗口框架的理解是否正确 问题答案: 从Spark Gotchas 默认帧规格取决于给定窗口定义的其他方面: 如果指定了ORDER BY子句,并且该函数接受了帧规范,则该帧规范是由RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW定义的, 否