我试图理解如何在Kafka源代码的水印策略中使用withTimestampAssigner()。我需要使用的“时间”在消息负载内。
为此,我有以下代码:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(Event, Event.time))
DataStream<Event> stream = env.addSource(kafkaData);
其中EventDeserializationSchema()是:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
private static final ObjectMapper mapper = new CsvMapper();
@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
和事件:
import java.io.Serializable;
public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public String time;
public Event() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
我想了解的是如何为withTimeStampAssigner()提供时间:
.withTimestampAssigner(???))
变量应该是Event.time但从flink页面我不太明白。
我一直在寻找
这让我有点困惑,因为我不明白在我的案例中,解决方案是非常直接的,还是我需要额外的背景。我找到的所有例子都是一样的。ForBoundedAutoforderness()或flink的早期版本,其实现与以下版本不同:
Kafka·Flink时间戳事件时间和水印
谢啦!
如果源(例如,Flink Kafka消费者
)没有提供您要使用的时间戳,那么您需要提供TimestampAssigner
。这是一个函数,它将事件和之前分配的时间戳(如果有的话)作为输入,并返回时间戳。在您的情况下,可以如下所示:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTime());
DataStream<Event> stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));
(注意:这不会很有效,因为您的getTime()
方法返回一个字符串。您需要解析字符串并返回一个long,通常是一个long,表示从epoch开始的毫秒数。)
涉及时间分配供应商的案例。上下文或水印生成器供应商。上下文适用于需要访问较低级别API才能执行更多自定义操作的情况。在这种情况下没有必要这样做。
我想知道是否可以通过使用Flink的摄取时间模式获得记录的时间戳。考虑以下flink代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/windowj
我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。 我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和Tu
因此,我将替换为,就像在这个文档示例中一样(具有更高的maxOutOfOrderness延迟),以便处理乱序事件,但我仍然无法获得任何输出。这是为什么?
我将Flink 1.11.3与SQL API和Blink planner结合使用。我在流模式下工作,使用带有文件系统连接器和CSV格式的CSV文件。对于一个时间列,我生成水印,并希望根据这个时间进行窗口聚合。就像根据事件时间快进过去一样。 是否必须为此对时间列进行排序,因为逐行使用时间列,如果不进行排序,可能会发生延迟事件,从而导致行的删除? 我对Ververica的CDC连接器也很感兴趣。也许我
有没有办法在Kafka消息有效载荷中添加时间戳标头?我想检查消息是何时在消费者端创建的,并基于此应用自定义逻辑。 编辑: 我试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,这样我就可以在特定的时间段内消费消息。现在Kafka只确保消息将按照它们被放入队列的顺序传递。但是在我的例子中,先前生成的记录可能在某个延迟之后到达(因此在时间T1生成的消息可能比在稍后时间T2生成的
请检查上面的代码,并告诉我是否做得正确。在事件时间和水印分配之后,我想在process函数中处理流,其中我将为不同的密钥收集10分钟的流数据。