当前位置: 首页 > 知识库问答 >
问题:

未触发flink翻滚窗口(无水印策略)

李甫
2023-03-14

问题陈述:来自kafka源的流式事件。这些事件有效载荷为字符串格式。将它们解析为文档,并根据事件时间每隔5秒将其批量插入DB。

map()函数正在执行。但程序控制不会进入Application()。因此不会发生批量插入。我尝试了键控和非键控窗口。它们都不工作。没有抛出错误。

flink版本:1.15.0

下面是我的主要方法的代码。我应该如何解决这个问题?

  public static void main(String[] args) throws Exception {

    final Logger logger = LoggerFactory.getLogger(Main.class);
    final StreamExecutionEnvironment streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


    KafkaConfig kafkaConfig = Utils.getAppConfig().getKafkaConfig();

    logger.info("main() Loading kafka config :: {}", kafkaConfig);

    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaConfig.getBootstrapServers())
        .setTopics(kafkaConfig.getTopics())
        .setGroupId(kafkaConfig.getConsumerGroupId())
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema()).build();

    logger.info("main() Configured kafka source :: {}", kafkaSource);

    DataStreamSource<String> dataStream = streamExecutionEnv.fromSource(kafkaSource,
        WatermarkStrategy.noWatermarks(), "mySource");

    logger.info("main() Configured kafka dataStream :: {}", dataStream);

    DataStream<Document> dataStream1 = dataStream.map(new DocumentMapperFunction());

    DataStream<InsertOneModel<Document>> dataStream2 = dataStream1.map(new InsertOneModelMapperFunction());

    DataStream<Object> dataStream3 = dataStream2
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(0)))
            /*.keyBy(insertOneModel -> insertOneModel.getDocument().get("ackSubSystem"))
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))*/
        .apply(new BulkInsertToDB2())
            .setParallelism(1);

    logger.info("main() before streamExecutionEnv execution");

    dataStream3.print();

    streamExecutionEnv.execute();
  }

共有1个答案

程磊
2023-03-14

事件时间窗口需要水印策略。如果没有,窗口将永远不会触发。

 类似资料:
  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?

  • 我有一个使用flink应用程序的场景,该应用程序接收以下格式的数据流: {“event\u id”:“c1s2s34”,“event\u create\u timestamp”:“2019-03-07 11:11:23”,“amount”:“104.67”} 我使用下面的滚动窗口来查找过去60秒内输入流的总和、计数和平均值。 键值。时间窗口(时间秒(60)) 然而,我如何标记聚合结果,以便我可以说

  • 我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00

  • 我正在尝试在我的Flink作业中使用事件时间,并使用来提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得中的根本没有调用。我可以看到数据进入函数。 我已经设置了getEnv()。getConfig()。设置自动水印间隔(1000L) 我尝试过 还有会话窗口 所有的水印都显示没有水印,我怎么能让Flink忽略这个没有水印的东西呢?

  • 我们正在构建一个流处理管道来处理/摄取Kafka消息。我们正在使用Flink v1.12.2。在定义源水印策略时,在官方留档中,我遇到了两种开箱即用的水印策略;forBoundedOutOfOrness和forMonotonousTimestamps。我确实浏览了javadoc,但并不完全理解何时以及为什么你应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。

  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从