我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。
但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。
这张图片说明了情况:在此处输入图像描述
因此我需要知道警报之前是否有干预。
程序代码:
final KStream<String, JsonNode> alarm = ...;
final KStream<String, JsonNode> intervention = ...;
final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);
final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
return value != null;
}).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
if (rightValue == null) {//No intervention before
try {
actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
+ "\"alarm\":" + leftValue.toString()
+ "}");
} catch (IOException ex) {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
}
return actualObj;
} else {
return null;
}
}, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));
final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
return value != null;
});
fraude.foreach((key, value) -> {
rl.println("Fraude=" + key + " => " + value);
System.out.println("Fraude=" + key + " => " + value);
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
rl.close();
el.close();
nfl.close();
}
}));
综上所述,我想在这里检测红色矩形中的模式enter image description
附注:我确保干预记录在报警记录之前发送
M.DJX,
我不认为Kafka Streams中的这个用例现在有一个完美的解决方案,但我有一些想法可以让您更接近。我准备在不久的将来提交一个KIP来解决类似这样的用例。
一点:与KTable不同,KStreams不是变更日志,所以较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一条溪流中。我想这就是为什么您的foreach
使它看起来像所有的警报都没有干预;您将看到干预之前的中间连接事件。
例如:
LEFT RIGHT JOIN
a:1 a:(1,null)
a:X a:(1,X)
foreach
将在两个联接结果中被调用,使其看起来缺少正确的值,而实际上只是晚了一点。
如果在结果流上应用时间窗口,您将得到一个更改日志--较新的值将覆盖较旧的值。类似于:
joinedAI
.groupByKey()
.windowedBy(
TimeWindows
.of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
.until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
).reduce(
new Reducer<JsonNode>() {
@Override
public Long apply(final JsonNode value1, final JsonNode value2) {
return value2;
}
},
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
);
糟糕的是,这将生成一个具有正确语义的changelog流,但您仍然会看到中间值,因此您也不希望直接从该流触发任何操作(如foreach
)。
您可以做的一件事是每天安排一次作业,从昨天开始扫描windows的“Alerts-without-Intermentations”
。从窗口存储中得到的任何结果都将是该键的最新值。
我准备的KIP将提出一种方法,让您从窗口中筛选出中间结果,这将允许您将foreach附加到changelog,并仅在窗口的最终结果上触发它。
或者,如果应用程序的数据不是太大,如果您不太担心边缘情况,您可以考虑自己使用LinkedHashMap或Guava缓存实现“Windows final Events”语义。
希望这能帮上忙。
我有以下情况: 表A和表B使用FK连接 如何丢弃? 一个选项是执行,但在查询的情况下,这仍然是一个问题。 我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。 最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。
基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。
需要一些关于 KStream/KTable 用法用例的意见/帮助。 场景: 我有两个具有公共关键字requestId的主题。 input_time启动时间 completion_time(Request Id, EndTime) input_time中的数据在时间t1填充,completion_time中的数据在时间tn填充(n是进程完成所需的时间)。 目的通过连接来自主题的数据来比较请求所用的时
我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所