当前位置: 首页 > 知识库问答 >
问题:

Flink-如何使用TimestampAssigner从事件有效负载获取时间(不使用Kafka时间戳)

葛威
2023-03-14

我试图理解如何在Kafka源代码的水印策略中使用withTimestampAssigner()。我需要使用的“时间”在消息负载内。

为此,我有以下代码:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
        WatermarkStrategy
        .forMonotonousTimestamps()
                .withTimestampAssigner(Event, Event.time))

DataStream<Event> stream = env.addSource(kafkaData);

其中EventDeserializationSchema()是:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;
    
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        
        return TypeInformation.of(Event.class);
    }
}

和事件:

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}

我想了解的是如何为withTimeStampAssigner()提供时间:

.withTimestampAssigner(???))

变量应该是Event.time但从flink页面我不太明白。

我一直在寻找

这让我有点困惑,因为我不明白在我的案例中,解决方案是非常直接的,还是我需要额外的背景。我找到的所有例子都是一样的。ForBoundedAutoforderness()或flink的早期版本,其实现与以下版本不同:

Kafka·Flink时间戳事件时间和水印

谢啦!

共有1个答案

常乐
2023-03-14

如果源(例如,Flink Kafka消费者)没有提供您要使用的时间戳,那么您需要提供TimestampAssigner。这是一个函数,它将事件和之前分配的时间戳(如果有的话)作为输入,并返回时间戳。在您的情况下,可以如下所示:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy = 
        WatermarkStrategy
          .<Event>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.getTime());

DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

(注意:这不会很有效,因为您的getTime()方法返回一个字符串。您需要解析字符串并返回一个long,通常是一个long,表示从epoch开始的毫秒数。)

涉及时间分配供应商的案例。上下文或水印生成器供应商。上下文适用于需要访问较低级别API才能执行更多自定义操作的情况。在这种情况下没有必要这样做。

 类似资料:
  • 我想知道是否可以通过使用Flink的摄取时间模式获得记录的时间戳。考虑以下flink代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/windowj

  • 我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。 我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和Tu

  • 因此,我将替换为,就像在这个文档示例中一样(具有更高的maxOutOfOrderness延迟),以便处理乱序事件,但我仍然无法获得任何输出。这是为什么?

  • 我将Flink 1.11.3与SQL API和Blink planner结合使用。我在流模式下工作,使用带有文件系统连接器和CSV格式的CSV文件。对于一个时间列,我生成水印,并希望根据这个时间进行窗口聚合。就像根据事件时间快进过去一样。 是否必须为此对时间列进行排序,因为逐行使用时间列,如果不进行排序,可能会发生延迟事件,从而导致行的删除? 我对Ververica的CDC连接器也很感兴趣。也许我

  • 有没有办法在Kafka消息有效载荷中添加时间戳标头?我想检查消息是何时在消费者端创建的,并基于此应用自定义逻辑。 编辑: 我试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,这样我就可以在特定的时间段内消费消息。现在Kafka只确保消息将按照它们被放入队列的顺序传递。但是在我的例子中,先前生成的记录可能在某个延迟之后到达(因此在时间T1生成的消息可能比在稍后时间T2生成的

  • 请检查上面的代码,并告诉我是否做得正确。在事件时间和水印分配之后,我想在process函数中处理流,其中我将为不同的密钥收集10分钟的流数据。