当前位置: 首页 > 知识库问答 >
问题:

使用基于计数的窗口连接两个流

巢宏富
2023-03-14

我是Flink Streaming API的新手,我想完成以下简单(IMO)任务。我有两个流,我想使用基于计数的窗口加入它们。到目前为止,我拥有的代码如下:

public class BaselineCategoryEquiJoin {

private static final String recordFile = "some_file.txt";

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
    public Tuple2<String[], MyRecord> map(String s) throws Exception {
        MyRecord myRecord = parse(s);
        return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
    }
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
    ExecutionConfig config = environment.getConfig();
    config.setParallelism(8);
    DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
            .map(new ParseRecordFunction());
    DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
            .map(new ParseRecordFunction());
    DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
            .join(dataStream)
            .where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
                public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
                    return recordTuple2.f0;
                }
            }).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
                public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
                    return recordTuple2.f0;
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
                public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
                    return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
                }
            }).print();
    environment.execute();
}
}

我的代码可以正常工作,但不会产生任何结果。实际上,从未调用apply方法(通过在调试模式下添加断点进行验证)。我认为,前面的主要原因是我的数据没有时间属性。因此,窗口化(通过窗口实现)没有正确完成。因此,我的问题是如何表示我希望根据计数窗口进行加入。例如,我希望连接具体化每个流中的每100个元组。之前的方法在Flink中可行吗?如果是,我应该在代码中更改什么来实现它。

在这一点上,我必须通知您,我试图调用countWindow()方法,但由于某种原因,Flink的JoinedStreams没有提供它。

非常感谢。

共有1个答案

郑和泰
2023-03-14

不支持基于计数的联接。您可以通过使用“事件-时间”语义来模拟基于计数的窗口,并将惟一的seq-id作为时间戳应用于每条记录。因此,时间窗口“5”实际上是计数窗口5。

 类似资料:
  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 我正在尝试使用以下有问题的sql查询计算特定页面上的注释: 问题是它输出的是0而不是2。 表列如下: 页数: 评论: 因此,实际上,原始查询应该从中获取每个注释的真(如pages表中设置的,由连接 完成连接和/或获得计数的最终代码是什么样子的?谢谢.

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 问题内容: 我正在寻找一种类似的各种功能的方法,但我希望滚动计算的窗口由一个值范围(例如,DataFrame列的值范围)定义,而不是由窗口中的行数。 例如,假设我有以下数据: 如果执行类似的操作,则会得到一个滚动总和,其中每个窗口包含5行。但是我想要的是一个滚动总和,其中每个窗口都包含的一定范围的值。也就是说,我希望能够执行类似的操作,并得到一个结果,其中第一个窗口包含所有介于1和5之间的行,然后

  • 问题内容: 好的,我试图查看每个供应商提供了多少个产品(“产品”和“供应商”为单独的表格)。我希望将结果与公司名称和公司提供的产品数量一起显示。我不确定该如何设置。 到目前为止,我有: 我不确定如何使每个公司的ProductName计数。如果您能为我提供任何帮助,我将不胜感激。 问题答案: 您所缺少的只是GROUP BY子句: 使用LEFT {OUTER} JOIN意味着如果某些供应商不提供任何产

  • 数据帧: df1: df2: 我想将df1中的“col1、col2、col3、col4、col5、col6、col7”列值与df2中的“mapping_value”列相匹配,并在df1中创建一个名为“Scientific value”的新列,该列将包含来自df2中“Scientific value”列的条目。 输出: 谢谢