当前位置: 首页 > 知识库问答 >
问题:

在flink全局窗口中重新处理未更改的元素,并进行连接转换

濮冠宇
2023-03-14

Flink转换(连接)流中有一些元素正在重新处理,即使它们没有被修改。

假设我们有3个元素:1、2和3。插入它们时,会发生以下情况:

  • 插入第一个元素1时,输出为:1
  • 插入第二个元素2时,输出为:1-

在最后一次插入中,1或2没有任何更改,因此没有理由重新处理它们。

再处理规则:

  • 只有同一出版商的书籍会被重新处理。这意味着当插入出版商2的书籍时,只有出版商2的书籍会被重新处理。我们的目标是不重新处理任何书籍,因为它们不受现有新书的影响。
  • 当一个出版商被修改时,只有那个出版商的书被重新处理。(没关系)

联接后正在使用全局窗口,如下所示:

            bookStream
                .join(publisherStream)
                .where(book -> book.publisherId)
                .equalTo(publisher -> publisher.id)
                .window(GlobalWindows.create())
                .trigger(new ForeverTrigger<>())
                .apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
                    @Override
                    public Book_Publisher join(Book book, Publisher publisher) throws Exception {
                        return new Book_Publisher(book, publisher);
                    }
                })

ForeverTrigger实施:

public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {

    @Override
    public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(E window, TriggerContext ctx) throws Exception {}
}

对于这个用例,有必要始终存储所有元素,因为如果一本书被更新,我们需要有相应的出版商加入,反之亦然。因此,从bookStream或publisherStream中删除元素不是一个选项。

一个解决方案是使用TableAPI,如下所述:为什么Flink在DataStream join全局窗口上发出重复记录?。这将起作用,然后可以转换为数据流。然而,我希望避免将表API的使用与datastream API的使用混为一谈,特别是因为项目的主要目标是通用化和自动化flink管道的创建,这意味着要通用化的API将是两个而不是一个。因此,如果有一个不同的有效解决方案,那就太好了。

另一种解决方案是逐出或过滤元素,正如上面链接的同一篇文章中所提到的,但这似乎效率低下,因为它仍然需要处理元素,以便逐出/过滤它们。这需要保留以前状态的列表并比较传入元素。

理想情况下,Flink知道只处理包含更改的元素。是否有有效的解决方案来执行与数据流的连接并只处理修改后的元素?

共有1个答案

谷泽宇
2023-03-14

窗口连接的设计并没有考虑到这种情况。为了有效地处理这个问题,我认为您需要在API堆栈中降低一个级别并使用KeyedCoProcessFunctions,或者提高一个级别并使用表API。

 类似资料:
  • 我有一个流是消费的Flink Kafka消费者将加入另一个流为定义的窗口大小,如Time.milliseconds(10000)。 如何在运行时将窗口大小更改为Time.milliseconds(20000)?

  • 我在HA模式下配置了Flink,如下所述: 我想测试容错性,因此我做了以下工作: 设置具有2个JobManager和1个TaskManager的Flink群集 在任务管理器上启动流式处理作业 杀死活动的作业管理器(以模拟崩溃) 领导人选举如期举行 但注意到任务管理器正在重新连接到新的作业管理器。它只是每10秒尝试重新连接到前一个领导者 在此处粘贴任务管理器日志: 重新启动任务管理器没有帮助 重新启

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 本文向大家介绍jQuery拖动元素并对元素进行重新排序,包括了jQuery拖动元素并对元素进行重新排序的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了jQuery拖动元素并对元素进行重新排序的实现方法,分享给大家供大家参考,具体实现内容如下 效果图: 具体内容如下: 从上图可以看出我们今天要实现的功能。当用户拖动一个图片时,就能改变图片的已有排序并更新表中的排列顺序。比如用户可以随意拖动我

  • 的结果是一个元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。 然后我通过一个常量(-因为这是一个全局操作)进行键控,并使用-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。 我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。 目前在Flink有可能吗?

  • 如何在Flink SQL查询中使用SQL客户端进行窗口连接。窗口设置方式与下面链接中提到的方式相同https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html 需要窗口的示例查询:选择sourceKafka.*从sourceKafka内部连接SourceKafca上的bad