我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示:
流1:
2 {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}
流2:
26 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}
我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream:
KStream<String, String> s_joined = s_order
.join(s_order_item, (left,right) -> left + right,
JoinWindows.of(Duration.ofSeconds(30)))
.mapValues(value -> {
String[] arrOfstr = value.split("(?<=})");
JSONObject jl = new JSONObject(arrOfstr[0]);
JSONObject jr = new JSONObject(arrOfstr[1]);
JSONObject json = new JSONObject();
Iterator<String> keys = jl.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jl.get(key));
}
keys = jr.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jr.get(key));
}
return json.toString();
});
在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。
通过一个例子,我将解释我想做什么:
在窗口内发布以下消息:
流1
9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}
流2
9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
加入流
出版的是什么
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
我想出版什么
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
总之,我只想在窗口中发布最新消息,而不是所有消息。这可能吗?
我找到了答案。实现我想做的事情的方法是使用函数抑制
。更详细地说,您可以使用KStream中的group pByKey()
,然后使用一个Windows
函数。最后,聚合分组数据并使用抑制
。
s_joined.toStream()
.groupByKey()
.WindowedBy(...)
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
您可以使用groupByKey
函数返回KGroupedStream
,然后使用map/reduce
函数以所需的方式对其进行转换。请参阅Kafka Streams DSL了解更多信息。
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。
我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部
我有以下情况: 表A和表B使用FK连接 如何丢弃? 一个选项是执行,但在查询的情况下,这仍然是一个问题。 我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。 最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。
我尝试将KStream中的事件计入时间段: 我有这个例外: 线程“test-app-87ce 164d-c427-4d cf-aa76-aeeb 6 f 8 fc 943-stream thread-1”org . Apache . Kafka . streams . errors . streams Exception中出现异常:在进程中捕获到异常。taskId=0_0,processor =
我们希望基于公共字段(主键)执行Kstream Kstream连接。目前,使用下面的代码,我们得到的结果是只合并了两个流,没有任何主键约束。 您能建议如何根据公共字段/列连接2个流吗。