{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
KTable<Long, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.Long(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getLong("ID");
}, Grouped.with(Serdes.Long(), Serdes.String()))
.aggregate( ... );
我处理问题正确吗?
(mapValues->只保留“before”/“after”字段。groupBy->将ID作为消息的键。Aggregate->?)
我想出了解决我案子的办法。我实现了KTable,如下所示:
KTable<String, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.String(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return String.valueOf(json.getLong("ID"));
}, Grouped.with(Serdes.String(), Serdes.String()))
.reduce((prev,newval)->newval);
aggrege
函数不适合这种情况,相反,我使用了reduce
函数。
控制台使用者的输出如下所示:
15 {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
18 {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
4 {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
19 {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
18 {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
5 {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
10 {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
3 {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
9 {"CODE":"AAAA89","STATUS":"PENDING","ID":9}
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但
为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
问题内容: 我正在学习Java,但是在和接口上找不到任何好的解释。 当我实现an时,我的Eclipse IDE创建了一个方法。 我可以在没有界面的情况下关闭流。但是,我不明白如何使用接口实现该方法。而且,此接口的目的是什么? 我也想知道:如何检查是否真的关闭? 我正在使用下面的基本代码 问题答案: 在我看来,您对接口不是很熟悉。在您发布的代码中,您无需实现。 您仅需要(或应该)实现,或者如果您将要
SPDY 使用 TLS 的扩展称为 Next Protocol Negotiation (NPN)。在Java 中,我们有两种不同的方式选择的基于 NPN 的协议: 使用 ssl_npn,NPN 的开源 SSL 提供者。 使用通过 Jetty 的 NPN 扩展库。 在这个例子中使用 Jetty 库。如果你想使用 ssl_npn,请参阅https://github.com/benmmurphy/ss