当前位置: 首页 > 知识库问答 >
问题:

删除KStream-KStream连接在Kafka Streams中的重复中间结果

邢华清
2023-03-14

我有以下情况:

  1. 表A和表B使用FK连接

如何丢弃[a, null]

一个选项是执行innerJoin,但在update查询的情况下,这仍然是一个问题。

我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但时间戳的唯一性无法保证。

最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在Athena/Presto或某些RDBMS中)。

共有1个答案

游乐池
2023-03-14

目前,我发现最好的工作方法是利用输出记录中的Kafka偏移量。

该方法可概括为:

  1. 执行您想要执行的所有逻辑,而不必担心同一密钥的多个记录

现在,您的输出主题包含同一个键的多条消息,但每条消息的偏移量不同。

在查询期间,现在可以使用子查询为每个键选择最大偏移量。

下面可以看到一个变压器供应商示例

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}
 类似资料:
  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 我们正在尝试实现下面描述的用例,我们有我们希望克服的实现问题, 用例, 我们试图通过匹配两个流的消息中的键(JSON)来实现两个Kafka主题之间的KStream连接。此外,我们还应该维护消息序列,因为它是从源代码到达KStream的。 场景是,如果匹配的键还没有到达任何一个流,我们应该停止或重试join,直到预期的键到达其他主题。我们想把不匹配的记录放回KStream,但在这种情况下,序列没有保

  • 我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部