当前位置: 首页 > 知识库问答 >
问题:

处理flink数据流的输出数据

狄凯
2023-03-14

下面是我的流处理的伪代码。

Datastream env = StreamExecutionEnvironment.getExecutionEnvironment()    
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Datastream stream = env.addSource() .map(mapping to java object) 
    .filter(filter for specific type of events) 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)){})
    .timeWindowAll(Time.seconds(10));

//collect all records.
Datastream windowedStream = stream.apply(new AllWindowFunction(...))

Datastream processedStream = windowedStream.keyBy(...).reduce(...)

String outputPath = ""

final StreamingFileSink sink = StreamingFileSink.forRowFormat(...).build();

processedStream.addSink(sink)

上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。

共有1个答案

汤乐家
2023-03-14

看看BucketAssigner界面。它应该足够灵活,以满足您的需要。您只需要确保流事件包含足够的信息来确定要将它们写入的路径。

 类似资料:
  • 我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期

  • 我正在使用一个Flink流式Java应用程序,输入源为Kafka。在我的应用程序中总共使用了4个流。一个是主数据流,另一个3个用于广播流。 我加入了使用任何一种类型的三个广播流。我已经作为流B广播,并且能够在广播过程函数上下文状态(即在processBroadcastElement())中接收。 我的问题是, > 是否可以在广播状态下存储大数据? 注意:根据我的理解,Flink广播状态在运行时保存

  • 在我看来,Flink以三种方式处理后期事件: 窗口过期时删除延迟事件(默认)。 通过使用“允许延迟”机制包含延迟事件来更新窗口。 使用“侧输出”机制将延迟事件重定向到另一个DataStream。 让我们假设我有一个事件时间作业,它使用来自Kafka的数据,并每5分钟处理一个窗口。现在,假设我将延迟事件重定向到另一个数据流中。 这个新的数据流是独立的吗 谢谢大家!

  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢

  • Flink是否支持数据集中的侧输出功能(批处理Api)?如果没有,从文件加载时如何处理有效和无效记录?