input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
| beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
trigger=AfterCount(30),
accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
因此,我们正在寻找防止这种融合发生的方法,因此数据流将窗口与窗口的后处理分开。这样,我们期望Dataflow能够再次分配多个工作人员来进行激发窗口的后处理。
到目前为止我们所尝试的:
最后两个操作确实创建了第三个集群操作(1/processing2/windowing3/post-processing),但我们注意到,在开窗之后,仍然是同一个worker在执行所有操作。
是否有任何解决方案可以解决这个问题声明?
目前我们正在考虑的解决方案是建立另一个接收窗口的流管道,这样这些工作人员就可以并行处理窗口,但这很麻烦…
你做了正确的事情来打破你元素的融合。我怀疑可能有个问题给你带来了麻烦。
对于流,单个键总是在同一个工作器中处理。您的所有记录或大部分记录是否都被分配给一个键?如果是这样,您的处理将在一个工人中完成。
可以防止这种情况发生的方法是使窗口成为键的一部分,这样,即使多个窗口的元素具有相同的键,它们也可以在不同的worker中处理:
class KeyIntoKeyPlusWindow(core.DoFn):
def process(self, element, window=core.DoFn.WindowParam):
key, values = element
yield ((key, window), element)
group = windows | beam.ParDo(KeyIntoKeyPlusWindow() | beam.GroupByKey()
group | beam.Map(post_processing_fn)
问题内容: 目前,我正在使用内置于python的应用程序。当我在个人计算机上运行它时,它不会出现问题。 但是,当我将其移至生产服务器时。它不断向我显示以下错误: 我进行了一些研究,得出的原因是,当服务器仍在忙于发送数据时,最终用户浏览器会停止连接。 我想知道为什么会发生这种情况,以及根本原因是什么导致它无法在生产服务器上正常运行,而我的计算机却可以正常运行。任何建议表示赞赏 问题答案: 您的服务器
无状态管道是纯粹的功能,通过输入数据流动而不记住任何东西或引起可检测的副作用。 大多数管道是无状态的。 我们使用的CurrencyPipe和我们创建的长度管是无状态管的示例。 状态管道是能够管理它们转换的数据的状态的管道。 创建HTTP请求,存储响应并显示输出的管道是有状态的管道。 有状态管道应谨慎使用。 Angular 2提供 ,这是有状态的。 View Example 实现有状态管道 // n
传递参数 链接管道 我们可以将多个管道连接在一起,以便在一个表达式中使用多个管道。
使装饰器包含具有name属性的管道元数据。 此值将用于在模板表达式中调用此管道。 它必须是有效的JavaScript标识符。 实现PipeTransform接口的transform方法。 此方法接受管道的值和任何类型的可变数量的参数,并返回一个变换的(“管道”)值。 import { Component } from '@angular/core'; selector: 'app-root',
主要内容:示例程序,使用管道双向通信管道是两个或更多相关或相关进程之间的通信媒介。 它可以在一个进程内,也可以在子进程和父进程之间进行通信。 通信也可以是多层次的,如父进程,子进程和子进程之间的沟通等。通信是通过一个过程写入管道和从管道读取来实现的。 要实现管道系统调用,请创建两个文件,一个写入文件,另一个从文件读取。 管道机制可以用一个实时的场景来看,比如用管子把水灌进一个容器里。 填充过程可以理解为是写入管道,读取过程只不过是从