我跟着这个帖子并行运行KMeans。我在EMR上使用了Python2.7和Spark2.0.2。
from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)
group_size = 30
n_groups = 20
n_samples= n_groups * group_size
n_features=2
n_centers=4
xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
x_groups.append(xs[i*group_size: (i+1)*group_size])
def do_kmean(xs):
data = []
for x in xs:
data.append((Vectors.dense(x.tolist()),) )
df = spark.createDataFrame(data, ["features"])
num_clusters = random.randint(5,10)
kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(df)
return [num_clusters, kmeans.getK()]
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)
result = tpool.map(do_kmean, x_groups)
[[5, 9],
[8, 9],
[6, 8],
[10, 9],
[7, 9],
[9, 9],
[7, 9],
[9, 9],
[5, 5],
[5, 9],
[9, 7],
[9, 9],
[5, 7],
[10, 5],
[7, 7],
[7, 7],
[6, 6],
[10, 10],
[10, 10],
[5, 5]]
这确实是Spark 2.0.2和2.1.0的bug。我能够在我的本地机器上复制上述两个版本的bug。Spark 2.1.1的bug已经修复。
https://issues.apache.org/jira/browse/spark-19348
但是当我运行test时,两个浏览器实例都打开了(Chrome首先打开并开始执行,延迟后Firefox打开)。在这种情况下,驱动程序对象被Firefox驱动程序覆盖,chrome停止执行。测试继续在Firefox上执行并成功完成。 项目的结构是这样的: 创建了一个DriverBase.class来加载与浏览器对应的驱动程序,该浏览器具有my@beforeSuite. crteated页面的单个类。(
问题内容: 我对react.js还是很陌生,并且正在通过构建砌体样式布局进行试验。 我将每个元素呈现给DOM,然后需要遍历每个项目并根据前面的元素应用x和y位置。 初始模型如下所示: (我只显示了一个项目以使内容简短)。 完成循环并获取x和y数据后,我想将其应用于podStyle对象。我用以下数据调用setState: 这似乎从模型中删除了所有当前数据,而只剩下了podStyle数据。我是否误解了
问题内容: 我想覆盖我无法控制的工厂提供给我的对象中的方法。 我的具体问题是,我想覆盖 的getInputStream 和 getOutputStream方法 一的 Socket对象 进行 线记录 。 通用问题如下: 我想实例化并用我自己的实例替换的位置,如下所示: 对于套接字,我将返回一个 InputStream 和 OutputStream ,它们通过记录来包装以拦截数据。 问题答案: 由于J
我有个身材。我的android应用程序中的gradle文件具有以下设置: My AndroidManifest.xml不包含版本代码,也不包含 versionName。 现在我想在Jenkins上构建这个应用程序,并将build_NUMBER作为应用程序的版本代码传递,以便每个构建都有更高的版本。 所以在工作中,我接到了一个电话: 当我使用“versionCode”重命名“app release.
在这个特定的示例中,我扩展了
我想循环两个列表,将组合传递给函数,并获得以下输出: 由于这是Pyspark,我想将其并行化,因为函数的每个迭代都可以独立运行。 注:我的实际函数是pyspark中的一个长而复杂的算法。只是想贴一个简单的例子来概括。 最好的方法是什么?