当前位置: 首页 > 知识库问答 >
问题:

flink:如何在flink中处理外部应用程序配置更改

屠建本
2023-03-14

共有1个答案

慕皓君
2023-03-14

更新正在运行的流式应用程序的配置是一个常见的要求。在Flink的DataStream API中,可以使用所谓的CoflatMapFunction来实现这一点,它处理两个输入流。其中一个流可以是数据流,而另一个流可以是控制流。

下面的示例演示如何动态适配一个筛选出超过一定长度的字符串的用户函数。

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = {
    length = state
  }
}

dynlengthfilter用户函数实现筛选器长度的checkpoint接口。如果发生故障,该信息将自动恢复。

 类似资料:
  • 我的Flink处理器监听Kafka,处理器中的业务逻辑涉及调用外部REST服务,服务可能会停止。我想将元组重放回处理器中,是否仍有这样做的方法?我使用了Storm,我们将能够使元组失败,这样元组就不会被确认。因此,相同的元组将重播到处理器。 在Flink中,一旦消息被Flink Kafka消费者消费,元组就会被自动确认。有很多方法可以解决这个问题。其中一种方法是将消息发布回同一队列/重试队列。但我

  • 为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的和坐标以及记录位置的时间组成。我的目标是用特定粒子的速度来注释这个数据。所以小溪看起来像这样。 现在无法保证事件会按顺序到达,即可能会在之前到达,即。 为了简单起见,可以假设任何迟来的数据将在早数据的内到达。 我承认,我是流处理和闪烁的新手,所以这可能是一个愚蠢的问题,提出一个明显的答案,但我目前被难倒了,如何去实现我的

  • 在SpringJSFWeb应用程序中将Netty客户端处理程序配置为消息接收点,有没有具体的方法? 如果一些独立的Java应用程序充当Netty服务器,我如何接收到SpringJSFWeb应用程序的消息?

  • null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

  • 我想在一个操作符中接收和处理三个流。例如,Storm中实现的代码如下: <代码>生成器。setBolt(“C\u螺栓”,C\u螺栓(),parallelism\u提示)。字段分组(“A\u bolt”,“TRAINING”,新字段(“word”))。字段分组(“B\U螺栓”,“分析”,新字段(“word”))。所有分组(“A\U螺栓”、“总和”) 在Flink中,实现了和的处理: 但我不知道如何添

  • 我有一个spring-boot应用程序,我想用外部配置文件运行它。当我将其作为jar(带有嵌入式servlet容器)运行时,一切都很好。但是我想在外部servlet容器(Tomcat)下运行它,这里我遇到了外部配置的问题。我尝试了@PropertySource,但在这种情况下,应用程序只获得war文件配置中没有的属性:外部配置不会覆盖内部配置。那么问题是:我如何配置外部配置,它将覆盖内部配置?