假设存在一个有限的 DataStream(例如,来自数据库源)和事件
a1、a2、...,一个
。
如何将另一个事件< code>b追加到此流以获取
a1, a2, ..., an, b
(即在所有原始事件之后输出添加的事件,保持原始顺序)?
我知道所有有限流在所有事件之后都会发出< code>MAX_WATERMARK。那么,有没有办法“抓住”这个水印,输出它之后的附加事件呢?
(不幸的是,<代码>。union()将原始数据流与由单个事件组成的另一个数据流(时间戳设置为< code>Long。MaxValue),然后使用该答案对联合流进行排序不起作用。)
另一种方法可能是将原始数据源“包装”到另一个数据源中,当委托对象的run()
方法返回时,该方法会发出最后一个元素。当然,您需要小心调用所有委托方法。
也许我错过了一些东西,但似乎你可以简单地有一个ProcessFunction,将事件时间计时器设置为遥远未来的某个地方,这样它只有在MAX_WATERMARK到达时才会触发。然后在 onTimer 方法中,如果当前水印MAX_WATERMARK,则发出该特殊事件。
问题内容: 如我所见,java mongo驱动程序不提供从现有gridFS文件获取的功能 我必须直接创建或使用方法。 是否缺少Java驱动程序或gridfs的限制? 除了创建新文件/删除旧文件之外,您能否建议其他解决方法? 谢谢 问题答案: GridFS不是MongoDB的核心功能,而是存储带有随附元数据的二进制数据的约定。您应该能够以通常的方式修改集合中的任何文档,同时保持相应文档的完整性。主要
我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期
尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中
问题内容: 我有两个长度未知的数组,我只想将一个附加到另一个的末尾,即: 我曾尝试使用,但似乎无法使其正常工作。 问题答案: 使用,应类似于以下内容:
我正在使用下面的代码。然而,当我运行代码时,我想在同一个TXT文件中一个接一个地添加新的XML。这是否可以使用JDOM。请帮帮我。。 xmlOutput。输出(文档,新FileWriter(“c:\updated.txt”));是否需要修改?
不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?