Question:

Apache Flink 1.0.0: migration problem related to event time

秦安怡
2023-03-14

I tried to migrate some simple jobs to Flink 1.0.0, but they fail with the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

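The code consists of two separate jobs connected through a Kafka topic. One job is a simple message generator; the other is a simple message consumer that uses timeWindowAll to compute the per-minute message arrival rate.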

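Similar code worked without any problems on version 0.10.2, but now it looks as if the system misinterprets some event timestamps as Long.MIN_VALUE, which causes the job to fail.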

The question is: am I doing something wrong, or is this a bug that will be fixed in a future release?

Main job:

public class Test1_0_0 {
    // Max Time lag between events time to current System time
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Setting Event Time usage
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(1);
        // Properties initialization
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Overwrites the default properties by one provided by command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        for(Map.Entry<String, String> e: parameterTool.toMap().entrySet()) {
            properties.setProperty(e.getKey(),e.getValue());
        }
        //properties.setProperty("auto.offset.reset", "smallest");
        System.out.println("Properties: " + properties);
        DataStream<Message> stream = env
                .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties))
                .filter(message -> message != null);
        // The call to rebalance() causes data to be re-partitioned so that all machines receive messages
        // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances).
        stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
        // Counts messages
        stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
                Integer count = 0;
                if (values.iterator().hasNext()) {
                    for (Message value : values) {
                        count++;
                    }
                    collector.collect("Arrived last minute: " + count);
                }
            }
        }).print();
        // execute program
        env.execute("Messages Counting");
    }
 }

Timestamp extractor:

public class MessageTimestampExtractor implements AssignerWithPeriodicWatermarks<Message>, Serializable {

    private static final long serialVersionUID = 7526472295622776147L;
    // Maximum lag between the current processing time and the timestamp of an event
    long maxTimeLag = 0L;
    long currentWatermarkTimestamp = 0L;

    public MessageTimestampExtractor() {
    }

    public MessageTimestampExtractor(Time maxTimeLag) {
        this.maxTimeLag = maxTimeLag.toMilliseconds();
    }


    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned from a previous assigner,
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param message The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value, if no timestamp has been assigned, yet.
     * @return The new timestamp.
     */
    @Override
    public long extractTimestamp(Message message, long previousElementTimestamp) {
        long timestamp = message.getTimestamp();
        currentWatermarkTimestamp = Math.max(timestamp, currentWatermarkTimestamp);
        return timestamp;
    }


    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return null to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and larger
     * than the previously emitted watermark. If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method.
     *
     * <p>If this method returns a value that is smaller than the previously returned watermark,
     * then the implementation does not properly handle the event stream timestamps.
     * In that case, the returned watermark will not be emitted (to preserve the contract of
     * ascending watermarks), and the violation will be logged and registered in the metrics.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     */
    @Override
    public Watermark getCurrentWatermark() {
        if(currentWatermarkTimestamp <= 0) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentWatermarkTimestamp - maxTimeLag);
    }

    public long getMaxTimeLag() {
        return maxTimeLag;
    }

    public void setMaxTimeLag(long maxTimeLag) {
        this.maxTimeLag = maxTimeLag;
    }
}
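
To make the watermark arithmetic of the extractor above easier to follow, here is a minimal standalone sketch that mirrors its two methods without any Flink dependency. The class name WatermarkSketch and its method names are hypothetical and exist only for illustration; in a real job Flink calls getCurrentWatermark() periodically, whereas this sketch simply queries the value after each event.

```java
/** Standalone model of the extractor's logic; no Flink classes are used (illustration only). */
final class WatermarkSketch {
    private final long maxTimeLagMillis;
    private long maxSeenTimestamp = 0L;

    WatermarkSketch(long maxTimeLagMillis) {
        this.maxTimeLagMillis = maxTimeLagMillis;
    }

    /** Mirrors extractTimestamp: remember the largest event timestamp seen so far. */
    long extractTimestamp(long eventTimestamp) {
        maxSeenTimestamp = Math.max(eventTimestamp, maxSeenTimestamp);
        return eventTimestamp;
    }

    /** Mirrors getCurrentWatermark: Long.MIN_VALUE until a positive timestamp has been seen. */
    long currentWatermark() {
        if (maxSeenTimestamp <= 0) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxTimeLagMillis;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(3_000L); // 3 seconds, as in the question
        System.out.println("before any event: " + sketch.currentWatermark());
        // The third event arrives out of order and does not move the watermark back.
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            sketch.extractTimestamp(ts);
            System.out.println(ts + " -> watermark " + sketch.currentWatermark());
        }
    }
}
```

With a lag of 3 seconds, the watermark trails the largest timestamp seen so far by 3000 ms and never decreases when a late event arrives.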

1 answer

淳于升
2023-03-14

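The problem is that calling assignTimestampsAndWatermarks returns a new DataStream that uses the timestamp extractor; it does not modify the stream it is called on. In the question's code the returned stream is discarded and timeWindowAll is applied to the original stream, whose records carry no timestamps, hence the exception. You therefore have to perform the subsequent operations on the returned DataStream.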

DataStream<Message> timestampStream = stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
// Counts messages
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
        Integer count = 0;
        if (values.iterator().hasNext()) {
            for (Message value : values) {
                count++;
            }
            collector.collect("Arrived last minute: " + count);
        }
    }
}).print();
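
The underlying principle, that a transformation returns a new stream and leaves the receiver untouched, can be sketched with a toy model in plain Java. ToyStream, assignTimestamps and timestamps are hypothetical names invented for this illustration and are not part of the Flink API; the sketch only assumes that records without an assigner carry Long.MIN_VALUE, as the exception message states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Toy model (NOT the Flink API): transformations return a new stream object. */
final class ToyStream<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final List<T> elements;
    private final ToLongFunction<T> timestampFn; // null means no assigner attached

    private ToyStream(List<T> elements, ToLongFunction<T> timestampFn) {
        this.elements = elements;
        this.timestampFn = timestampFn;
    }

    static <T> ToyStream<T> of(List<T> elements) {
        return new ToyStream<>(elements, null);
    }

    /** Returns a NEW stream that carries the assigner; 'this' is not modified. */
    ToyStream<T> assignTimestamps(ToLongFunction<T> fn) {
        return new ToyStream<>(elements, fn);
    }

    /** Timestamps that a downstream operator attached to this stream would observe. */
    List<Long> timestamps() {
        List<Long> result = new ArrayList<>();
        for (T element : elements) {
            result.add(timestampFn == null ? NO_TIMESTAMP : timestampFn.applyAsLong(element));
        }
        return result;
    }

    public static void main(String[] args) {
        ToyStream<Long> stream = ToyStream.of(Arrays.asList(10_000L, 12_000L));
        ToyStream<Long> timestamped = stream.assignTimestamps(Long::longValue);
        System.out.println("original: " + stream.timestamps());      // still without timestamps
        System.out.println("returned: " + timestamped.timestamps()); // timestamps assigned
    }
}
```

Reading from the original variable after the call corresponds to the failing job in the question; reading from the returned one corresponds to the fix shown above.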