当前位置: 首页 > 知识库问答 >
问题:

Kafka流-缺少源主题

微生智刚
2023-03-14

我在Kafka Streams拓扑工作,有时,在更改应用程序ID和/或clientId属性后,我在特定的kafka流上收到错误:“缺少源主题stream.webshop.products.prices.5持久赋值。返回错误INCOMPLETE_SOURCE_TOPIC_METADATA”。我已经在每个Kafka节点的server.properties中设置了create.topic=true属性,但似乎没有创建此流的主题。

这是我的Kafka Streams拓扑:

    package ro.orange.eshop.productindexer.domain

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import ro.orange.digital.avro.Aggregate
import ro.orange.digital.avro.Key
import ro.orange.digital.avro.Price
import ro.orange.digital.avro.StockQuantity
import ro.orange.eshop.productindexer.infrastructure.configuration.kafka.makeStoreProvider
import java.util.concurrent.CompletableFuture

class SaleProductTopology(
        private val streamNameRepository: IStreamNameRepository,
        private val saleProductMapper: ISaleProductMapper,
        private val productRatingMapper: IProductRatingMapper,
        private val productStockMapper: IProductStockMapper,
        private val lazyKafkaStreams: CompletableFuture<KafkaStreams>
) {
    fun streamsBuilder(): StreamsBuilder {
        val streamsBuilder = StreamsBuilder()
        val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
        val productPricesStream = streamsBuilder.stream<Key, Price>(streamNameRepository.productsPricesStreamTopic)
        val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
        val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

        val productsStockStream = inputProductsStockStream
                .mapValues(productStockMapper::aStockQuantity)
        productsStockStream.to(streamNameRepository.productsStockStreamTopic)

        streamsBuilder.globalTable<Key, StockQuantity>(
                streamNameRepository.productsStockStreamTopic,
                Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic)
        )

        val quantityProvider = lazyKafkaStreams.makeStoreProvider<StockQuantity>(streamNameRepository.productsStockGlobalStoreTopic)

        val saleProductsTable = productsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))
                .mapValues { aggregate -> saleProductMapper.aSaleProduct(aggregate, quantityProvider) }

        saleProductsTable.toStream().print(Printed.toSysOut())

        val productPricesTable = productPricesStream
                .groupByKey()
                .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))

        productPricesTable.toStream().print(Printed.toSysOut())

        val productsRatingsTable = productsRatingsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))
                .mapValues { aggregate -> productRatingMapper.aProductRating(aggregate) }

        productsRatingsTable.toStream().print(Printed.toSysOut())

        val productsStockTable = productsStockStream
                .groupByKey()
                .reduce { _, aggregate -> aggregate }

        saleProductsTable
                .leftJoin(productPricesTable) { saleProduct, price -> saleProductMapper.aPricedSaleProduct(saleProduct, price) }
                .leftJoin(productsRatingsTable) { saleProduct, rating -> saleProductMapper.aRatedSaleProduct(saleProduct, rating) }
                .leftJoin(productsStockTable) { saleProduct, stockQuantity -> saleProductMapper.aQuantifiedSaleProduct(saleProduct, stockQuantity) }
                .mapValues { saleProduct -> AggregateMapper.aSaleProductAggregate(saleProduct) }
                .toStream()
                .to(streamNameRepository.saleProductsTopic)

        return streamsBuilder
    }
}

共有1个答案

唐高卓
2023-03-14

正如@jacek laskowski所写:

KafkaStreams不会创建它,因为它是一个来源

这是出于设计考虑的,因为如果其中一个源主题是自动创建的(它将具有默认的分区数),而第二个主题是由用户提前创建的,那么分区数可能会有所不同。当KStream/KTable连接时,它们必须具有相同的分区数——这是一个至关重要的假设。

用户必须有意识地使用适当数量的分区创建主题(对于流处理线程的数量,这是控制Kafka Streams应用程序性能的方法之一)。

阅读管理流应用程序主题。

 类似资料:
  • 问题内容: 我正在开发一个Spring Boot应用程序。我需要在开始时解析XML文件(countries.xml)。问题是我不知道将其放在哪里才能访问它。我的文件夹结构是 我的第一个想法是将其放在src / main / resources中,但是当我尝试创建File(countries.xml)时,我得到了NPE,并且stacktrace显示我的文件在ProjectDirectory中查找(因

  • 我用的是阿帕奇·Kafka。我创建了一个war文件,其中生产者用Java编码,消费者用Scala编码。制作人正在从HTML页面获取数据。我可以看到,生产商发布的大部分数据都是关于消费者的,但有些数据缺失。 这是我的制片人代码 文件1 } 文件2 现在,我使用以下命令检查消费者的消息。 我是否缺少任何生产者配置?

  • 不能再推送到源主,从几天开始,我已经卸载了git和Visual Code并重新安装了它,但什么都没有,我有同样的错误: 我不理解这种行为,因为在我的另一台电脑上一切正常,没有GitHub文件损坏。 下面是我的配置:Ubuntu 18.04.4 LTS 需要帮助吗!

  • 我最近开始在我所有的项目中遇到这个问题。当我的索引页加载其中包含对jQuery源文件的引用时,我的控制台记录这个错误:。 这完全不影响我的应用程序,但是每当我打开控制台时,看到它真的很烦人。有人知道这是从哪里来的吗? 编辑:注意,我没有显式引用.map文件,我只是指向

  • 在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。

  • 我有一个SpringMVC 4。在应用程序上下文XML文件中定义了多个静态资源的x应用程序,如下所示: 这些文件分别存储在“/webapp/static/”目录和“/resources/static/”目录中(请参见映射,分别为和。在“js”和“css”子目录中有一些文件可以访问,但我最近添加的文件在尝试通过Netbeans控制的本地Tomcat服务器访问时返回404个错误。 我尝试了对sprin