当前位置: 首页 > 知识库问答 >
问题:

Kafka流-旧状态聚合

百里修真
2023-03-14

我有一个KStream,其中包含从主题到1的数据,如下所示:

T1-KEY -> {T1}
T2-KEY -> {T2}

和KTable,构造如下:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..
T1 -> { ["B1", "B2"] }

稍后,主题To2中出现以下消息:

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

现在,我希望我的KTable能够反映这些变化,并且看起来像这样:

T1 -> { ["B2"] }

但看起来是这样的:

T1 -> { ["B1", "B2"] }

我想我缩小了范围:显然聚合的初始化器只在第一次调用--之后聚合总是接收最后一个聚合作为最后一个参数,例如。

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

其中,set aggrege 在第一次调用(通过初始值设定项创建)时为[],但在第二次调用时为[“b1”,“b2”]

有什么想法吗?

编辑2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

编辑3

我不能只是平面映射,因为我必须组合多个Ax元素,例如。

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...
T1 -> { ["B1", "B2"] }
A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

我期待的到达

T1 -> { ["B1"] }

..

共有1个答案

羊舌航
2023-03-14

请注意,在聚合器中,您只是将元素添加到聚合集中。使用这种逻辑,您的集合(对于给定的键)永远不会缩小。我认为你在这种情况下把小溪压扁得太厉害了。我建议您不要将其简化为(Tx-KEY key,Bx value)格式,而是使它们始终保留其设置的格式:(Tx-KEY key,set value) 。那么您根本就不需要聚合。为了实现这一点,我建议您转换输入集

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

进入

T1 -> { ["B1", "B2"] }

通过在KStream flatmap方法调用中使用标准java代码(Collections或Streams api)按“rel”字段分组,这样您就只能在KStream上发出带有set 类型值的消息,而不是单独发出带有bx类型值的消息。

如果您为当前的flatmap实现提供了代码,很乐意详细阐述更多内容。

 类似资料:
  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

  • 我正在尝试编写一个简单的Kafka Streams应用程序(目标是Kafka 2.2/Confluent 5.2),将一个至少有一次语义的输入主题转换为一个恰好只有一次的输出流。我想对以下逻辑进行编码: 对于具有给定密钥的每条消息: (这是基于我们从上游系统获得的订购保证来保证提供正确的结果;我不想在这里做任何神奇的事情。) 起初,我以为我可以用Kafka Streams操作符来实现这一点,它可以