我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。
input :
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695856 (generated at 16th second, received at 19th second)
a,1513695859 (generated at 13th second, received at 19th second)
if i am using Processing Time window :
Output :
(a,1)
(a,3)
(a,2)
package org.apache.flink.window.training;
import java.io.InputStream;
import java.util.Properties;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SocketStream {
private static Properties properties = new Properties();
public static void main(String args[]) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
InputStream inputStream =
SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties");
properties.load(inputStream);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010<String> consumer =
new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties);
DataStream<Element> socketStockStream =
env.addSource(consumer).map(new MapFunction<String, Element>() {
@Override
public Element map(String value) throws Exception {
String split[] = value.split(",");
Element element = new Element(split[0], Long.parseLong(split[1]));
return element;
}
}).assignTimestampsAndWatermarks(new TimestampExtractor());
socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Element value) throws Exception {
return new Tuple2<String, Integer>(value.getId(), 1);
}
}).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1).
print();
env.execute();
}
public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(Element element, long previousElementTimestamp) {
return element.getTimestamp();
}
@Override
public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
// TODO Auto-generated method stub
return null;
}
}
}
事件时处理需要正确生成时间戳和水印。
代码中的timestampextractor
不会生成水印,但始终返回null
。
我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1
我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任
我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW
我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。
作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数
输入: 结果: