当前位置: 首页 > 知识库问答 >
问题:

SlidingWindows Python Apache Beam复制数据

于恺
2023-03-14
 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

输出

如果我只从pub/sub发送一条消息,并尝试在滑动窗口完成后用代码打印我所拥有的内容:

class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))

结果

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

对于FixedWindows,这是我的porpouse的正确代码:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

共有1个答案

吴建中
2023-03-14

发出属于窗口的所有元素。如果一个元素属于多个窗口,它将在每个窗口中发出。

累积模式仅在计划处理延迟数据/多次触发器触发时才重要。在这种情况下,当触发器再次激发时,丢弃模式只在窗口中提供新的元素,即只发出自上次触发器激发以来到达同一窗口的元素,已经发出的元素不会再次发出,而是被丢弃。在累加模式下,每次触发都会发出整个窗口,它将包括上次已经发出的旧元素和此后到达的新元素。

如果我理解你的例子,你有滑动窗口,它们有30秒的长度,它们每15秒启动一次。所以它们重叠15秒:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=============|       :       :
  w2:               |==============|       :
  w3:                      |===============|
  ...
  time: ----t+00---t+15---t+30----t+45----t+60------>
             :             :               :
  w1:        |=============|               :
  w2:                      |===============|
  w3:                                      |====...
  ...
 类似资料:
  • 问题内容: 我想创建一个1D NumPy数组,该数组由另一个1D数组的1000个背对背重复组成,而无需将数据复制1000次。 可能吗? 如果有帮助,我打算将两个数组都视为不可变的。 问题答案: 你做不到 NumPy数组在每个维度上必须具有一致的步幅,而您的步幅在大多数情况下需要采用一种方式,但有时会向后跳。 您可以获得的最接近的结果是一个1000行的2D数组,其中的每一行都是您第一个数组的视图,或

  • 问题内容: 我有一个a不断更新的数组。比方说。我需要制作一个完全相同的副本a并称之为b。如果a要改成,b应该还是。做这个的最好方式是什么?我尝试了像这样的循环: 但这似乎无法正常工作。请不要使用深层复制等高级术语,因为我不知道这意味着什么。 问题答案: 你可以尝试使用System.arraycopy()

  • 主要内容:使用 copyOf() 方法和 copyOfRange() 方法,使用 arraycopy() 方法,使用 clone() 方法所谓复制数组,是指将一个数组中的元素在另一个数组中进行复制。本文主要介绍关于 Java 里面的数组复制(拷贝)的几种方式和用法。 在 Java 中实现数组复制分别有以下 4 种方法: Arrays 类的 copyOf() 方法 Arrays 类的 copyOfRange() 方法 System 类的 arraycopy() 方法 Object 类的 clone

  • 问题内容: 这个问题已经在这里有了答案 : 11年前关闭。 精确重复 表命名难题:单数与复数名称 使用单数或复数数据库表名称更好吗?有公认的标准吗? 我听到有人支持和反对它,你们怎么看? 问题答案: 恕我直言,表名称应像客户一样是复数形式。 如果类名映射到“客户”表中的一行,则其名称应与“客户”一样为单数形式。

  • 如果我们使用不可修改的对象,就像我们之前讲过的,假如我们需要修改这个对象状态,必须要创建一个新的一个或者多个属性被修改的实例。这个任务是非常重复且不简洁的。 举个例子,如果我们需要修改Forecast中的temperature(温度),我们可以这么做: val f1 = Forecast(Date(), 27.5f, "Shiny day") val f2 = f1.copy(temperatur

  • 问题内容: 我注意到程序中存在一个错误,发生该错误的原因是因为熊猫似乎是通过引用熊猫数据框而不是通过值进行复制。我知道不可变对象将始终通过引用传递,但pandas数据帧不是不可变的,因此我不明白为什么它通过引用传递。谁能提供一些信息? 谢谢!安德鲁 问题答案: Python中的所有函数都是“按引用传递”,没有“按值传递”。如果要显式复制pandas对象,请尝试。

  • 问题内容: 如何将MySQL表的数据,结构和索引复制或克隆或复制到新表? 这是我到目前为止发现的。 这将复制数据和结构,但不复制索引: 这将复制结构和索引,但不复制数据: 问题答案: 要使用索引和触发器进行复制,请执行以下两个查询: 要仅复制结构和数据,请使用此代码: 我之前曾问过这个问题: 复制包含索引的MySQL表