当前位置: 首页 > 知识库问答 >
问题:

基于scala的Flink流媒体实时预测

林正平
2023-03-14

Flink版本:1.2.0
Scala版本:2.11.8

我想使用一个数据流来预测使用scala在flink中的模型。我在flink中有一个使用scala的DataStream[String],其中包含来自kafka源的json格式的数据。我想用这个数据流来预测已经训练过的Flink ml模型。问题是所有flink ml示例都使用DataSet api进行预测。我对flink和scala比较陌生,所以如果您能提供代码解决方案,我将不胜感激。

输入:

{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}

代码:

package org.apache.flink.quickstart

//imports

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.parsing.json.JSON

//kafka consumer imports
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

//kafka json table imports
import org.apache.flink.table.examples.scala.StreamTableExample
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource
import org.apache.flink.api.java.DataSet

//JSon4s imports
import org.json4s.native.JsonMethods



// Case class
case class CC(FC196:String,FC174:String,FC195:String,FC176:String,FC198:String,FC175:String,FC197:String,FC178:String,FC177:String,FC199:String,FC179:String,FC190:String,FC192:String,FC191:String,FC194:String,FC193:String,FC203:String,FC205:String,FC185:String,FC184:String,FC187:String,FC186:String,FC189:String,FC200:String,FC188:String,FC202:String,FC201:String,FC181:String,FC180:String,FC183:String,FC182:String)


object WordCount {

  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {

    // set up the execution environment
    implicit lazy val formats = org.json4s.DefaultFormats

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id","grouop")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema() , properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)


    val mapped = st.map(_.extract[CC])

    mapped.print()

    env.execute()

    }
}

共有1个答案

端木安国
2023-03-14

解决这个问题的方法是编写一个MapFunction,在作业开始时读取模型。然后,MapFunction将模型存储为其内部状态的一部分。这样,在发生故障时,它将自动恢复:

public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now
                modelState.add(model);
            }
        }
    }

    public static class Model {}

    public static class Value{}

    public static class Prediction{}
}
 类似资料:
  • 我有一个Java应用程序午餐一个flink工作来处理Kafka流。

  • 我想用不同的比特率和分辨率对MPEG-DASH的实时流进行编码,以便实时播放。 到目前为止,我发现的一切要么只使用源分辨率(灵活,nginx rtmp模块),要么似乎只用于VOD流媒体(DASHEncoder)。 是否可以将DASHEncoder与实时输入(rtmp流)一起使用,我将如何做到这一点?如果没有,是否可以将nginx-rtmp ffmpeg用于我想做的事情?

  • 我正在使用Google的YouTube API Explorer(备用)来查找属于其他人的任意流媒体广播的信息。 无论我在字段中输入了什么,我都会返回 这似乎很荒谬,考虑到视频显然是流媒体。 我突然想到,我可能误解了字段的说明,所以我尝试了几种不同的可能性。这些包括。。。 频道ID() 用户ID() 视频ID() ...每个都无济于事。 我如何询问一个频道有关其直播流视频的信息?这个问题在过去可以

  • 我正在尝试运行官方示例,该示例展示了如何使用Apache Flink Streaming与Twitter:https://github.com/apache/flink/tree/master/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter 如果我不提供到属性的路径。文件,推特流数

  • 是否可以在Apache Flink中使用已经成批训练的模型对dataStream进行预测? 支持向量机的预测功能需要一个数据集作为输入,而不需要一个数据集。 不幸的是,我不知道如何使用FlatpMap/Map函数使其工作。 我是这样训练SVM模型的: val svm2=SVM() svm2.setseed(1) svm2.fit(trainLV) val testVD=testlv.map(lv=

  • 首先,我是流处理框架的新手。我想对其中一些进行基准测试,所以我从Flink开始。 对于我的用例,我需要将窗口t中的事件与窗口t-1中的事件进行比较,两者的大小都是15分钟,然后进行一些聚合。 以下是我的用例的简化版本: 我们将分析的事件视为形式的元组。在窗口1中,我们有:(A,1),(B,2),(C,3),在窗口2中,我们有:(D,6)和(B,7)。然后,我需要将当前窗口中的事件与前一个窗口中的事