当前位置: 首页 > 知识库问答 >
问题:

KStream 到 KStream 连接 - 如果窗口中没有匹配的记录,输出记录将发布可配置的时间

曾飞雨
2023-03-14

需要一些关于 KStream/KTable 用法用例的意见/帮助。

场景:

我有两个具有公共关键字requestId的主题。

  1. input_time启动时间
  2. completion_time(Request Id, EndTime)

input_time中的数据在时间t1填充,completion_time中的数据在时间tn填充(n是进程完成所需的时间)。

目的通过连接来自主题的数据来比较请求所用的时间,并在违反阈值时间的情况下发出警报。

可能会发生这样的情况:该过程可能会失败,并且数据可能根本没有到达请求completion_time主题。在这种情况下,我们打算使用检查,如果当前时间自开始时间以来远远超过特定(假设5s)阈值。

  1. input_time(要求1,100) completion_time(要求1,104) --

已尝试的选项。1)尝试了KTable-KTable和KStream-Kstream外连接,但第三种情况总是失败。

    final KTable<String,Long> startTimeTable =   builder.table("input_time",Consumed.with(Serdes.String(),Serdes.Long()));
    final KTable<String,Long> completionTimeTable = builder.table("completion_time",Consumed.with(Serdes.String(),Serdes.Long()));     
    KTable<String,Long> thresholdBreached =startTimeTable .outerJoin(completionTimeTable,
            new MyValueJoiner());
    thresholdBreached.toStream().filter((k,v)->v!=null)
            .to("finalTopic",Produced.with(Serdes.String(),Serdes.Long()));

木匠

 public Long apply(Long startTime,Long endTime){

        // if input record itself is not available then we cant use any alerting.
        if (null==startTime){
            log.info("AlertValueJoiner check: the start time itself is null so returning null");
            return null;
        }
        // current processing time is the time used.
        long currentTime= System.currentTimeMillis();
        log.info("Checking startTime {} end time {} sysTime {}",startTime,endTime,currentTime);
        if(null==endTime && currentTime-startTime>5000){
            log.info("Alert:No corresponding record from file completion yet currentTime {} startTime {}"
                    ,currentTime,startTime);
            return currentTime-startTime;
        }else if(null !=endTime && endTime-startTime>5000){
            log.info("Alert: threshold breach for file completion startTime {} endTime {}"
                    ,startTime,endTime);
            return endTime-startTime;
        }
    return null;
    }

2) 尝试了根据线程推荐的自定义逻辑方法 如何管理 Kafka KStream 到 Kstream 窗口式联接?-- 此方法已停止适用于方案 2 和 3。

是否有任何使用 DSL 或处理器处理所有三种情况的情况?

不确定我们是否可以使用某种标点符号来监听窗口何时更改并检查当前窗口中的流记录,如果没有找到匹配的记录,则使用systime生成结果。

共有1个答案

仉洲
2023-03-14

由于逻辑的性质,它肯定必须通过DSL和处理器API的组合来完成。

  1. 使用自定义转换器和状态存储与配置的值进行比较。(情况1
 类似资料:
  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码

  • 我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部

  • 我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么

  • 下面是带有suppress运算符的窗口的简单定义: 所以我的问题是,suppress运算符如何检测一个事件是否是窗口的最后一个事件?让我们想象一下,我移除suppress运算符: 我知道,对于的每一个更改,都将生成两个事件: 具有值的记录,以删除上一条记录 具有新值的新记录 我要做的是移除运算符,自己检测最后一条记录: 这些信息是在DSL还是处理器API中公开的?

  • 问题内容: 在PostgreSQL中,我想根据某些条件选择一行,但是如果没有行与该条件匹配,则我想返回第一行。该表实际上包含一个序数列,因此该任务应该更容易(第一行是序数为0的那一行)。例如: 但是在这种情况下,无法保证匹配记录的顺序,因此我无法对其进行排序。使用单个语句执行此操作的方式是什么? 问题答案: 我想根据某些条件选择一行,但是如果没有行与该条件匹配,我想返回第一行 较短(正确) 你实际