当前位置: 首页 > 知识库问答 >
问题:

不正确的结果Kstream-Kstream连接具有不对称的时间窗口

周培
2023-03-14

我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。
但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。
这张图片说明了情况:在此处输入图像描述
因此我需要知道警报之前是否有干预。
程序代码:

    final KStream<String, JsonNode> alarm = ...;

    final KStream<String, JsonNode> intervention = ...;

    final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);

    final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
        return value != null;
    }).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode actualObj = null;

        if (rightValue == null) {//No intervention before
            try {
                actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
                        + "\"alarm\":" + leftValue.toString()
                        + "}");
            } catch (IOException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
            }
            return actualObj;
        } else {
            return null;
        }
    }, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));

    final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
        return value != null;
    });

    fraude.foreach((key, value) -> {
        rl.println("Fraude=" + key + " => " + value);
        System.out.println("Fraude=" + key + " => " + value);
    });

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);

    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            streams.close();
            rl.close();
            el.close();
            nfl.close();
        }
    }));

综上所述,我想在这里检测红色矩形中的模式enter image description

附注:我确保干预记录在报警记录之前发送

共有1个答案

晏兴发
2023-03-14

M.DJX,

我不认为Kafka Streams中的这个用例现在有一个完美的解决方案,但我有一些想法可以让您更接近。我准备在不久的将来提交一个KIP来解决类似这样的用例。

一点:与KTable不同,KStreams不是变更日志,所以较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一条溪流中。我想这就是为什么您的foreach使它看起来像所有的警报都没有干预;您将看到干预之前的中间连接事件。

例如:

LEFT   RIGHT    JOIN
a:1             a:(1,null)
       a:X      a:(1,X)

foreach将在两个联接结果中被调用,使其看起来缺少正确的值,而实际上只是晚了一点。

如果在结果流上应用时间窗口,您将得到一个更改日志--较新的值将覆盖较旧的值。类似于:

joinedAI
  .groupByKey()
  .windowedBy(
      TimeWindows
          .of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
          .until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
  ).reduce(
      new Reducer<JsonNode>() {
          @Override
          public Long apply(final JsonNode value1, final JsonNode value2) {
              return value2;
          }
      },
      Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
  );

糟糕的是,这将生成一个具有正确语义的changelog流,但您仍然会看到中间值,因此您也不希望直接从该流触发任何操作(如foreach)。

您可以做的一件事是每天安排一次作业,从昨天开始扫描windows的“Alerts-without-Intermentations”。从窗口存储中得到的任何结果都将是该键的最新值。

我准备的KIP将提出一种方法,让您从窗口中筛选出中间结果,这将允许您将foreach附加到changelog,并仅在窗口的最终结果上触发它。

或者,如果应用程序的数据不是太大,如果您不太担心边缘情况,您可以考虑自己使用LinkedHashMap或Guava缓存实现“Windows final Events”语义。

希望这能帮上忙。

 类似资料:
  • 我有以下情况: 表A和表B使用FK连接 如何丢弃? 一个选项是执行,但在查询的情况下,这仍然是一个问题。 我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。 最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。

  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 需要一些关于 KStream/KTable 用法用例的意见/帮助。 场景: 我有两个具有公共关键字requestId的主题。 input_time启动时间 completion_time(Request Id, EndTime) input_time中的数据在时间t1填充,completion_time中的数据在时间tn填充(n是进程完成所需的时间)。 目的通过连接来自主题的数据来比较请求所用的时

  • 我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所