理想情况下,与特定记录相关的事件需要以FIFO顺序处理(尽管也会有一个时间戳来帮助检测无序事件),但与不同记录相关的事件可以并行处理。我计划使用keyby()
构造按记录对流进行分区。
需要进行的处理取决于数据库中关于记录的当前信息。但是,我找不到一个示例或推荐的方法来查询数据库中的记录,以丰富事件,它正在被处理,我需要处理它的附加信息。
我想到的管道如下:
->接收到的id上的keyBy()->从对应于id的数据库中检索记录->对记录执行处理步骤->将处理过的事件推送到外部队列并更新数据库记录
您可以通过为RichFlatMap
函数扩展rich函数来执行数据库查找,在其Open()
方法中初始化数据库连接一次,然后在FlatMap()
方法中处理每个事件:
public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {
// Declare DB coonection and query statements
@Override
public void open(Configuration parameters) throws Exception {
// Initialize Database connection
// Prepare Query statements
}
@Override
public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
// look up the Database, update record, enrich event
out.collect(enrichedEvent);
}
})
然后可以使用databaseMapper
,如下所示:
stream.keyby(id)
.flatmap(new DatabaseMapper())
.addSink(..);
您可以在这里找到一个使用来自Redis的缓存数据的示例。
这是我正在接收的数据,如果阵列中不存在id,则希望添加新记录,如果阵列中没有id,则还希望更新记录 我的模型:课堂课程 课堂主题 类用户 这是通过PostMan从Rails API接收的数组[{"title":"Topic Name","path_url":"https://resource-asws-path-url" }, { "id": 2311,"title":"Topic Name","
如何从java中知道在调用更新查询时记录是否被更新。我曾尝试过executeUpdate()中的返回类型int,但每当执行查询时,它都会显示1。也就是说,如果记录更改或不更改,它将返回值
我正在开发一个web应用程序,使用React跟踪水果供应商的库存。js,MongoDB,Node。js和Express。我调用了数据库endpoint来呈现表中的数据。现在,我尝试使用按钮增加和减少库存量,但当我尝试设置新状态时,它不起作用。我尝试通过单击更改状态,然后更新数据库中的新状态。有什么建议吗? > 可结果成分: 从'react'导入Reac,{Component};从“react bo
我从sql db移到了Room。我不知道如何检查数据库中是否存在项目。如何使用Room编写这段代码? 我试图用这段代码来实现这一点,但总是得到FALSE返回
我有一个连接到Sybase ASE的PB应用程序;有没有一种方法可以在应用程序运行时跟踪DB调用。我以前在Oracle10/11G中经常这样做,但在sybase ase中无法理解。如果我能在PB应用程序中运行一个模块并检查之后执行的查询,那将非常有帮助。我使用的是DBVisualizer或RapidSQL。
我已经从向量到一个文本框,这是可编辑的