当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka流应用程序中迭代GlobalKTable或KTable

任长卿
2023-03-14

我有一个Kafka流应用程序,有两个数据源:事件和用户。

我有4个主题:事件,用户,用户2,用户事件

Users2与Users相同,用于演示GlobalKTable。

Events主题使用时间戳模式,因此当到达时间戳字段日期时,KStream将接收事件记录。

此时,我想为用户KTable中的每个用户ID以及新的事件ID创建一个用户事件记录;但我不知道如何遍历GlobalKTable或KTable来实现这一点。

public Topology createTopology() {

    final Serde<String> serde = Serdes.String();
    final StreamsBuilder builder = new StreamsBuilder();

    final GlobalKTable<String, String> gktUsers =
            builder.globalTable("Users",
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-store")
                            .withKeySerde(Serdes.String()).withValueSerde(serde));

    final KTable<String, String> ktUsers = builder.table("Users2");

    builder.stream("Events", Consumed.with(Serdes.String(), serde))
            .peek((k, v) -> {
                // This is called when a new Event record becomes current.
                // How do I iterate through gktUsers at this point
                // and output a User-ID and an Event-ID to the User-Events topic?

                //  This type of iteration doesn't work either.
                ktUsers.toStream().foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(String s, String s2) {
                        log.info("{} {}", s, s2);
                    }
                });
            })
            
    builder.build();
}

共有1个答案

闻人越
2023-03-14

你需要让状态存储迭代

ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store("user-store", QueryableStoreTypes.keyValueStore()));

String value = store.get("key");

KeyValueIterator<String, String> range = keyValueStore.all(); 

https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html

或者,也许您应该在用户和事件之间进行流表连接

 类似资料:
  • 我有一个真正的env与3个kafka机器集群,它正在接收大量数据。对于每个主题,有 25 个分区,复制因子设置为 2。 我的应用程序(基于kafka流的应用程序)从这个kafka集群获取数据的应用程序停机了一个多月。现在,每个分区都有大量滞后;高达90000000。 我知道以下参数: 我有两个消费者节点(使用kafka集群数据的相同组id)。 然而,它并没有赶上滞后,而是保持不变。有人能建议如何改

  • 在Java8中,类没有任何方法来包装。 相反,我将从获取,然后从获取一个,如下所示:

  • 我想用kafka流实现请求-响应模式,我使用spring boot kafka,其中添加了一些数据作为报头,命名为关联id,但是当kafka流API处理请求消息时,报头数据会丢失,无法发送到响应主题!我该怎么解决,还是用另一种方法??

  • 我想知道用Kafka Streams执行这种操作的最佳方法是什么。 我有一个 Kafka 流和一个 KGlobal 表,让我们说产品 (1.000.000 消息) 和类别逻辑表 (10 msg)。每当新消息到达主题类别LogicBlobTable时,我需要重新处理所有将新到达的消息应用于产品的产品,并且输出将转到第三个主题。 我在考虑使用Kafka . tools . streams reset逻

  • 我得到了同样的错误。这有可能吗?我真的更希望不执行stream.foreach()并传递一个消费者。 编辑:对我来说,不要复制元素是很重要的。

  • 嗨,我正在探索关于azure active Directory的一些身份验证和授权流。我以前在单页应用程序中使用誓言隐式流。在阅读microsoft文档之后,我已经理解了关于隐式流的以下内容。 隐式流: 单页javacript应用程序使用隐式流从azure active Directory获得获取访问令牌。它直接调用令牌endpoint来获取令牌,因此这使得隐式流不太安全。 NET Web应用程序