我在Kafka Streams拓扑工作,有时,在更改应用程序ID和/或clientId属性后,我在特定的kafka流上收到错误:“缺少源主题stream.webshop.products.prices.5持久赋值。返回错误INCOMPLETE_SOURCE_TOPIC_METADATA
”。我已经在每个Kafka节点的server.properties中设置了create.topic=true
属性,但似乎没有创建此流的主题。
这是我的Kafka Streams拓扑:
package ro.orange.eshop.productindexer.domain
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import ro.orange.digital.avro.Aggregate
import ro.orange.digital.avro.Key
import ro.orange.digital.avro.Price
import ro.orange.digital.avro.StockQuantity
import ro.orange.eshop.productindexer.infrastructure.configuration.kafka.makeStoreProvider
import java.util.concurrent.CompletableFuture
class SaleProductTopology(
private val streamNameRepository: IStreamNameRepository,
private val saleProductMapper: ISaleProductMapper,
private val productRatingMapper: IProductRatingMapper,
private val productStockMapper: IProductStockMapper,
private val lazyKafkaStreams: CompletableFuture<KafkaStreams>
) {
fun streamsBuilder(): StreamsBuilder {
val streamsBuilder = StreamsBuilder()
val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
val productPricesStream = streamsBuilder.stream<Key, Price>(streamNameRepository.productsPricesStreamTopic)
val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)
val productsStockStream = inputProductsStockStream
.mapValues(productStockMapper::aStockQuantity)
productsStockStream.to(streamNameRepository.productsStockStreamTopic)
streamsBuilder.globalTable<Key, StockQuantity>(
streamNameRepository.productsStockStreamTopic,
Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic)
)
val quantityProvider = lazyKafkaStreams.makeStoreProvider<StockQuantity>(streamNameRepository.productsStockGlobalStoreTopic)
val saleProductsTable = productsStream
.groupByKey()
.reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))
.mapValues { aggregate -> saleProductMapper.aSaleProduct(aggregate, quantityProvider) }
saleProductsTable.toStream().print(Printed.toSysOut())
val productPricesTable = productPricesStream
.groupByKey()
.reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))
productPricesTable.toStream().print(Printed.toSysOut())
val productsRatingsTable = productsRatingsStream
.groupByKey()
.reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))
.mapValues { aggregate -> productRatingMapper.aProductRating(aggregate) }
productsRatingsTable.toStream().print(Printed.toSysOut())
val productsStockTable = productsStockStream
.groupByKey()
.reduce { _, aggregate -> aggregate }
saleProductsTable
.leftJoin(productPricesTable) { saleProduct, price -> saleProductMapper.aPricedSaleProduct(saleProduct, price) }
.leftJoin(productsRatingsTable) { saleProduct, rating -> saleProductMapper.aRatedSaleProduct(saleProduct, rating) }
.leftJoin(productsStockTable) { saleProduct, stockQuantity -> saleProductMapper.aQuantifiedSaleProduct(saleProduct, stockQuantity) }
.mapValues { saleProduct -> AggregateMapper.aSaleProductAggregate(saleProduct) }
.toStream()
.to(streamNameRepository.saleProductsTopic)
return streamsBuilder
}
}
正如@jacek laskowski所写:
KafkaStreams不会创建它,因为它是一个来源
这是出于设计考虑的,因为如果其中一个源主题是自动创建的(它将具有默认的分区数),而第二个主题是由用户提前创建的,那么分区数可能会有所不同。当KStream/KTable连接时,它们必须具有相同的分区数——这是一个至关重要的假设。
用户必须有意识地使用适当数量的分区创建主题(对于流处理线程的数量,这是控制Kafka Streams应用程序性能的方法之一)。
阅读管理流应用程序主题。
问题内容: 我正在开发一个Spring Boot应用程序。我需要在开始时解析XML文件(countries.xml)。问题是我不知道将其放在哪里才能访问它。我的文件夹结构是 我的第一个想法是将其放在src / main / resources中,但是当我尝试创建File(countries.xml)时,我得到了NPE,并且stacktrace显示我的文件在ProjectDirectory中查找(因
我用的是阿帕奇·Kafka。我创建了一个war文件,其中生产者用Java编码,消费者用Scala编码。制作人正在从HTML页面获取数据。我可以看到,生产商发布的大部分数据都是关于消费者的,但有些数据缺失。 这是我的制片人代码 文件1 } 文件2 现在,我使用以下命令检查消费者的消息。 我是否缺少任何生产者配置?
不能再推送到源主,从几天开始,我已经卸载了git和Visual Code并重新安装了它,但什么都没有,我有同样的错误: 我不理解这种行为,因为在我的另一台电脑上一切正常,没有GitHub文件损坏。 下面是我的配置:Ubuntu 18.04.4 LTS 需要帮助吗!
我最近开始在我所有的项目中遇到这个问题。当我的索引页加载其中包含对jQuery源文件的引用时,我的控制台记录这个错误:。 这完全不影响我的应用程序,但是每当我打开控制台时,看到它真的很烦人。有人知道这是从哪里来的吗? 编辑:注意,我没有显式引用.map文件,我只是指向
在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。
我有一个SpringMVC 4。在应用程序上下文XML文件中定义了多个静态资源的x应用程序,如下所示: 这些文件分别存储在“/webapp/static/”目录和“/resources/static/”目录中(请参见映射,分别为和。在“js”和“css”子目录中有一些文件可以访问,但我最近添加的文件在尝试通过Netbeans控制的本地Tomcat服务器访问时返回404个错误。 我尝试了对sprin