很明显,对于分发小的查找数据,使用广播变量更好。
假设我们在yarn客户端模式下从Master节点运行pySpark代码(spark submit)。因此应用程序驱动程序将始终在主节点上创建。我们从主节点上的本地路径读取文件。
with open('/tmp/myfile.txt', 'r') as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']
然后创建broadcast var并使用它:
lookupBC = sc.broadcast(lookup)
output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookupBC.value.get(e, e), 1))\
.collect()
sc.addFile('/tmp/myfile.txt')
with open(SparkFiles.get('/tmp/myfile.txt')) as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']
output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookup.get(e, e), 1))\
.collect()
在executor日志中,我看到了有关bc VAR的信息,但我在代码中没有使用任何信息:
18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms
广播变量似乎被加载在内存中,直到它们被显式销毁。相反,sc.addfile
似乎是(为每个执行程序)创建一个磁盘副本。所以我猜测sparkfiles.get()
会在每次调用文件时将文件加载到内存中。
.map()
中调用sparkfiles.get()
,则它会尝试为RDD中的每个条目重新加载文件。最后,回答大家的问题,
视情况而定,调用.get
的位置,如上所述。
步骤是什么,如何在Executor上处理代码?
我不明白这部分。
嗨,一个星期以来,我一直在追踪我的办公代码中的一个错误。它与Spring、Hibernate和Transaction有关。 我所知道的: 1.延迟加载。 2. Spring如何使用代理和拦截器进行事务管理。 3. Spring中的事务传播,我们的代码使用默认的REQUIRED。 4.每个请求/会话的会话和Hibernate的分离实体基础来保持会话,以及我们的会话不是每个会话的事实。 我的场景:我的
我想了解更多关于std::thread的信息,特别是如果我有一个线程向量,并且其中一个线程完成了执行,将会发生什么。 想象一下这个例子: 创建一个线程向量,所有线程都执行以下函数: “char*flag”指向一个标志,表示函数停止执行。 例如,向量包含10个线程,它们都在执行。然后将线程号3的标志设置为零。(向量中的第4个线程,因为向量从零开始。) 好的做法是然后加入线程。 向量现在将包含多少st
编译一个简单的hello world应用程序,如下所示: 使用默认编译器选项()生成相对较大的822 KB可执行文件。 为什么会发生这种情况,减少可执行文件大小的最佳方法是什么?
观察:我在Polymer中运行一个SPA,当我在chrome中的新后台选项卡中打开内部链接时,页面直到我关注该选项卡才完成加载(ajax和所有页面)。 问题:chrome在等待用户焦点做什么?或者,Polymer在完成页面渲染和发送ajax请求之前在等待什么? 奖励:我如何打开一个开发工具面板,目标是我在后台打开的标签,以观察发生了什么/没有发生什么? 额外的回答:如果你在一个选项卡上打开开发工具
我已经从服务器的category类中提取了数据。getcategories方法返回包含旋转器项的字符串列表。当我点击旋转器项时。什么都没发生。我的代码有什么错误吗。请帮帮忙。 这是我的Java密码。 这是我的布局
本文向大家介绍请说明当PHP switch case执行case 0时会发生什么?,包括了请说明当PHP switch case执行case 0时会发生什么?的使用技巧和注意事项,需要的朋友参考一下 PHP是一种松散类型的语言。当与大小写0匹配时,字符串与最接近的整数匹配。 假设我们有以下开关表达式- 现在,我们将匹配情况0- 我们还将匹配非零情况- 示例 输出结果