当前位置: 首页 > 知识库问答 >
问题:

如何用交互式查询存储和全局存储实现处理单主题的Kafka流拓扑

扶誉
2023-03-14

我试图实现Kafka流,这将把单一主题流作为全局数据库与互动查询可能。所以我想拥有:

>

  • 记录的全局存储区(GlobalKTable、KeyValueStore)

     KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
     KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
                    .selectKey((key, value) -> "lastdate")
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                    .reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
     Materialized.as("terc-lastupdate"));
    
  • 共有1个答案

    高泳
    2023-03-14

    我为每个任务使用了多个KafkaStreams实例,并且工作正常。

     类似资料:
    • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

    • 我有一点困惑,我想澄清一下。我有东西在做。我想有一个Kafka Streams拓扑,将有五个独立的KStreams从他们各自的主题读取数据,并将这些数据转储到一个大的整体主题。接下来,我将有一个GlobalKTable,它将从这个整体主题中读取并实现一个全局存储,比方说叫做。我想把这个实体化的全局存储作为其他Kafka Streams应用程序的“查找”表。我已经阅读了一些关于使用带有配置的RPC层

    • 我试图编写一个kafka streams应用程序,它进行一些转换,并将数据放入物化键值存储,然后从存储中读取它(我需要查询压缩数据)。一件重要的事情是,我不想启动streams应用程序并公开restendpoint,而是希望:。 编辑: 我已经有多个Kafka主题的数据(并用于不同的场景),我需要每天两次获得所有键的最新值。我可以用典型的交互式查询场景来实现:保持kafka streams应用程序

    • 问题内容: 在kafka流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个。处理器接收记录,并可以在理论上对其进行转换,然后再将其添加到存储中。但是,在还原的情况下,记录会直接从源主题(更改日志)插入到全局状态存储中,从而跳过了最终在处理器中完成的转换。 StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder,字符串主题,已消耗

    • 我有一个用例,我需要从一个Kafka主题中消费,做一些工作,生成另一个只有一次语义的Kafka主题,并保存到mongo数据库。看完文档后,我想kafka事务和mongo事务可以同步,但它们仍然是两个不同的事务。在下面的场景中,如果mongo提交失败,是否有方法回滚提交到主题并从消费者处重播的kafka记录。