当前位置: 首页 > 知识库问答 >
问题:

为什么使用广播会增加时间?

诸葛砚
2023-03-14
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
    val Array(startIp, endIp, company) = line
    IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value

val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)

// will do ip company lookup here
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect()

代码只是从输入源获取ip数据,然后在加入后找到它的公司。以下是我的问题:

这段代码在生产中会运行2-3分钟,但当删除广播数据(只需在两个数据上连接)时,只需花费不到1分钟。当我查看spark的ui时,我发现gc时间可能是问题所在。

下面是运行此作业的设置:

spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar 
spark version: 1.6.1
10 m3.xlarge.
    null
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
    val Array(startIp, endIp, company) = line
    IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)

// will do ip company lookup here
val joinResult = dataA.fullOuterJoin(dataB)
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
joinResult.map(doIpCompanyLookUp(ipData, _)).collect()

更新2。由于生产代码相当复杂,与上面的伪代码不同,在代码顺序上稍作改变后,程序运行得更快,但我不确定问题的关键是初始化广播数据的位置。

共有1个答案

陈胤
2023-03-14

没有太多考虑代码的情况下,行:

val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value

让我很担心。

您使用sc.textfile构建分布式RDD,只是为了通过collect()使其成为本地(驱动程序),然后通过sc.broadcast(!)使其再次分布式并可供执行程序使用正如您所看到的,有大量的数据来回发送。

sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).cache
 类似资料:
  • 我有一个正在运行的Spark Streaming应用程序,它使用mapWithState函数来跟踪RDD的状态。该应用程序可以正常运行几分钟,但随后会崩溃 我观察到,Spark应用程序的内存使用量随着时间的推移呈线性增加,尽管我已经为mapWithStateRDD设置了超时。请参阅下面的代码片段和内存使用情况- 如果每个RDD都有一个显式超时,为什么内存会随着时间线性增加? 我已经尝试增加内存,但

  • 问题内容: 我不明白为什么 ‘chown’ 命令应该增加我的docker映像的大小? 以下Dockerfile创建大小为5.3MB的映像: 但是,此示例创建的图像大小为8.7MB: 为什么? 注意: 我的实际dockerfile当然比该示例长得多,因此映像大小的增加也很大。这就是为什么我什至在乎。 问题答案: Dockerfile中的每个步骤都会生成一个新的中间映像或“层”,该文件由文件系统中与上

  • setAction()在intent(服务)中做什么 我并不真正理解setAction()的功能,我主要是在“服务到活动的数据传递”示例中找到它的。这根绳子可以自由设置吗?。它到底做什么? 创建广播意图时,除了可选数据和类别字符串之外,它还必须包括一个ACTION STRING。与标准意图一样,数据是使用键值对与意图对象的putExtra()方法一起添加到广播意图中的。可选的类别字符串可以通过调用

  • 问题内容: 我已经调整了控制器的构造函数和fxml,以便将控制器的fxml的所有设置都放在fxml中,除了FXML的构造和fxml的加载。这是我的控制器: 和我的fxml文件: 当调用fxmlLoader.load()并返回FXMLLoader时会发生stackoverflow fxmlLoader = new FXMLLoader(…),然后再次调用fxmlLoader.load()…为什么会发

  • 问题内容: 一位同事让我意识到了一个非常奇怪的MySQL行为。 假设您有一个表,其中包含一个auto_increment字段和另一个设置为唯一的字段(例如,用户名字段)。尝试插入表中已有用户名的行时,插入失败,正如预期的那样。然而,几次尝试失败后插入有效的新条目时,可以看到auto_increment值增加了。 例如,当我们的上一个条目如下所示时… …并且在下一次插入时尝试使用相同的用户名值尝试五