Step1: 首先下载项目:
//下载项目
git clone https://github.com/allwefantasy/spark-deep-learning.git .
//切换到release 分支
git checkout release
Step2: 构建pyspark环境:
确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark
pip install pyspark
文件比较大,大约180多M,有点耐心。你也可以使用阿里源:
pip install pyspark -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。
其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。
接着安装项目需要的依赖:
pip install -r requirements.txt
最后进行项目build:
build/sbt assembly
这个时候你就得到了一个jar包:
target/scala-2.11/spark-deep-learning-assembly-0.1.0-spark2.1.jar
另外,另外你还需要一个Kafka。 似乎感觉有点麻烦,然而只要配置一次。
为了方便在IDE得到代码提示,我们还需要把python相关的代码打包。
在主目录运行:
cd ./python && python setup.py bdist_wheel && cd dist && pip uninstall sparkdl && pip install ./sparkdl-0.2.2-py2-none-any.whl && cd ..
我这里打包和安装放一块了。
现在,在IDE里,你可以得到代码提示补全了。
首先我们假设我们有这样的数据:
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from spark_sklearn import GridSearchCV
from sklearn import svm
from sklearn.model_selection import GridSearchCV
from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer
session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
documentDF = session.createDataFrame([
("Hi I heard about Spark", "spark"),
("I wish Java could use case classes", "java"),
("I wish Java could use case classes", "java"),
("I wish Java could use case classes", "java"),
("Logistic regression models are neat", "mlib"),
("Logistic regression models are neat", "spark"),
("Logistic regression models are neat", "mlib"),
("Logistic regression models are neat", "java"),
("Logistic regression models are neat", "spark"),
("Logistic regression models are neat", "java"),
("Logistic regression models are neat", "mlib")
], ["text", "preds"])
接着我们希望把preds转化为数字(分类),text转化为向量,这样才能喂给算法。我们可以这么做:
features = TFTextTransformer(
inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)
indexer = StringIndexer(inputCol="preds", outputCol="labels")
pipline = Pipeline(stages=[features, indexer])
ds = pipline.fit(documentDF).transform(documentDF)
TFTextTransformer 默认提供的是一个二维数组,shape为(64,100),这种shape其实是为了给深度学习使用的,这里我指定shape为(-1,) 则会将二维数组转化为一个64*100的向量
现在我们写一个函数,里面实现具体的sk-learn逻辑:
def sk_map_fun(args={}, ctx=None, _read_data=None):
params = args['params']['fitParam']
data = [item for item in _read_data()]
parameters = {'kernel': ('linear', 'rbf')}
svr = svm.SVC()
clf = GridSearchCV(svr, parameters)
X = [x["features"] for x in data[0]]
y = [int(x["labels"]) for x in data[0]]
model = clf.fit(X, y)
print(model.best_estimator_)
return ""
前面必须是def sk_map_fun(args={}, ctx=None, _read_data=None):
这样,函数名字可以随意定。 _read_data 是你获取数据的一个对象,典型用法如下:
for data in _read_data(max_records=params["batch_size"]):
batch_data = feed_dict(data)
sess.run(train_step, feed_dict={input_x: batch_data})
因为SVM是需要全量数据的,所以我简单的一次性拉取所有数据,因为条数小于默认的64条,所以我没有指定max_records.
data = [item for item in _read_data()]
X = [x["features"] for x in data[0]]
y = [int(x["labels"]) for x in data[0]]
现在我们要把sk_map_fun 集成到Estimator里:
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
"group_id": "sdl_1", "test_mode": False},
runningMode="Normal",
fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
mapFnParam=sk_map_fun)
estimator.fit(ds).collect()
这里,通过mapFnParam
参数,我们将sklearn函数传递给了TextEstimator,并且我们配置了Kakfa相关参数。这里唯一需要注意的是fitParam, 这里的fitParam 长度为2,意味着会启动两个进程运行sk_map_fun,并且一次传递对应的参数给sk_map_fun,sk_map_fun的第一段代码:
params = args['params']['fitParam']
这个时候params是{"epochs": 5, "batch_size": 64} 或者 {"epochs": 5, "batch_size": 1}。
这样你可以通过params拿到epoche,batch_size等,然后传给对应的Sk-Learn模型。
如果你只是运行Local模式,那么可以修改下kafkaParam参数:
import tempfile
mock_kafka_file = tempfile.mkdtemp()
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
"mock_kafka_file": mock_kafka_file,
"group_id": "sdl_1", "test_mode": True},
指定一个临时目录mock_kafka_file,并且设置为test_mode为True,这样就可以不依赖于Kafka.
现在我么给出完整程序:
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from spark_sklearn import GridSearchCV
from sklearn import svm
from sklearn.model_selection import GridSearchCV
from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer
session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
documentDF = session.createDataFrame([
("Hi I heard about Spark", "spark"),
("I wish Java could use case classes", "java"),
("I wish Java could use case classes", "java"),
("I wish Java could use case classes", "java"),
("Logistic regression models are neat", "mlib"),
("Logistic regression models are neat", "spark"),
("Logistic regression models are neat", "mlib"),
("Logistic regression models are neat", "java"),
("Logistic regression models are neat", "spark"),
("Logistic regression models are neat", "java"),
("Logistic regression models are neat", "mlib")
], ["text", "preds"])
# transform text column to sentence_matrix column which contains 2-D array.
features = TFTextTransformer(
inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)
indexer = StringIndexer(inputCol="preds", outputCol="labels")
pipline = Pipeline(stages=[features, indexer])
ds = pipline.fit(documentDF).transform(documentDF)
def sk_map_fun(args={}, ctx=None, _read_data=None):
data = [item for item in _read_data()]
parameters = {'kernel': ('linear', 'rbf')}
svr = svm.SVC()
clf = GridSearchCV(svr, parameters)
X = [x["features"] for x in data[0]]
y = [int(x["labels"]) for x in data[0]]
model = clf.fit(X, y)
print(model.best_estimator_)
return ""
# create a estimator to training where map_fun contains tensorflow's code
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
"group_id": "sdl_1", "test_mode": False},
runningMode="Normal",
fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
mapFnParam=sk_map_fun)
estimator.fit(ds).collect()
然后使用如下指令运行:
./bin/spark-submit \
--py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--jars spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--master "local[*]" Sk2.py
记得改下代码。
只要修改map_fun函数即可,比如:
def map_fun(args={}, ctx=None, _read_data=None):
import tensorflow as tf
EMBEDDING_SIZE = args["embedding_size"]
params = args['params']['fitParam']
SEQUENCE_LENGTH = 64
def feed_dict(batch):
# Convert from dict of named arrays to two numpy arrays of the proper type
features = []
for i in batch:
features.append(i['sentence_matrix'])
# print("{} {}".format(feature, features))
return features
encoder_variables_dict = {
"encoder_w1": tf.Variable(
tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"),
"encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"),
"encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"),
"encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2")
}
def encoder(x, name="encoder"):
with tf.name_scope(name):
encoder_w1 = encoder_variables_dict["encoder_w1"]
encoder_b1 = encoder_variables_dict["encoder_b1"]
layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1)
encoder_w2 = encoder_variables_dict["encoder_w2"]
encoder_b2 = encoder_variables_dict["encoder_b2"]
layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2)
return layer_2
def decoder(x, name="decoder"):
with tf.name_scope(name):
decoder_w1 = tf.Variable(tf.random_normal([128, 256]))
decoder_b1 = tf.Variable(tf.random_normal([256]))
layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1)
decoder_w2 = tf.Variable(
tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE]))
decoder_b2 = tf.Variable(
tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE]))
layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2)
return layer_2
tf.reset_default_graph
sess = tf.Session()
input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x")
flattened = tf.reshape(input_x,
[-1, SEQUENCE_LENGTH * EMBEDDING_SIZE])
encoder_op = encoder(flattened)
tf.add_to_collection('encoder_op', encoder_op)
y_pred = decoder(encoder_op)
y_true = flattened
with tf.name_scope("xent"):
consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1),
tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)),
tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1))))
xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine))
tf.summary.scalar("xent", xent)
with tf.name_scope("train"):
# train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent)
train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent)
summ = tf.summary.merge_all()
sess.run(tf.global_variables_initializer())
for i in range(params["epochs"]):
print("epoll {}".format(i))
for data in _read_data(max_records=params["batch_size"]):
batch_data = feed_dict(data)
sess.run(train_step, feed_dict={input_x: batch_data})
sess.close()
我这里还是之前的一个例子,一个auto-encoder程序。
接着通过TextEstimator接入:
estimator = TextEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
"mock_kafka_file": mock_kafka_file,
"group_id": "sdl_1", "test_mode": True},
runningMode="Normal",
fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
mapFnParam=map_fun)
estimator.fit(df).collect()
大同小异了。
关于tensorflow,还可以有集群模式,可参考: 为Spark Deep Learning 集成TFoS