当前位置: 首页 > 知识库问答 >
问题:

pySpark addfile选项,执行器中的worker会发生什么

左丘源
2023-03-14

很明显,对于分发小的查找数据,使用广播变量更好。

假设我们在yarn客户端模式下从Master节点运行pySpark代码(spark submit)。因此应用程序驱动程序将始终在主节点上创建。我们从主节点上的本地路径读取文件。

with open('/tmp/myfile.txt', 'r') as f:
    lookup = {}
    for line in f.readlines():
        line = parse(line) # Method parse uses re and return dict
        lookup[line['name']] = line['age']

然后创建broadcast var并使用它:

lookupBC = sc.broadcast(lookup)

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookupBC.value.get(e, e), 1))\
    .collect()
sc.addFile('/tmp/myfile.txt')

with open(SparkFiles.get('/tmp/myfile.txt')) as f:
    lookup = {}
    for line in f.readlines():
        line = parse(line) # Method parse uses re and return dict
        lookup[line['name']] = line['age']

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookup.get(e, e), 1))\
    .collect()
  1. 文件将被读取多少次?每个执行器在特定节点上执行一次?还是每个任务一次?
  2. 步骤是什么,如何在Executor上处理代码?
  3. 使用更好的addFile或bc var?
  4. spark是否会基于pyspark代码进行任何优化并创建隐式bc变量?

在executor日志中,我看到了有关bc VAR的信息,但我在代码中没有使用任何信息:

18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms

共有1个答案

郏经纬
2023-03-14

广播变量似乎被加载在内存中,直到它们被显式销毁。相反,sc.addfile似乎是(为每个执行程序)创建一个磁盘副本。所以我猜测sparkfiles.get()会在每次调用文件时将文件加载到内存中。

  • 因此,在上面的示例中,它将加载一次。
  • 但如果您在.map()中调用sparkfiles.get(),则它会尝试为RDD中的每个条目重新加载文件。

最后,回答大家的问题,

视情况而定,调用.get的位置,如上所述。

步骤是什么,如何在Executor上处理代码?

我不明白这部分。

 类似资料:
  • 嗨,一个星期以来,我一直在追踪我的办公代码中的一个错误。它与Spring、Hibernate和Transaction有关。 我所知道的: 1.延迟加载。 2. Spring如何使用代理和拦截器进行事务管理。 3. Spring中的事务传播,我们的代码使用默认的REQUIRED。 4.每个请求/会话的会话和Hibernate的分离实体基础来保持会话,以及我们的会话不是每个会话的事实。 我的场景:我的

  • 我想了解更多关于std::thread的信息,特别是如果我有一个线程向量,并且其中一个线程完成了执行,将会发生什么。 想象一下这个例子: 创建一个线程向量,所有线程都执行以下函数: “char*flag”指向一个标志,表示函数停止执行。 例如,向量包含10个线程,它们都在执行。然后将线程号3的标志设置为零。(向量中的第4个线程,因为向量从零开始。) 好的做法是然后加入线程。 向量现在将包含多少st

  • 编译一个简单的hello world应用程序,如下所示: 使用默认编译器选项()生成相对较大的822 KB可执行文件。 为什么会发生这种情况,减少可执行文件大小的最佳方法是什么?

  • 观察:我在Polymer中运行一个SPA,当我在chrome中的新后台选项卡中打开内部链接时,页面直到我关注该选项卡才完成加载(ajax和所有页面)。 问题:chrome在等待用户焦点做什么?或者,Polymer在完成页面渲染和发送ajax请求之前在等待什么? 奖励:我如何打开一个开发工具面板,目标是我在后台打开的标签,以观察发生了什么/没有发生什么? 额外的回答:如果你在一个选项卡上打开开发工具

  • 我已经从服务器的category类中提取了数据。getcategories方法返回包含旋转器项的字符串列表。当我点击旋转器项时。什么都没发生。我的代码有什么错误吗。请帮帮忙。 这是我的Java密码。 这是我的布局

  • 本文向大家介绍请说明当PHP switch case执行case 0时会发生什么?,包括了请说明当PHP switch case执行case 0时会发生什么?的使用技巧和注意事项,需要的朋友参考一下 PHP是一种松散类型的语言。当与大小写0匹配时,字符串与最接近的整数匹配。 假设我们有以下开关表达式- 现在,我们将匹配情况0- 我们还将匹配非零情况- 示例 输出结果