当前位置: 首页 > 知识库问答 >
问题:

Flink会话窗口未按预期工作

申博厚
2023-03-14

Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数

  public void process(
      String s,
      Context context,
      Iterable<SampleEvent> sampleEvents,
      Collector<EnrichedSampleEvent> collector)
      throws Exception {

    EnrichedSampleEvent event = null;
    boolean isSampleEventOnePresent = false;
    int count = 0;


    for (SampleEvent sampleEvent : sampleEvents) {

      if (sampleEvent.getEventName().equals("sample_event_one_name")) {

        Logger.info("Received sample_event_one for userId: {}");
        isSampleEventOnePresent = true;

      } else {
        // Calculate the count for sample_event_two
        count++;

        if (Objects.isNull(event)) {
          event = new EnrichedSampleEvent();
          event.setUserId(sampleEvent.getUserId());
        }
      }
    }

    if (isSampleEventOnePresent && Objects.nonNull(event)) {
      Logger.info(
          "Created EnrichedSampleEvent for userId: {} with count: {}",
          event.getUserId(),
          event.getCount());
      collector.collect(event);
    } else if (Objects.nonNull(event)) {
      Logger.info(
          "No sampleOneEvent event found sampleTwoEvent with userId: {}, count: {}",
          event.getUserId(),
          count);
    }
  }

尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不是发出 EnrichedSampleEvent,而是看到 userID:”123,count: 5”。有人可以帮我解决这个问题吗?

共有1个答案

萧阳波
2023-03-14

您的ProcessWindowFunction将分别为每个键调用。由于密钥是用户id和记录id的组合,因此仅知道“Received sample_event_one”出现在同一用户的日志中是不够的。即使是同一个用户,也可能有不同的记录id。

 类似资料:
  • 在运行规则之前,我正在尝试将date设置为与我对象中的日期变量相同的日期。我使用此配置来创建我的 在运行规则之前,我使用将会话的日期设置为所需日期。 这导致使用滑动窗口的规则出现错误。比如说,检查1小时内通过的对象,而我在最后一个小时内没有任何对象。前一天我只有3件物品。下面是一个对象数据集示例。 我有一条规则,检查在1小时内是否有超过2个对象具有相同的客户ID。 当我用这些值传递对象时。上面的规

  • 例如,当我进入登录页面时...sessionCreated-将一个会话添加到计数器:1 然后,当我点击log out按钮时,会话计数减少了1(这很好),但紧接着会话计数增加了1(不是预期的)。 例如,当我按下注销按钮时...sessionDestroyed-从计数器0中扣除一个会话sessionCreated-将一个会话添加到计数器1中 我再次需要帮助来理解请。 这是我的Spring安全设置...

  • 我正在使用spring Roo并希望访问Controller类中的一个bean,该类在ApplicationContext.xml中具有以下配置: 配置类本身是: 在我的Controller中,我认为一个简单的Autowired注释应该可以完成这项工作 在启动过程中,spring在setSkipWeeks方法中打印消息。不幸的是,每当我在控制器中调用config.getSkipWeeks()时,它

  • 当我运行以下程序时,它只打印 然而,从Java 8的equalsIgnoreCase文档中我们发现: 如果以下至少一项为真,则两个字符c1和c2被视为相同的忽略情况: •对每个字符应用java.lang.character.ToUpperCase(char)方法会产生相同的结果 所以我的问题是为什么这个程序不打印 在这两种操作中,都使用了大写字符。

  • 我试图使用来传输我根据前面的问题设置的自定义标头。 我在文件中读到... 我的属性包括: