当前位置: 首页 > 知识库问答 >
问题:

多线程如何在apache beam管道中使用有界源代码?

杨乐意
2023-03-14

我是大数据处理的新手。我正在使用apache束JavaSDK来使用它。试图了解多线程/并行数据处理在apache束管道中是如何工作的。关于多线程,数据是如何从一个PTransform处理到另一个PTransform的?

共有1个答案

侯英达
2023-03-14

如果您谈论的是并行数据处理,那么一般来说,Beam依赖于底层的杠杆式数据处理引擎(例如Spark、Flink、Dataflow等)。通常,您无法直接控制诸如“将使用多少工人”、“您的输入集如何分块和并行化”等内容,这将是旧引擎的责任。

但是,假设输入数据将被拆分为捆绑包,管道中的每个DoFn实例都将在一个工作者上处理一个或多个捆绑包,但同时由一个或多个工作者处理。通过这种方式,可以并行处理数据—每个worker上的每个捆绑包。而且捆绑包之间没有任何协调或同步机制(就像我们在多线程中所做的那样),我们必须假设它们是以任意顺序独立处理的。

所以,这是非常“鸟瞰”的观点。如果你有任何具体问题,请毫不犹豫地提出。

 类似资料:
  • BilledLines=Integer.ParseInt(args[7]); Array=TaxCalc.Calculation(输入,y,BilledLines,); 返回数组; } taxouput函数是OSB中的多线程函数。现在,我想调用setup()和 cleanup(),以便setup()只对第一个线程调用,而 cleanup()只对最后一个线程调用。

  • 我正在做一个WSDL的申请。但实际上我不明白是什么使“呢?wsdl”参数以及我将如何处理返回的XML。例如: https://adwords.google.com/api/adwords/cm/v201309/CampaignService?wsdl 这个URL返回和XML字符串,但我该怎么办? 我可以使用jaxb(xjc)将模式文件转换为java类,但我不明白如何正确使用此WSDL? 感谢您的回

  • 问题内容: 我最近将bash执行命令重写为Jenkins管道。旧代码就像 现在,我使用管道脚本来包装命令,像这样 但是,我遇到了一个错误。当我尝试时,它会正确显示。所以我怀疑内部有问题。 在使用管道之前,命令在外壳执行中工作正常。因此,源代码安装在Jenkins服务器上,似乎管道脚本不知道源命令是什么。 如何在sh wrapd块中运行source命令? 问题答案: 替换为 请注意,第一个点后有一个

  • 问题内容: 在Linux终端中,当一个命令的输出太长而无法在一页中读取时,我可以这样做: 这样我就可以读取cat文件的输出并上下滚动。 如何在IPython中执行此操作? 例如,我尝试了一下,但没有成功: 我最初的问题是通过Shift + Page Up不能看到来自的输出,并且我不想更改滚动缓冲区。 问题答案: 在IPython中,您可以使用标准的寻呼机(通常是)来显示对象。另外,您可以增加终端的

  • 我需要创建一个并行执行多个操作的应用程序。我曾考虑过使用线程或线程池,但我以前从未使用过,所以我发现这相当困难。Thread应按以下方式工作: 所有系统应同时运行。你认为我应该如何实现这一点?

  • 问题内容: 我正在开发一个项目,在该项目中,我需要对正在运行的服务器进行HTTP URL调用,该服务器将响应作为JSON字符串返回。 下面是我的主要代码,它使用和- 下面是我的类,它实现接口并使用… 现在我有下面的代码在另一大类它调用的方法类顺序- 所以我的问题是在这里应该是静态的,就像我正确看到的一样,我正在为每个请求重新创建整个连接池,而我猜这不是正确的方法。 注意: 如果我将RestTemp