I'm trying to develop the code below, but it doesn't work. I want to use Apache Flink to delay events whose time (given in a timestamp field) differs from the current time.

Sample:

Current date: 2022-05-06 10:30
Event 1: {"user1":"1","user2":"2","timestamp":"2022-05-06 10:30"} -> emit immediately
Event 2: {"user1":"1","user2":"2","timestamp":"2022-05-06 13:30"} -> hold back until 13:30
FlinkLikePipeline
public class FlinkLikePipeline {

    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

        FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
        consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
                new BackupSerializationSchema());

        DataStream<Like> likes = environment.addSource(consumer);
        DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
        matches.addSink(producer);

        environment.execute("Keyed Process Function Example");
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }
}
InputMessageTimestampAssigner
public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<Like> {

    @Override
    public long extractTimestamp(Like element, long previousElementTimestamp) {
        Timestamp now = new Timestamp(System.currentTimeMillis());
        return now.getTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Like lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }
}
MatchFunction
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

    ValueState<Like> liked;

    @Override
    public void open(Configuration parameters) throws Exception {
        liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
    }

    @Override
    public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
        Timestamp now = new Timestamp(System.currentTimeMillis());
        Timestamp sendAt = newLike.getTimestamp();

        if (sendAt.before(now)) {
            liked.clear();
            out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
        } else {
            // schedule the next timer for sendAt
            liked.update(newLike);
            ctx.timerService().registerEventTimeTimer(sendAt.getTime());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
        Like like = liked.value();
        Timestamp now = new Timestamp(System.currentTimeMillis());
        Timestamp sendAt = like.getTimestamp();

        if (now == sendAt) {
            liked.clear();
            out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
        }
    }
}
Producer topic on Kafka (JSON sample): {"user1":"1","user2":"2","timestamp":"2022-05-06T11:32:00.000Z"}
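The Like POJO referenced by the pipeline (getId, getUser1, getUser2, getTimestamp) is not shown in the question; below is a minimal sketch of what it might look like, assuming the JSON above is mapped onto plain getters/setters and the timestamp string is parsed into java.sql.Timestamp. The key derivation in getId is an assumption, not the asker's actual code.

import java.sql.Timestamp;

// Hypothetical shape of the Like record consumed from the flink_input topic.
public class Like {
    private String user1;
    private String user2;
    private Timestamp timestamp;

    public Like() {
    }

    // Key used by keyBy(like -> like.getId()); assumed here to combine the two users.
    public String getId() {
        return user1 + "_" + user2;
    }

    public String getUser1() { return user1; }
    public void setUser1(String user1) { this.user1 = user1; }

    public String getUser2() { return user2; }
    public void setUser2(String user2) { this.user2 = user2; }

    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
}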
Thanks for your support, Giuseppe.
Try the following:
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

    ValueState<Like> liked;

    @Override
    public void open(Configuration parameters) throws Exception {
        liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
    }

    @Override
    public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
        Timestamp now = new Timestamp(ctx.timestamp()); // CHANGE: use the context timestamp
        Timestamp sendAt = newLike.getTimestamp();

        if (sendAt.before(now)) {
            liked.clear();
            out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
        } else {
            // schedule the next timer for sendAt
            liked.update(newLike);
            ctx.timerService().registerProcessingTimeTimer(sendAt.getTime()); // CHANGE: switch to a processing-time timer
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
        Like like = liked.value();
        Timestamp now = new Timestamp(timestamp); // CHANGE: use the timer timestamp
        Timestamp sendAt = like.getTimestamp();

        if (now.getTime() >= sendAt.getTime()) { // CHANGE: fire when the timer time is equal to or after sendAt
            liked.clear();
            out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
        }
    }
}
And:
public static void createBackup() throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String kafkaAddress = "localhost:9092";

    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // CHANGED: switch to processing time
    environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

    FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
    consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

    FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
            new BackupSerializationSchema());

    DataStream<Like> likes = environment.addSource(consumer);
    DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
    matches.addSink(producer);

    environment.execute("Keyed Process Function Example");
}