当前位置: 首页 > 面试题库 >

Apache Flink:如何计算DataStream中的事件总数

燕鸿文
2023-03-14
问题内容

我有两个原始流,我正在加入这些流,然后我要计算已加入的事件总数是多少,尚未加入的事件有多少。我通过使用joinedEventDataStream如下所示的地图来做到这一点

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {

            @Override
            public Object map(JoinedEvent joinedEvent) throws Exception {

                number_of_joined_events += 1;

                return null;
            }
        });

问题1: 这是计算流中事件数量的适当方法吗?

问题2: 我注意到一种有线行为,有些人可能不相信。问题是,当我在IntelliJ
IDE中运行Flink程序时,它显示了的正确值,number_of_joined_events但是0当我将该程序提交为时jar。因此,我获得了number_of_joined_events将程序作为jar文件运行而不是实际计数时的初始值。为什么仅在jar提交文件而不在IDE中发生这种情况?


问题答案:

您的方法无效。通过JAR文件执行程序时,您会注意到行为。

我不知道该如何number_of_joined_events定义,但是我假设它在您的程序中是一个静态变量。当您在IDE中运行该程序时,它将在单个JVM中运行。因此,所有运算符都可以访问静态变量。当您将JAR文件提交到远程进程时,程序将在其他JVM(可能是多个JVM)中执行,并且客户端进程中的静态变量永远不会更新。

您可以使用Flink的指标或的ReduceFunction总和1来计算已处理记录的数量。



 类似资料:
  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 本文向大家介绍如何用python计算Selenium中的帧总数?,包括了如何用python计算Selenium中的帧总数?的使用技巧和注意事项,需要的朋友参考一下 我们可以使用find_elements方法来计算Selenium页面中的帧总数。在框架上工作时,我们总是会在html代码中找到标记名,并且其值应为frame / iframe。 此特征仅适用于该特定页面上的框架,不适用于其他类型的UI元

  • 我想知道如何计算的累计总和在AnyLogic中。具体地说,我有一个循环事件,每周改变一个参数的值。从这个参数我想计算它收到的值的累计总和,我怎么做呢? 该事件是循环模式的超时。操作是: "name_parameter"=圆形(max(正常(10,200),0));

  • 我需要知道不同事件发生的频率。例如,在过去 15 分钟内发生了多少个 HTTP 请求。由于可能有大量的事件(数百万个),因此必须使用有限的内存量。 Java中有什么util类可以做到这一点吗? 如何在Java中实现这个自我? 理论用法代码可以如下所示: 编辑:它必须是一个实时值,可以在一分钟内更改数千次,并且将在一分钟内查询数千次。基于数据库或文件的解决方案是不可能的。

  • 问题内容: 使用Java访问系统时钟的简便方法是什么,以便我可以计算事件的经过时间? 问题答案: 我会避免使用它来测量经过时间。返回“壁钟”时间,该时间可能会更改(例如:夏时制,管理员用户更改时钟)并会使您的间隔测量值偏斜。 另一方面,返回自“某个参考点”(例如,JVM启动)以来的纳秒数,因此不会受到系统时钟变化的影响。

  • 问题内容: 我有这些表: 我如何计算这些列的总价: 对于每种产品: 适用于所有含运费的产品 问题答案: 您可以通过访问属性来检索 _数据透视_表的列,该属性已经存在了很长时间了 默认情况下,只有模型关键点会出现在枢轴对象上。如果数据透视表包含额外的属性,则在定义关系时必须指定它们: 在您的情况下,您可以像下面的代码中那样定义关系: 现在,可以通过属性(例如)访问表上的列。 最后,这是计算订单总数的