当前位置: 首页 > 知识库问答 >
问题:

ApacheKafka-KStream与KStream连接最新消息

慕仲渊
2023-03-14

我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示:

流1:

2    {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}

流2:

26   {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}

我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream:

KStream<String, String> s_joined = s_order
        .join(s_order_item, (left,right) -> left + right,
                JoinWindows.of(Duration.ofSeconds(30)))
        .mapValues(value -> {
            String[] arrOfstr = value.split("(?<=})");
            JSONObject jl = new JSONObject(arrOfstr[0]);
            JSONObject jr = new JSONObject(arrOfstr[1]);
            JSONObject json = new JSONObject();
            Iterator<String> keys = jl.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jl.get(key));
            }
            keys = jr.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jr.get(key));
            }
            return json.toString();
        });

在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。

通过一个例子,我将解释我想做什么:

在窗口内发布以下消息:

流1

9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}

流2

9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

加入流

出版的是什么

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

我想出版什么

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

总之,我只想在窗口中发布最新消息,而不是所有消息。这可能吗?

共有2个答案

瞿和硕
2023-03-14

我找到了答案。实现我想做的事情的方法是使用函数抑制。更详细地说,您可以使用KStream中的group pByKey(),然后使用一个Windows函数。最后,聚合分组数据并使用抑制

s_joined.toStream()
        .groupByKey()
        .WindowedBy(...)
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

柏麒
2023-03-14

您可以使用groupByKey函数返回KGroupedStream,然后使用map/reduce函数以所需的方式对其进行转换。请参阅Kafka Streams DSL了解更多信息。

 类似资料:
  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部

  • 我有以下情况: 表A和表B使用FK连接 如何丢弃? 一个选项是执行,但在查询的情况下,这仍然是一个问题。 我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。 最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。

  • 我尝试将KStream中的事件计入时间段: 我有这个例外: 线程“test-app-87ce 164d-c427-4d cf-aa76-aeeb 6 f 8 fc 943-stream thread-1”org . Apache . Kafka . streams . errors . streams Exception中出现异常:在进程中捕获到异常。taskId=0_0,processor =

  • 我们希望基于公共字段(主键)执行Kstream Kstream连接。目前,使用下面的代码,我们得到的结果是只合并了两个流,没有任何主键约束。 您能建议如何根据公共字段/列连接2个流吗。