我试图使用ApacheFlink ML包的随机选择模型。
我不知道如何使用Kafka作为数据源,我知道它需要一个DataSet而不是DataStream,但我似乎无法窗口我的Kafka DataStream成为一个DataSet。
有没有一种方法可以将我的流视为一系列小数据集。例如,有没有办法说流中每10个元素匹配一个模式(按元素唯一ID滑动窗口),就将它们视为固定大小的数据集,并检测这个固定大小数据集中的任何异常值?
我想创建的场景是:
数据源-
我已经有了一个工作实现,直到预处理,我希望Flink能够满足我的要求?
我猜你可以创建一个基于计数的全局窗口,并使用执行环境来获取数据集。类似于以下内容可能会起作用(getResult将返回DataSet):
stream.
keyBy(...).
window(GlobalWindows.create).
trigger(CountTrigger.of(10)).
aggregate(new MyAggregator()).
...
class MyAggregator extends AggregateFunction[..., ..., ...] {
var valueList: List[LabeledVector] = List[LabeledVector]()
override def createAccumulator(): MyAggregator = new MyAggregator()
override def add(value: .., accumulator: MyAggregator): ... = ...
override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
override def getResult(accumulator: MyAggregator): ... = {
ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
}
}
我有一个关于Ignite流媒体部分的问题。 我所理解的是,这是一种将数据导入缓存的方式,但我也看到,我们可以配置流接收器来应用一些其他的自定义逻辑。 所以我尝试创建一个包含接收器的类和一个将数据注入流的类(因此在服务器模式下有2个main和2个Ignite实例),但我“只是”将数据放入流的缓存中(接收器中没有任何自定义逻辑处理)。所以,我在问我是不是错过了什么,或者是我不太理解什么是流到点燃。 如
我正在kubernetes中运行一个Flink作业。我的设置如下 1个作业管理器吊舱 我的flink作业从kafka源获取时间序列数据(时间、值),进行聚合和其他转换,并将其发布到kafka接收器。 有时,我的作业因检查点异常(10分钟后超时)而失败,主要是由一名操作员完成的。我不理解异步持续时间(在图中)的含义,为什么它花费的时间最长。在这个异常之前,Kafka的吞吐量非常高,有500-800万
我正在构建一个有以下要求的应用程序,我刚刚开始使用Flink。 null null 谢谢并感激你的帮助。
如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。 输入:1,2,3,4,5,6,7... 输出:NA,NA,1,2,3,4,5...
需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息是用base64编码压缩的。我已经试过这个代码了 代码由于它不是有效的Json格式而失败。 然后我尝试使用SimpleStringSchema(),就像下面的代码一样 Kafka的信息完美地消耗了,但是输出如下 如何将此数据解码为有效的JSON? 此致敬意
我使用Apache Flink来预测来自Twitter的流。 代码是在Scala中实现的 我的问题是,我从DataSet API训练的SVM模型需要一个DataSet作为predict()方法的输入。 我在这里已经看到一个问题,其中一个用户说,您需要编写一个自己的MapFunction,在作业开始时读取模型(参考:Flink中使用scala的实时流预测) 但是我不能写/理解这段代码。 即使我在St