当前位置: 首页 > 知识库问答 >
问题:

暂停Flink KeyedStream上的处理

曾宏毅
2023-03-14

我有一个Flink流应用程序,需要能够“暂停”和“取消暂停”对特定键控流的处理。“处理”意味着只是在流上执行一些简单的异常检测。

我们正在考虑的flow是这样工作的:

命令流,可以是ProcessCommand、PauseCommand或ResumeCommand,每个命令都有一个用于按键的id。

处理命令将检查按键在处理前是否暂停,如果没有暂停,则检查缓冲区。

暂停命令(PauseCommands)将暂停按键处理。

恢复命令(ResumeCommands)将取消对键的暂停处理并刷新缓冲区。

这个流程看起来合理吗?如果合理,我是否可以使用类似拆分操作符的方法来实现?

示例流,输入的单个记录时间戳:

<代码>[{命令:进程,id:1},{命令:暂停,id:1},{命令:进程,id:1},{命令:恢复,id:1},{命令:进程,id:1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 

共有1个答案

龙永福
2023-03-14

这可以通过使用Flink的窗口操作符来实现。首先,通过应用映射操作创建基于POJO或元组的流。

然后,根据您的需要,您可以在该流上使用keyBy来获取keyedStream

现在,通过结合使用基于时间的无限窗口、触发器和窗口函数,可以实现命令流的切换行为。

基本上,您可以使用windows作为缓冲区,它在接收到暂停记录后,会保存进程记录,直到接收到恢复记录。您将编写一个自定义触发器,根据您的场景逐出窗口(缓冲区)。

下面是具有重写方法的触发器的自定义实现。

/**
 * We trigger the window processing as per command inside the record. The
 * process records are buffered when a pause record is received and the
 * buffer is evicted once resume record is received. If no pause record is
 * received earlier, then for each process record the buffer is evicted.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    if (element.f1.equals("pause")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;
}

查看此github存储库中的完整工作示例

 类似资料:
  • 暂停脚本的当前线程。 #p::Pause ; 按一次 Win+P 会暂停脚本. 再按一次则取消暂停. Pause [, On|Off|Toggle, OperateOnUnderlyingThread?] 参数 On|Off|Toggle 如果为空或省略, 则它默认为 Toggle. 否则, 请指定下列单词的其中一个: Toggle:如果在当前线程下的潜在线程处于运行状态,则暂停当前线程,否则让潜

  • 通过使用会话窗口与相当高级的组一起运行流数据流管道,在运行几个小时后,我遇到了问题。工作在workers中扩展,但后来开始获得日志负载,其内容如下 记录此代码的转换位于“group by”-块之后,并执行对外部服务的异步HTTP调用(使用)。 你知道为什么会这样吗?与异步、伸缩或按策略分组有关? 作业ID:2018-01-29_03_13_40-12789475517328084866 SDK:A

  • 我想使用Lmax Disruptor进行性能测试: 配置DURUPTOR 以某种方式“暂停”处理 向RingBuffer添加多条消息 “取消暂停”处理 这样,我可以清楚地测量缓冲区清空的速度。如果我“混合”添加了许多消息(这会带来一些延迟)并进行处理,那么在处理速度方面可能会有不太确定的结果。 然而,我似乎没有在LMAX Disruptor(https://lmax-exchange.github

  • 我创建了一个小测试程序供以后使用,它只需连接到服务器并检索一个文件,如下所示: 程序打印连接成功,文件已经存在,所以它必须是检索文件。程序不会崩溃,也不会抛出异常。

  • 1.若要暂停训练,请按下该按钮。显示暂停。若要继续训练,请点击绿色箭头图标。 2.若要停止训练,在记录训练期间或处于暂停模式时长按该按钮三秒钟,直至计数器清零。或者您可以点击并按住显示屏上的红色停止按钮。 如果在暂停后停止训练,则暂停后经过的时间不包括在总训练时间内。

  • 若要暂停训练 长按正面按钮 或 在训练视图中向右滑动,找出并轻触暂停图标。 通过向左滑动您可以在暂停模式中看到训练总结。 恢复暂停的训练 轻触绿色箭头图标。 停止训练 在训练期间或在暂停模式中,长按正面按钮直至绿色计时器一直倒数。 或 在暂停模式中,轻触并按住红色停止图标可结束记录。