当前位置: 首页 > 知识库问答 >
问题:

Flink流媒体程序可以在流转时长下正确运行,但不会在事件时间内产生结果

司空赞
2023-03-14

更新添加了env。getConfig()。设置自动水印间隔(1000L)

没有解决问题。

我想问题在于我代码的另一部分。所以首先要多了解一些背景知识。

该程序从一个单一的kafka队列中使用混合消息类型的JSON流。该程序最初转换成类型为ObjectNode的流。然后,该流被使用分割()分成大约10个单独的流。这些流被映射到POJO的流。

然后,这些POJO流在被添加到窗口(每个POJO类型的流有一个窗口)之前被分配时间戳,然后在被发送回另一个Kafka队列之前,在自定义函数中进行键控、汇总和平均。

扩展代码示例

public class flinkkafka {

public static void main(String[] args) throws Exception {
    //create object mapper to allow object to JSON transform
    final ObjectMapper mapper = new ObjectMapper();
    final String OUTPUT_QUEUE = "test";
    //setup streaming environment
    StreamExecutionEnvironment env =    
         StreamExecutionEnvironment
              .getExecutionEnvironment();

    //set streaming environment variables from command line
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    //set time characteristic to EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //set watermark polling interval
    env.getConfig().setAutoWatermarkInterval(1000L);

    //Enable checkpoints to allow for graceful recovery
    env.enableCheckpointing(1000);

    //set parallelism
    env.setParallelism(1);

    //create an initial data stream of mixed messages
    DataStream<ObjectNode> messageStream = env.addSource
            (new FlinkKafkaConsumer09<>(
                    parameterTool.getRequired("topic"), 
                    new JSONDeserializationSchema(),
                    parameterTool.getProperties())) 
                      .assignTimestampsAndWatermarks(new
                      BoundedOutOfOrdernessTimestampExtractor<ObjectNode>
                      (Time.seconds(10)){
                        private static final long serialVersionUID = 1L;

                        @Override
                        public long extractTimestamp(ObjectNode value) {
                            DateFormat format = new SimpleDateFormat("yyyy-
                             MM-dd HH:mm:ss", Locale.ENGLISH);
                            long tmp = 0L;
                            try {
                                tmp = 
                               format.parse(value.get("EventReceivedTime")
                                    .asText()).getTime();
                            } catch (ParseException e) {
                                e.printStackTrace();
                            }
                            System.out.println("Assigning timestamp " + 
                               tmp);
                            return tmp;
                        }

                    });

    //split stream by message type
    SplitStream<ObjectNode> split = messageStream.split(new  
               OutputSelector<ObjectNode>(){
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(ObjectNode value){
            List<String> output = new ArrayList<String>();
            switch (value.get("name").asText()){
            case "one":
                switch (value.get("info").asText()){
                case "two":
                    output.add("info");
                    System.out.println("Sending message to two
                          stream");
                    break;
                case "three":
                    output.add("three");
                    System.out.println("Sending message to three stream");
                    break;
                case "four":
                    output.add("four");
                    System.out.println("Sending message to four stream");
                    break;
                case "five":
                    output.add("five");
                    System.out.println("Sending message to five stream");
                    break;
                case "six":
                    output.add("six");
                    System.out.println("Sending message to six stream");
                    break;
                default:
                    break;
                }
                break;
            case "seven":
                output.add("seven");
                System.out.println("Sending message to seven stream");
                break;
            case "eight":
                output.add("eight");
                System.out.println("Sending message to eight stream");
                break;
            case "nine":
                output.add("nine");
                System.out.println("Sending message to nine stream");
                break;
            case "ten":
                switch (value.get("info").asText()){
                case "eleven":
                    output.add("eleven");
                    System.out.println("Sending message to eleven stream");
                    break;
                case "twelve":
                    output.add("twelve");
                    System.out.println("Sending message to twelve stream");
                    break;
                default:
                    break;
                }
                break;
            default:
                output.add("failed");
                break;
            }
            return output;
        }
    });

    //assign splits to new data streams
    DataStream<ObjectNode> two = split.select("two");
    //assigning more splits to streams

    //convert ObjectNodes to POJO 

    DataStream<Two> twoStream = two.map(new MapFunction<ObjectNode, Two>(){
        private static final long serialVersionUID = 1L;

        @Override
        public Twomap(ObjectNode value) throws Exception {
            Two stream = new Two();
            stream.Time = value.get("Time").asText();
            stream.value = value.get("value").asLong();
            return front;
        }
    });

    DataStream<String> keyedTwo = twoStream
            .keyBy("name")
            .timeWindow(Time.minutes(5))
            .apply(new twoSum())
            .map(new MapFunction<Two, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(Two value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
    keyedTwo.addSink(new FlinkKafkaProducer09<String>
         (parameterTool.getRequired("bootstrap.servers"),
                 OUTPUT_QUEUE, new SimpleStringSchema()));

    env.execute();

我试图使用Flink聚合一个Kafka队列,并将数据流推回到Kafka。聚合将使用5分钟的事件时间窗口,程序编译并运行,但收集的数据从未离开窗口传递给聚合函数,因此从未向Kafka传递消息。然而,如果我注释掉eventTime特性,程序就会运行并产生结果。我不知道哪里出了问题。

事件时间代码

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(args);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.enableCheckpointing(1000);

DataStream<FrontEnd> frontEndStream = frontEnd.map(new
    MapFunction<ObjectNode, FrontEnd>(){

        private static final long serialVersionUID = 1L;

        @Override
        public FrontEnd map(ObjectNode value) throws Exception {
        FrontEnd front = new FrontEnd();
        front.eventTime = value.get("EventReceivedTime").asText();
        return front;
        }
    }).assignTimestampsAndWatermarks(new
        BoundedOutOfOrdernessTimestampExtractor<FrontEnd>(Time.seconds(10)){
            private static final long serialVersionUID = 1L;
            @Override
            public long extractTimestamp(FrontEnd value) {
                DateFormat format = new SimpleDateFormat("yyyy-MM-
                    ddHH:mm:ss",Locale.ENGLISH);
                long tmp = 0L;
                try {
                tmp = format.parse(value.eventTime).getTime();
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return tmp;
        }

    });

    DataStream<String> keyedFrontEnd = frontEndStream
        .keyBy("name")
        .timeWindow(Time.minutes(5))
        .apply(new FrontEndSum())
        .map(new MapFunction<FrontEnd, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(FrontEnd value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
   .map(new MapFunction<FrontEnd, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(FrontEnd value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
    keyedFrontEnd.addSink(new FlinkKafkaProducer09<String>
    (parameterTool.getRequired("bootstrap.servers"), OUTPUT_QUEUE, new 
    SimpleStringSchema()));  

    env.execute();
    }
}

我尝试过将时间戳提取器连接到传入流,并将一个时间戳提取器连接到每个POJO流。同样,这段代码在事件时间内运行,并生成具有预期聚合的JSON字符串流的预期结果。但是,一旦启用事件时间,windows就不会产生结果


共有2个答案

沈嘉瑞
2023-03-14

我的第一个倾向总是假设一个时区问题。

您的kafka负载中的EventReceivedTime字段的时区是多少?

SimpleDataFormat将在本地JVM时区进行解析:

DateFormat format = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss",Locale.ENGLISH);

你可以加上

format.setTimeZone(TimeZone.getTimeZone("GMT"));

例如,如果文本所代表的是GMT,则将字符串解析为GMT。您应该确保所有日期、水印等的时区/偏移量匹配,并以UTC/大纪元时间进行比较(这是从中提取长时间后得到的结果)。

商嘉木
2023-03-14

BoundedAutoFordernessTimestampExtractor实现了赋值器与PeriodicWatermarks接口,这意味着Flink会定期查询当前水印。

您必须通过ExecutionConfig配置轮询间隔:

env.getConfig.setAutoWatermarkInterval(1000L); // poll watermark every second
 类似资料:
  • 首先,我是流处理框架的新手。我想对其中一些进行基准测试,所以我从Flink开始。 对于我的用例,我需要将窗口t中的事件与窗口t-1中的事件进行比较,两者的大小都是15分钟,然后进行一些聚合。 以下是我的用例的简化版本: 我们将分析的事件视为形式的元组。在窗口1中,我们有:(A,1),(B,2),(C,3),在窗口2中,我们有:(D,6)和(B,7)。然后,我需要将当前窗口中的事件与前一个窗口中的事

  • 我正在写一个screen scraper,它从帖子中获取一个URL列表,然后访问这些URL并获取页面上所有链接的列表。然后,它访问所有链接(原始链接和从片段中),并获得一个图像列表。当我内联运行作业时,一切都很好(除了需要30秒才能完成,这是一个问题,因为响应API调用需要很长时间)。出于某种原因,当我使用相同的代码并使用后台工作程序运行它时,有两个URL永远不会更新为已完成。它始终是相同的2个U

  • Spark-submit--class MyClass-master yar--deploy-mode cluster--executor-memory 1g--executor-cores 2 hdfs://url:port/my.jar 这个应用程序,接收来自kinesis流的传入数据,并基于它执行一个请求(回发)到一个我可以跟踪的url。我已经在本地测试了我的应用程序,运行它设置SparkC

  • 这个问题涵盖了如何使用FlinkSQL对乱序流进行排序,但我更愿意使用DataStream API。一种解决方案是使用ProcessFunction来执行此操作,该ProcessFunction使用PriorityQueue来缓冲事件,直到水印指示它们不再乱序,但这在RocksDB状态后端中表现不佳(问题是每次访问PriorityQueue都需要整个PriorityQueue的ser/de)。无论

  • 目前我正在使用Streaming API(https://stream.twitter.com/1/statuses/filter.json)。 连接成功后,我会将服务器上的所有推文记录到数据库中。只有在运行了几个小时或几天没有问题后,问题才会出现,然后无法检索更多推文。如果我重新启动客户端,处理会恢复正常,一切正常,直到下一次挂起。