DataStream<Tuple2<String,JSONObject>> MetaAlert = events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new generateMetaAlert());
public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {
@Override
public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
Collector<Tuple2<String, JSONObject>> arg3) throws Exception {
}
应用keyby
函数时(不使用匿名类),自定义windowfunction
(第三个字段)中的键类型应为tuple
,因为编译器无法确定键的类型。这段代码编译时没有错误(考虑到我试图用伪代码填充空白处):
public class Test {
public Test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> events = env.readTextFile("datastream.log");
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class JSONObject {
}
public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
但最直接的方法是使用匿名类,这样您就可以保留string
类型:
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
// Your code here
}
});
最后,如果希望保留类,但又希望保持you key的类型不变,则可以实现KeySelector
:
public class Test {
public Test() {
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
@Override
public String getKey(Tuple2<String, JSONObject> json) throws Exception {
return json.f0;
}
})
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A 我有几个问题。 这种方法适合合并主题并创建单个JSON吗 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激 代码片段: 我得到了- AllW
假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我
我有一个数据流是键控的,需要计算不同时间段(1分钟,5分钟,1天,1周)的翻滚计数。 有可能在一个应用程序中计算所有四个窗口计数吗?
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗
的结果是一个元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。 然后我通过一个常量(-因为这是一个全局操作)进行键控,并使用-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。 我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。 目前在Flink有可能吗?