我使用Spark 2.1.1。
我使用结构化流从2个Kafka分区读取消息。我正在向Spark Standalone集群提交我的应用程序,其中有一个工人和两个执行者(每个2个核心)。
./bin/spark-submit \
--class MyClass \
--master spark://HOST:IP \
--deploy-mode cluster \
/home/ApplicationSpark.jar
我想要这样的功能,来自每个Kafka分区的消息应该由每个单独的执行器独立处理。但现在正在发生的是,执行器分别读取和映射分区数据,但在映射之后,形成的无边界表被普遍使用,并且具有来自两个分区的数据。
当我对表运行结构化查询时,查询必须处理来自两个分区的数据(更多的数据量)。
select product_id, max(smr.order_time), max(product_price) , min(product_price)
from OrderRecords
group by WINDOW(order_time, "120 seconds"), product_id
Kafka分区在Product_id
有没有办法从执行程序映射到的Kafka分区并行但分别对数据运行相同的结构化查询?
但现在发生的事情是,执行器分别读取和.map分区数据,但是在映射通常使用的无界表并且具有来自两个分区的数据之后。因此,当我对表运行结构化查询时,查询必须处理来自两个分区的数据(更多的数据量)。
这是了解在不导致随机播放和跨分区(甚至可能通过网络)发送数据的情况下可以执行的内容和方式的关键。
最终的答案取决于你的查询是什么。如果它们在记录组上工作,其中组分布在多个主题分区中,因此在两个不同的Spark执行器上,你必须格外小心你的算法/转换,以便在单独的分区上进行处理(仅使用分区中可用的内容)并仅聚合结果。
我有一个需要执行一些业务逻辑的处理程序,我希望它在单独的线程池中执行,以不阻塞io事件循环。我已将DefaultEventExecutorGroup添加到管道中,如http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.htmljavadoc和http://netty.io/wiki/new-and-noteworthy-in-4.0.ht
我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。 为了避免分区内出现小块,我添加了一个重新分区(5 ),使每个分区内最多有5个文件: 我的问题是,在我分配的30个执行器中,只有5个在实际运行。最后我得到了我想要的东西(每个分区内有5个文件),但由于只有5个执行器在运行,所以执行时间非常长。 你有什么建议可以让我做得更快吗?
我正在使用在每一行上执行一个函数,这需要很长时间,为了加快速度,有没有一种方法可以使用并行处理,使多个核心在不同的行上并发工作? 例如,我将PRISM天气数据(https://prism.oregonstate.edu/)聚合到州一级,同时按人口加权。这是基于https://www.patrickbaylis.com/blog/2021-08-15-pop-weighted-weather/. 请
假设数组如下所示: 数组中最多可以有100.000个值。 另一方面,如果我这样做: 我得到serialization异常,因为spark正在尝试序列化spark上下文,而spark上下文是不可序列化的。 如何使这个工作,但仍然利用并行性。 这是我得到的咒语:
我对spark streaming有两个问题: < li >我有一个spark流应用程序正在运行,并以< code>20秒的批处理间隔收集数据,在< code>4000个批处理中,有< code>18个批处理因异常而失败: 无法计算拆分,块输入-0-1464774108087 未找到 我假设此时数据大小大于spark可用内存,并且应用程序< code>StorageLevel为< code>MEM
我需要一些帮助来了解spark如何决定分区的数量,以及它们在executors中是如何处理的,我很抱歉这个问题,因为我知道这是一个重复的问题,但即使在阅读了许多文章后,我仍然不能理解我正在放上一个我目前正在工作的真实生活用例,以及我的Spark提交配置和集群配置。 我的硬件配置: < code>3节点计算机,总Vcores=30,总内存=320 GB。 我正在使用spark dataframe J