当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-svm对流式数据的预测

程亦
2023-03-14

我使用Apache Flink来预测来自Twitter的流。

代码是在Scala中实现的

我的问题是,我从DataSet API训练的SVM模型需要一个DataSet作为predict()方法的输入。

我在这里已经看到一个问题,其中一个用户说,您需要编写一个自己的MapFunction,在作业开始时读取模型(参考:Flink中使用scala的实时流预测)

但是我不能写/理解这段代码。

即使我在StreamingMap函数中得到了模型。我仍然需要一个数据集作为预测结果的参数。

我真的希望有人能向我展示/解释这是如何做到的。

Flink版本:1.9 Scala版本:2.11 Flink ML:2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
        featureVectorService.learnTrainingData(labeledData, false)

//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
        val svm = SVM()
                .setBlocks(env.getParallelism)
                .setIterations(100)
                .setRegularization(0.001)
                .setStepsize(0.1)
                .setSeed(42)
//learning
svm.fit(trainingData)

//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
                .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)

共有1个答案

金嘉言
2023-03-14

因此,目前FlinkML(SVM的一部分)不支持流式API。这就是为什么SVM只接受DataSet。我们的想法不是使用FlinkML,而是使用scala或java中提供的一些SVM库。然后您可以读取模型,例如从文件中读取。问题是您必须自己实现大部分逻辑。

你提到的帖子中的评论或多或少是在说完全相同的事情。

 类似资料:
  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。 我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段, 创建的流如下所示。 流连接使用等于的连接执行,如下所示。 如下所述, 有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢

  • 由于我刚接触DataFlow/Beam,概念还不太清楚(或者至少我在开始编写代码时有困难),我有很多问题: 什么是最好的模板或模式,我可以用来做到这一点?我应该先执行BigQuery的PTransform(然后执行PubSub的PTransform)还是先执行PubSub的PTransform? 我怎么做加入?比如? PubSub的最佳窗口设置是什么?BigQuery的PTransform部分的窗

  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2

  • 我是机器学习和OpenCV的新手。我从Cohn-Kanade人脸数据库中为每种情绪(中性和快乐)拍摄了一组10张图像。然后,我从每个图像中提取面部特征,并将它们放入我的训练数据矩阵中,并为各自的情绪分配标签(例如:0表示中性,1表示快乐)。 我使用了gamma=0.1和C=1的RBF内核。经过训练后,我将从智能手机摄像头中提取出的面部特征用于预测。预测总是返回1。 如果我增加中性表达式的训练样本数

  • 这适用于必须使用SVM方法来提高模型精度的分配。 共有3部分,编写了下面的代码 但在此之后,问题如下 执行数字标准化。数据,并将转换后的数据存储在可变数字中。 提示:从sklearn.preprocessing.使用所需的实用程序再次,将digits_standardized分成两个集合名称X_train和X_test。此外,将digits.target分成两组Y_train和Y_test。 提示