我有以下情况:
如何丢弃[a, null]
?
一个选项是执行innerJoin
,但在update
查询的情况下,这仍然是一个问题。
我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。
最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。
目前,我发现最好的工作方法是利用输出记录中的Kafka偏移量。
该方法可概括为:
现在,您的输出主题包含同一个键的多条消息,但每条消息的偏移量不同。
在查询期间,现在可以使用子查询为每个键选择最大偏移量。
下面可以看到一个变压器供应商示例
/**
* @param <K> key type
* @param <V> value type
*/
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
@Override
public Transformer<K, V, KeyValue<String, String>> get() {
return new OutputTransformer<>();
}
private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
/**
* @param key the key for the record
* @param value the value for the record
*/
@Override
public KeyValue<String, String> transform(K key, V value) {
if (value != null) {
value.setKafkaOffset(context.offset());
}
return new KeyValue<>(key, value);
}
@Override
public KeyValue<String, String> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
// nothing to close
}
}
}
我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。
我们正在尝试实现下面描述的用例,我们有我们希望克服的实现问题, 用例, 我们试图通过匹配两个流的消息中的键(JSON)来实现两个Kafka主题之间的KStream连接。此外,我们还应该维护消息序列,因为它是从源代码到达KStream的。 场景是,如果匹配的键还没有到达任何一个流,我们应该停止或重试join,直到预期的键到达其他主题。我们想把不匹配的记录放回KStream,但在这种情况下,序列没有保
我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部