当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink中从数据库中查找和更新记录的状态?

汝岳
2023-03-14

理想情况下,与特定记录相关的事件需要以FIFO顺序处理(尽管也会有一个时间戳来帮助检测无序事件),但与不同记录相关的事件可以并行处理。我计划使用keyby()构造按记录对流进行分区。

需要进行的处理取决于数据库中关于记录的当前信息。但是,我找不到一个示例或推荐的方法来查询数据库中的记录,以丰富事件,它正在被处理,我需要处理它的附加信息。

我想到的管道如下:

->接收到的id上的keyBy()->从对应于id的数据库中检索记录->对记录执行处理步骤->将处理过的事件推送到外部队列并更新数据库记录

共有1个答案

吴镜
2023-03-14

您可以通过为RichFlatMap函数扩展rich函数来执行数据库查找,在其Open()方法中初始化数据库连接一次,然后在FlatMap()方法中处理每个事件:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {

    // Declare DB coonection and query statements

    @Override
    public void open(Configuration parameters) throws Exception {
      // Initialize Database connection
      // Prepare Query statements
    }

    @Override
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
      // look up the Database, update record, enrich event
      out.collect(enrichedEvent);        
    }
})

然后可以使用databaseMapper,如下所示:

stream.keyby(id)
      .flatmap(new DatabaseMapper())
      .addSink(..);

您可以在这里找到一个使用来自Redis的缓存数据的示例。

 类似资料:
  • 这是我正在接收的数据,如果阵列中不存在id,则希望添加新记录,如果阵列中没有id,则还希望更新记录 我的模型:课堂课程 课堂主题 类用户 这是通过PostMan从Rails API接收的数组[{"title":"Topic Name","path_url":"https://resource-asws-path-url" }, { "id": 2311,"title":"Topic Name","

  • 如何从java中知道在调用更新查询时记录是否被更新。我曾尝试过executeUpdate()中的返回类型int,但每当执行查询时,它都会显示1。也就是说,如果记录更改或不更改,它将返回值

  • 我正在开发一个web应用程序,使用React跟踪水果供应商的库存。js,MongoDB,Node。js和Express。我调用了数据库endpoint来呈现表中的数据。现在,我尝试使用按钮增加和减少库存量,但当我尝试设置新状态时,它不起作用。我尝试通过单击更改状态,然后更新数据库中的新状态。有什么建议吗? > 可结果成分: 从'react'导入Reac,{Component};从“react bo

  • 我从sql db移到了Room。我不知道如何检查数据库中是否存在项目。如何使用Room编写这段代码? 我试图用这段代码来实现这一点,但总是得到FALSE返回

  • 我有一个连接到Sybase ASE的PB应用程序;有没有一种方法可以在应用程序运行时跟踪DB调用。我以前在Oracle10/11G中经常这样做,但在sybase ase中无法理解。如果我能在PB应用程序中运行一个模块并检查之后执行的查询,那将非常有帮助。我使用的是DBVisualizer或RapidSQL。

  • 我已经从向量到一个文本框,这是可编辑的