广播与累积器(Broadcast & Accumulator)
优质
小牛编辑
127浏览
2023-12-01
对于并行处理,Apache Spark使用共享变量。 当驱动程序将任务发送到集群上的执行程序时,共享变量的副本将在集群的每个节点上运行,以便可以将其用于执行任务。
Apache Spark支持两种类型的共享变量 -
- Broadcast
- Accumulator
让我们详细了解它们。
广播(Broadcast)
广播变量用于跨所有节点保存数据副本。 此变量缓存在所有计算机上,而不是在具有任务的计算机上发送。 以下代码块包含PySpark的Broadcast类的详细信息。
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
以下示例显示如何使用Broadcast变量。 Broadcast变量有一个名为value的属性,它存储数据并用于返回广播值。
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - 广播变量的命令如下 -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - 以下命令的输出如下。
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
蓄能器(Accumulator)
累加器变量用于通过关联和交换操作聚合信息。 例如,您可以使用累加器进行求和操作或计数器(在MapReduce中)。 以下代码块包含PySpark的Accumulator类的详细信息。
class pyspark.Accumulator(aid, value, accum_param)
以下示例显示如何使用Accumulator变量。 Accumulator变量有一个名为value的属性,类似于广播变量。 它存储数据并用于返回累加器的值,但仅在驱动程序中可用。
在此示例中,累加器变量由多个工作程序使用并返回累计值。
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - 累加器变量的命令如下 -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - 上面命令的输出如下。
Accumulated value is -> 150