当前位置: 首页 > 知识库问答 >
问题:

在Flink ProcessWindowFunction中检查会话持续时间的全局状态

韩单弓
2023-03-14

我是Fink新手,希望计算EventFormDTO流的键控总会话持续时间:

    {"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"IN_QUEUQ","comment":"","del":false,"millSecs":1}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"IN_QUEUQ","comment":"","del":false,"millSecs":5}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"ON_GOING","comment":"","del":false,"millSecs":10}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"ON_GOING","comment":"","del":false,"millSecs":18}

在队列中表示会话开始,而继续表示会话结束。预期输出应为事件到达时每个键控的总持续时间。因此,上述数据的样本输出为

ts        duration
timestamp_1  0
timestamp_2  4   // session_1: 5-1
timestamp_3  14  //(10-1)+ (10-5)  both session_1 and session_2 are active, and then session_1 end
timestamp_4  13  //18-5   the session_1 has already end.

在我的实现中,我使用了一个ProcessWindowFun和一个全局的MapState来跟踪

 public static void main(String[] args) {
    final DataStream<EventFormDTO> stream = ...;
    stream.keyBy(new KeySelector<EventFormDTO, String>() {
        @Override
        public String getKey(EventFormDTO eventFormDTO) throws Exception {
            return eventFormDTO.getProjectId()+"-"+eventFormDTO.getEventTypeId();
        }
    }).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).process(new EventProcessWindowFun()).print();
    try {
        log.info("Start application.");
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventFormDTO implements Serializable {
    private static final long serialVersionUID = 5034868557373901846L;
    Long  id;
    Integer projectId;
    Integer eventTypeId;
    String sessionId;
    String status;
    String comment;
    Boolean del;
    Long millSecs;
}

@Slf4j
public class EventProcessWindowFun extends ProcessWindowFunction<EventFormDTO, Tuple2<Long, Long>, String, TimeWindow> {
    //session_id --> startTime.

    final static MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>(
                    "record", // the state name
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
            );

    @Override
    public void process(String s, Context context, Iterable<EventFormDTO> iterable, Collector<Tuple2<Long, Long>> collector) throws Exception {
        MapState<String, Long > keyedMeasure = context.globalState().getMapState(descriptor);
        log.info("obtain reference to Map success.. ");
        for(EventFormDTO event: iterable){
            String sessionId = event.getSessionId();
            Long timeStamp = event.getMillSecs();
            String status = event.getStatus();
            Long duration = 0L;
            if(keyedMeasure.contains(sessionId)){
                duration += timeStamp - keyedMeasure.get(sessionId);
                keyedMeasure.remove(sessionId);
            }else{
                keyedMeasure.put(sessionId, timeStamp);
            }

            collector.collect(new Tuple2(context.window().getEnd(), duration));
        }
    }

}

然而,在调试过程中,我无法得到我想要的。

每次调用EventProcessWindowFun时,globalstatekeyedMeasure都是新的对象,并且没有在上一个窗口中计算的任何数据。

因此,我想问

  1. 如何在ProcessWindowFunction中获取globalState
  2. ProcessWindowFunction是否适用于我的案例?还有其他更好的解决方案吗?谢谢

共有1个答案

龚昊然
2023-03-14

keyedMeasure永远不会包含来自状态的键。我相信你是想写信的

if (keyedMeasure.contains(sessionId)) {
    duration += timeStamp - keyedMeasure.get(sessionId);
    keyedMeasure.remove(sessionId);
} else {
    keyedMeasure.put(sessionId, timeStamp)
}

如果将sessionId作为键的一部分,逻辑似乎会变得更简单。然后可以使用ValueState而不是MapState

此外,您可能希望使用RichFlatMapFunctionKeyedProcessFunction而不是使用windows来完成此操作。通过使用windows,您可以在一些可以通过连续流处理更自然地完成的事情上施加一种小型批处理。

 类似资料:
  • 如何为drools无状态会话设置全局变量。 假设两个线程访问同一个会话,但为每个线程设置一个全局变量customer arraylist和新的arraylist。对于全局变量customer,第二个线程的arraylist是否替换第一个线程的arraylist。 这似乎是Stateless知识库类留档的情况: 无状态KnowledgeSessions支持全局,其作用域有多种方式。我将首先介绍非命令

  • 我怎么能看到它?在portal中,我发现了一个选项,可以看到会话计数,但不是持续时间。

  • 把自己从软件检查员寻常的手工检查工作中解放出来 在开始新项目时,多数人计划在将代码投入生产发行之前审核它们;但是,当提交日程超越了其他因素时,审核常常成为第一个被抛弃的实践。如果能够自动执行其中一些审核,那么情况又会怎样呢?在新系列 “让开发自动化” 的第一篇文章中,开发自动化专家 Paul Duvall 首先将研究如何自动化检查器(例如 CheckStyle、JavaNCSS 和 CPD)、如何

  • 目前,我正在使用Codeigniter库为我的项目,并有一个定制的CMS。显然密码保护,但在加载每个控制器之前,我有一个功能,我运行它来检查会话是否存在,并管理登录,否则重定向到登录页面。 是否有一种方法可以全局检查此项,而不必在每个控制器中加载?

  • 问题内容: 在SQL Server中,名称为#temp的临时表具有本地范围。如果在会话中创建它们,则会话中的所有内容都可以看到它们,但不能在会话外部看到它们。如果在存储过程中创建这样的表,则作用域在该过程中是本地的。因此,当proc退出时,表消失了。 我知道的唯一替代方法是使用名称如## temp的表。这些是临时的,但在服务器范围内可见。因此,如果我在会话中创建表,则隔壁办公室中的Bob也会看到它

  • 问题内容: 我(几乎)成功地将Node.js与Express和Redis结合使用来处理会话。 我遇到的问题是使用时不保留会话。 这是我的看到方式: console.log()打印: 现在,这是以下代码: 这个console.log()打印出: 显然,“用户名”对象已消失。该会话没有保留它,而是重新构建了一个新会话。 我该如何解决?如果您需要任何信息,请不要犹豫。 这是我设置会话管理的代码: 这是基