当前位置: 首页 > 知识库问答 >
问题:

两个流的连接不起作用

齐朝明
2023-03-14

我正在尝试使用Apache Flink流API加入两个流,但没有任何内容加入,并且在阅读文档后我不知道我做错了什么

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola")))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
    DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela")))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
    DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector).
            window(GlobalWindows.create()).apply(joinFunction);
    joined.print();
    env.execute("Window");

关键功能是 myPojo.getFirst()

共有1个答案

田博超
2023-03-14

除非指定自定义触发器,否则“全局窗口”窗口永远不会触发。在你的示例中,如果你使用类似TumblingEventTimeWindows.of(Time.seconds(5))的东西,你应该会看到结果。

 类似资料:
  • 问题内容: 2个流: 给定可读流, 并且 获取包含 并 连接 流的惯用(简洁)方法是什么? 我不能做,因为这样流内容混杂在一起。 n个 流: 给定一个EventEmitter发出不确定数量的流,例如 一种将 所有流串联在一起的流 的惯用(简洁)方法是什么? 问题答案: 该合并的流包会连接流。自述文件中的示例: 我相信您必须立即添加所有流。如果队列为空,则自动结束。参见问题5。 该流流库是一个具有明

  • 我想加入来自 kafka producer 的两个流,但该连接不起作用。我使用 AssignerWithPeriodicWatermark 来定义我的分配器,我尝试使用 3 分钟的窗口连接两个流。但我没有得到任何输出。我打印了这两个流,以确保它们的事件在时间上足够接近。

  • 我正在尝试将FBO的深度纹理和颜色纹理链接到GLSL着色器(版本4.0) 问题是,同时只有一个链接,这很奇怪,因为其他纹理可以很好地链接在一起(例如:漫反射贴图、法线贴图和镜面反射贴图) 以下是我的绑定RT代码: 我真的不知道这里怎么了。。。

  • 下面的代码片段是从JoinedStreams的javadoc复制的 这两个流仅基于一个键(通过< code>t =计算)进行连接 我会问我如何基于多个键进行连接,例如,one.a = two.a和

  • 我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。 让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。 由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。

  • 我构建了一个客户端/服务器聊天应用程序,其中服务器正在侦听IP号码和端口,客户端应该连接到该IP 问题-按以下顺序: > 我在计算机上运行服务器,正在侦听 (127.0.0.1 我还在我的计算机上运行一个客户端(与#1相同的计算机),它成功地连接到服务器(127.0.0.1 我正在从不同的IP运行另一个客户端,但在尝试连接到服务器时(127.0.0.1 我不明白为什么…这是代码: 服务器端: 服务