我以前也认为dataflow工作人员使用每个核心1个线程。但是,我最近发现这只适用于批处理模式。在流模式下,除非另有规定,否则它使用300个线程,如下所示。这与批处理工作者代码形成对比。要限制工作线程的数量,请使用--numberofworkerharnessthreads=n
。
我的故事是如何发现这一点的:我有一个ParDo的流式工作,它将读取XML文件并解析它们。工人们的内存耗尽了,因为他们试图一次啃太多的文件。我使用一个静态AtomicInteger来计算PARDO的并发执行次数。此外,我还记录了用于执行PARDO的threadIds。我可以在一个worker上看到多达300个并发执行,记录的不同threadid的数量也表明该worker使用了大量线程。
我通过限制--numberofworkerharnessthreads=10
使用的线程数来解决内存问题。我还尝试将这个数字设置为1个线程,但这似乎导致在任何给定时间只执行一个流水线步骤。这并不奇怪,但是我想要更高级别的并行性,所以10对于我的用例来说似乎是一个很好的数字。
我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?
有人能帮我做这个吗?
Posthoc将FFMPEG连接到opencv-python二进制文件,用于Google云数据流作业 根据这个问题,可能会拉出一个自定义docker图像,但我找不到任何关于如何使用DataFlow进行处理的文档。 https://issues.apache.org/jira/browse/beam-6706?focusedcommentid=16773376&page=com.atlassian.
我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat
我正在尝试在Spring云数据流中配置DLQ。下面是流定义以及我如何部署它 在自定义转换处理器代码中,我已经提到过 这意味着若消息包含错误,那个么RunTimeException和我想在DLQ中捕获这些消息。但当我运行代码时,似乎没有得到任何名为test tran的Kafka DL队列。 我是否需要设置更多属性来启用DLQ,还是需要更改代码中的某些内容以正确使用DLQ。 自定义转换代码 Trans
在我的新公司,我是一名数据工程师,负责构建google cloud platform(GCP)批处理ETL管道。我的团队的数据科学家最近给了我一个数据模型(用Python3.6编写的.py文件)。 数据模型有一个主函数,我可以调用它并获得一个dataframe作为输出,我打算将这个dataframe附加到一个bigquery表中。我是否可以只导入这个主函数,并使用apache beam(Dataf