当前位置: 首页 > 知识库问答 >
问题:

Kafka-Streams加入2个主题与JSON值背压机制?

解念
2023-03-14

我正在学习Kafka流,并尝试实现以下几点:

创建了2个Kafka主题(比如topic1、topic2),键为null,值为JSONString。来自topic1的数据(无重复项)在Topic2中有多个匹配项。即。topic1具有一些主流数据,当与Topic2连接时可以生成新的多数据流。

示例

topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}

我尝试了一段带有5分钟窗口的KStream代码,但看起来我无法在流中保存topic1数据。

请帮我做出正确的选择并加入。我正在使用来自Confluent和Docker实例的Kafka。

public void run() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

        // Hold data from this topic to 30 days
        KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
        cs.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        // Data is involved in one time process.
        KStream<String, JsonNode> css = builder.stream("topic2", consumed);
        css.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        KStream<String, JsonNode> resultStream = cs.leftJoin(css,
                valueJoiner,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(
                        Serdes.String(), /* key */
                        jsonSerde,       /* left value */
                        jsonSerde)       /* right value */
        );

        resultStream.foreach((k, v) -> {
            System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    }

共有1个答案

段干开宇
2023-03-14

Kafka中的连接总是基于键(*)。因此,要使任何连接工作,您需要在执行实际连接之前将要连接的字段提取到键中(唯一的部分例外是KStream-GlobalKTable连接)。在您的代码示例中,您不会得到任何结果,因为所有记录都有一个null键,因此无法联接。

对于连接本身,对于您的用例来说,KStream-KTable连接似乎是正确的选择。要使此操作成功,您需要:

  1. topic1正确设置联接键,并将数据写入一个附加主题(我们称之为topic1keyed)
  2. topic1keyed作为表读取
  3. topic2
  4. 正确设置联接键
  5. topic2Ktable
  6. 连接

有关join语义的全部详细信息,请参阅这篇博文:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

(*)更新:

 类似资料:
  • 我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从

  • 从Kafka Streams 2.5.0开始,拓扑似乎必须包含一个输入主题。在Kafka2.4.1(以及更早的版本)中,情况并非如此。 我有一个应用程序,其中的拓扑只是创建一些全局状态存储,从其他应用程序写入的主题中读取数据。 使用Kafka 2.5.0,我得到以下错误: 如果添加一个虚拟输入主题(例如,通过),应用程序启动良好。 这种行为是意料之中的,还是Kafka Streams 2.5.0中

  • 我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT

  • 我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。 以下是我的配置。。。 这做了我所期望的商业案例,myActor收到命令更新(MyAvro) 我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。

  • 我收到了一个数据库更改流,这些更改最终形成了一个压缩的主题。流基本上是键/值对,并且键空间很大(~4 GB)。 这个主题由一个kafka流进程使用,该进程将数据存储在RockDB中(每个消费者/碎片单独使用)。处理器做两件不同的事情: 将数据连接到另一个流中。 检查来自主题的邮件是新密钥还是对现有密钥的更新。如果是更新,则将旧的键/值和新的键/值对发送到不同的主题(更新很少)。 null

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它