当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-从另一个Stream的MapFunction访问WindowedStream的内部缓冲区

谢锦程
2023-03-14

我有一个基于Apache Flink的流媒体应用程序,具有以下设置:

    < li >数据源:每分钟生成数据。 < li >使用CountWindow的窗口流,size=100,slide=1(滑动计数窗口)。 < li>ProcessWindowFunction对窗口中的数据应用一些计算(比如F(x))。 < li >使用输出流的数据接收器

这很好。现在,我想让用户能够提供一个函数G(x ),并将其应用于窗口中的当前数据,并将输出实时发送给用户

我不是在问如何应用任意函数G(x) - 我正在使用动态脚本来做到这一点。我正在询问如何从另一个流的映射函数访问窗口中的缓冲数据。

一些代码来澄清

DataStream<Foo> in  = .... // source data produced every minute
    in
       .keyBy(new MyKeySelector())
       .countWindow(100, 1)
       .process(new MyProcessFunction())
       .addSink(new MySinkFunction())

// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement

DataStream<Function> userRequest  = .... // request function from user

userRequest.map(new MapFunction<Function, FunctionResult>(){
   public FunctionResult map(Function Gx) throws Exception {
         Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
         FunctionResult result = Gx.apply(windowedDataFromAbove);
         return result;

   }

})

共有2个答案

赵佐
2023-03-14

假设Fx动态聚合传入的FOO,Gx处理一个窗口的FOO值,您应该能够实现以下目标:

DataStream<Function> userRequest  = .... // request function from user
Iterator<Function> iter = DataStreamUtils.collect(userRequest);
Function Gx = iter.next();

DataStream<Foo> in  = .... // source data
 .keyBy(new MyKeySelector())
 .countWindow(100, 1)
 .fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
 .addSink(new MySinkFunction())

Fold函数(一旦传入的数据到达就对它们进行操作)可以这样定义:

private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
    @Override
    public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
        acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
        acc.f1.add(foo);
        return acc;
    }
}

处理器功能可以是这样的:

public class MyProcessorFunc
    extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {

    public MyProcessorFunc(Function Gx) {
        super();
        this.Gx = Gx;
    }

    @Override
    public void process(String key, Context context,
                        Iterable<Tuple2<Integer, List<foo>> accIt,
                        Collector<Tuple2<Integer, FunctionResult>> out) {
        Tuple2<Integer, List<foo> acc = accIt.iterator().next();
        out.collect(new Tuple2<Integer, FunctionResult>(
            acc.f0, // your Fx aggregation
            Gx.apply(acc.f1), // your Gx results
        ));
    }
}

请注意,默认情况下,fold\reduce函数不在内部缓冲元素。我们在这里使用fold来计算动态指标,并创建一个窗口项列表。

如果您对在翻滚窗口(不是滑动的)上应用Gx感兴趣,您可以在管道中使用翻滚窗口。为了也计算滑动计数,可以有另一个只计算滑动计数的管道分支(不应用Gx)。这样,您不必在每个窗口中保存100个列表。

注意:您可能需要添加以下依赖项才能使用DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>
霍弘厚
2023-03-14

连接两个流,然后使用CoProcessFunction。获取Functions流的方法调用可以将它们应用于其他方法调用窗口中的内容。

如果您想要广播函数,那么您需要使用Flink 1.5(它支持连接键控流和广播流),或者使用一些直升机特技来创建一个可以包含Foo和函数类型的流,并适当复制函数(和密钥生成)以模拟广播。

 类似资料:
  • 尝试从同一命名空间中的另一个服务连接到一个服务。使用ClusterIP创建服务。创建服务后使用该Ip访问服务。请求有时成功,有时失败,我看到两个pod都启动并运行。以下是服务配置

  • 我是一名Java程序员初学者。我试图在类交通中访问类车中的两个列表,这样我就可以执行while循环,循环直到主类中的列表为空 这就是我现在掌握的代码,我试着从普通车上扩展流量,但没有成功,我被卡住了。我该怎么解决这个问题?

  • 我有两个码头集装箱。一个是基于标准的MariaDB图像。这个容器被命名为“mariadb”,里面有一个叫做“fi”的数据库。fi db内部有几个表,每个表都有几行数据。使用DataGrip或任何其他数据库查看软件,我可以成功地访问和查询这个数据库,并使用端口3306在本地主机上调用它。 按照下面的建议对连接字符串进行了更改,如上图所示。现在使用命令“run-i-p8080:8080--link m

  • 伊登 JLS所说的是: 设C是ClassName表示的类。设n是一个整数,使得C是类的第n个词汇封闭类,在该类中出现有条件的this表达式。 [...] 如果当前类不是类C或C本身的内部类,则为编译时错误。 在的情况下,内部类集。这意味着上面的代码应该可以正常工作。发生了什么?

  • 问题内容: 我需要从另一个控制器内的另一个控制器访问方法。我该怎么做?我可以使用方法吗? 我可以在当前控制器中包含该控制器,并使其成为对象并通过该对象访问该方法吗?这样可以吗? 我想调用另一个控制器的表单方法— newAction。 问题答案: 您可以将控制器定义为服务,然后在另一个控制器中获取它。 在定义所需的控制器即服务中: 然后,在任何控制器中,您都可以通过容器获取此服务: 在文档中有一些关