下面是我的流处理的伪代码。
Datastream env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Datastream stream = env.addSource() .map(mapping to java object)
.filter(filter for specific type of events)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)){})
.timeWindowAll(Time.seconds(10));
//collect all records.
Datastream windowedStream = stream.apply(new AllWindowFunction(...))
Datastream processedStream = windowedStream.keyBy(...).reduce(...)
String outputPath = ""
final StreamingFileSink sink = StreamingFileSink.forRowFormat(...).build();
processedStream.addSink(sink)
上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。
看看BucketAssigner界面。它应该足够灵活,以满足您的需要。您只需要确保流事件包含足够的信息来确定要将它们写入的路径。
我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期
我正在使用一个Flink流式Java应用程序,输入源为Kafka。在我的应用程序中总共使用了4个流。一个是主数据流,另一个3个用于广播流。 我加入了使用任何一种类型的三个广播流。我已经作为流B广播,并且能够在广播过程函数上下文状态(即在processBroadcastElement())中接收。 我的问题是, > 是否可以在广播状态下存储大数据? 注意:根据我的理解,Flink广播状态在运行时保存
在我看来,Flink以三种方式处理后期事件: 窗口过期时删除延迟事件(默认)。 通过使用“允许延迟”机制包含延迟事件来更新窗口。 使用“侧输出”机制将延迟事件重定向到另一个DataStream。 让我们假设我有一个事件时间作业,它使用来自Kafka的数据,并每5分钟处理一个窗口。现在,假设我将延迟事件重定向到另一个数据流中。 这个新的数据流是独立的吗 谢谢大家!
我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢
null
Flink是否支持数据集中的侧输出功能(批处理Api)?如果没有,从文件加载时如何处理有效和无效记录?