我使用Apache Flink来预测来自Twitter的流。
代码是在Scala中实现的
我的问题是,我从DataSet API训练的SVM模型需要一个DataSet作为predict()方法的输入。
我在这里已经看到一个问题,其中一个用户说,您需要编写一个自己的MapFunction,在作业开始时读取模型(参考:Flink中使用scala的实时流预测)
但是我不能写/理解这段代码。
即使我在StreamingMap函数中得到了模型。我仍然需要一个数据集作为预测结果的参数。
我真的希望有人能向我展示/解释这是如何做到的。
Flink版本:1.9 Scala版本:2.11 Flink ML:2.11
val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
因此,目前FlinkML(SVM的一部分)不支持流式API。这就是为什么SVM
只接受DataSet
。我们的想法不是使用FlinkML,而是使用scala或java中提供的一些SVM库。然后您可以读取模型,例如从文件中读取。问题是您必须自己实现大部分逻辑。
你提到的帖子中的评论或多或少是在说完全相同的事情。
我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?
我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。 我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段, 创建的流如下所示。 流连接使用等于的连接执行,如下所示。 如下所述, 有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢
由于我刚接触DataFlow/Beam,概念还不太清楚(或者至少我在开始编写代码时有困难),我有很多问题: 什么是最好的模板或模式,我可以用来做到这一点?我应该先执行BigQuery的PTransform(然后执行PubSub的PTransform)还是先执行PubSub的PTransform? 我怎么做加入?比如? PubSub的最佳窗口设置是什么?BigQuery的PTransform部分的窗
我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2
我是机器学习和OpenCV的新手。我从Cohn-Kanade人脸数据库中为每种情绪(中性和快乐)拍摄了一组10张图像。然后,我从每个图像中提取面部特征,并将它们放入我的训练数据矩阵中,并为各自的情绪分配标签(例如:0表示中性,1表示快乐)。 我使用了gamma=0.1和C=1的RBF内核。经过训练后,我将从智能手机摄像头中提取出的面部特征用于预测。预测总是返回1。 如果我增加中性表达式的训练样本数
这适用于必须使用SVM方法来提高模型精度的分配。 共有3部分,编写了下面的代码 但在此之后,问题如下 执行数字标准化。数据,并将转换后的数据存储在可变数字中。 提示:从sklearn.preprocessing.使用所需的实用程序再次,将digits_standardized分成两个集合名称X_train和X_test。此外,将digits.target分成两组Y_train和Y_test。 提示