当前位置: 首页 > 知识库问答 >
问题:

Kafka流线程计数应为双倍

安轶
2023-03-14

如果我创建一个简单的拓扑,其中有一个源和一个处理器,那么在控制台中得到的StreamThread将是预期的两倍。

例如,如果我将线程设置为一个,并且有一个分区,我会看到2个流线程。如果我设置为20个线程,并且有20个分区,我会看到40个流线程。

调用stream.start()之后,我看到这20个线程从CREATED转换到Running。

其他20个线程只是在初始化的后期才被创建。看起来StreamsBuilderFactoryBean#start()是在拓扑不包含任何内容的地方调用的。看来我要么需要阻止它被调用,要么删除我的创建过程。不确定首选什么。

共有1个答案

孟品
2023-03-14

结果@enableKafKastReams注释将为您启动kafka流。因为我没有创建拓扑bean,所以没有冲突,因为它是一个空拓扑。

 类似资料:
  • 我对流媒体有一个普遍的问题,但对于问题的范围,让我们限制自己使用Kafka Streams。让我们进一步缩小范围,将我们的问题局限于单词计数,或者可能是一般的计数。假设我有一个由某个键和一个值组成的流,键可以是一个字符串(假设我们可以有很多字符串,除了空字符串,由世界上的任何字符组成),值是一个整数,现在我们正在构建一个单词计数应用程序,如果词汇表中的单词总数是一万亿,我们不能将它们存储在本地缓存

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 比如说,KStream拓扑结构很简单:输入主题- 如果有一个应用程序实例使用

  • 我使用的是Kafka流,具有无状态的简单处理器拓扑结构。 我有一个主题,有100个分区,有2台机器,每台机器有50个线程,运行同一个流媒体应用程序,因此最终我将在它们之间进行1-1映射。 主题中的消息已是键控消息。 我有一个逻辑约束,一旦线程连接到一个或多个分区,它应该继续处理这些分区(当然,直到重新启动发生,它会重新洗牌) 我从日志中看到线程反复(重新)加入消费者组。 我的问题,kafka 流

  • 其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?

  • 我正在研究我的第一个Kafka流样本: 当我试图运行它时,会出现以下错误: 这是来自“用户”主题的示例消息: 价值: 标题: 关键: 我应该怎么做才能避免这个问题?