当前位置: 首页 > 知识库问答 >
问题:

Flink中的处理流

弓华茂
2023-03-14
    null
streamIn -> process1 -> process2 -> .... -> processN
                 \         |                /
                  \        |               /
                     sink to kafka
 OutputTag<SysmonPartial> filtersOutput = new OutputTag<SysmonPartial>("FiltersOutput"){};

 DataStream<SysmonPartial> kafkaSource = env.addSource(consumer);

        DataStream<SysmonPartial> source = kafkaSource.rebalance();

        SingleOutputStreamOperator<SysmonPartial> s = source
                .process(lambda1).name("lambda1").startNewChain()
                .process(lambda2).name("lambda2").startNewChain()
                .process(lambda3).startNewChain()
                .process(lambda4).startNewChain()
                .process(lambda5).startNewChain()
                .process(lambda6).startNewChain()
                .process(lambda7).startNewChain();

        SingleOutputStreamOperator<Any> output = s
                .getSideOutput(filtersOutput)
                .process(filterProcessFunction).setParallelism(1).startNewChain();

        output.addSink(sink).setParallelism(1);

        env.execute(jobName);

其中lambda1、2等是条件检查函数,例如

public class Bypass_WS_01_03 extends FilterTagFuction<SysmonPartial, SysmonPartial, SysmonPartial> {
    private static final Pattern p_1 = Pattern.compile("pattern1");
    private static final Pattern p_0 = Pattern.compile("pattern2");

    @Override
    public void processElement(SysmonPartial t, Context ctx, Collector<SysmonPartial> out) throws Exception {
        out.collect(t);
        if (
                "1".equals(t.B_VendorEventID) &&
                        t.CommandLine != null && t.CommandLine.length() != 0 && p_0.matcher(t.CommandLine).find() &&
                        t.ImageName != null && t.ImageName.length() != 0 && p_1.matcher(t.ImageName).find()
        ) {
            t.RuleId = "Bypass_WS_01_03";
            ctx.output(getOutputTag(), t);
        }
    }
}

但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

共有1个答案

常鸿朗
2023-03-14

连接作业图的方式意味着在作业结束时访问边输出

SingleOutputStreamOperator<Any> output = s
    .getSideOutput(filtersOutput)

您只获得最后一个进程函数放在侧输出中的任何内容--即,只获得由lambda7发出的事件。

我相信你打算做的事情可以表达为

SingleOutputStreamOperator<...> s1 = source.process(lambda1).name("lambda1");
SingleOutputStreamOperator<...> s2 = s1.process(lambda2).name("lambda2");
SingleOutputStreamOperator<...> s3 = s2.process(lambda3);
...

DataStream<...> side1 = s1.getSideOutput(filtersOutput);
DataStream<...> side2 = s2.getSideOutput(filtersOutput);
DataStream<...> side3 = s3.getSideOutput(filtersOutput);
...

SingleOutputStreamOperator<Any> output = side1.union(side2, side3, ...)
    .process(filterProcessFunction)
    ...
 类似资料:
  • 进入Flink作业的数据可能会由于代码中的bug或缺乏验证而触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在Flink作业中使用,不会导致任何生产停机。 > 重新启动策略在这里似乎不适用,因为: null null 示例代码: 我想有能力跳过在“keyby”和类似的方法中导致问题的处理,这些方法应该返回一个结果。

  • 我们正在构建一个具有两个流的应用程序: 大量信息流 我们希望连接这两个流以获得共享状态,以便第一个流可以使用第二个状态进行扩展。 每天左右,拼花文件(第二流的源代码)都会更新,这需要我们清除第二流的状态并重建它(可能需要大约2分钟)。 问题是,我们可以在该进程运行时阻止/延迟来自第一流的消息吗? 谢谢。

  • 我们正在努力计算 1 分钟翻滚时间窗口内不同类型的事件的最大并发计数。 这些事件就像传感器数据,这些数据是从我们的桌面代理每分钟收集的,然而,一些代理得到了一个错误的时间戳,比如说,它甚至比现在晚了几个小时。 所以,我的问题是如何处理/删除这些事件,目前我只是应用过滤器(s = 我的第一个问题是,如果我不这样做,我怀疑这个坏的“未来”事件会触发窗口计算,即使是那些不完整的数据窗口 第二个问题是,我

  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?

  • 我有一个Flink流应用程序,需要能够“暂停”和“取消暂停”对特定键控流的处理。“处理”意味着只是在流上执行一些简单的异常检测。 我们正在考虑的flow是这样工作的: 命令流,可以是ProcessCommand、PauseCommand或ResumeCommand,每个命令都有一个用于按键的id。 处理命令将检查按键在处理前是否暂停,如果没有暂停,则检查缓冲区。 暂停命令(PauseCommand

  • 我研究Flink已经一个多星期了。我们正在从Kafka消费事件,我们希望事件属于一个特定的对象id需要按照事件时间的顺序进行处理。到目前为止,我的研究告诉我,我应该使用keyby和timewinds,我的理解是正确的吗? 另一个问题是,当一个任务管理器关闭时,只有属于该任务管理器的事件才会被停止处理,直到该任务管理器启动?检查点机制是否知道未被处理的事件,它将如何请求Kafka关于这些事件? 下面