当前位置: 首页 > 知识库问答 >
问题:

春云流生产消费同题

容阳焱
2023-03-14
java.lang.IllegalStateException: The number of expected partitions was: 100, but 3 have been found instead
                at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner$2.doWithRetry(KafkaTopicProvisioner.java:260) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
                at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner$2.doWithRetry(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
                at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
                at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:163) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
                at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
                at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:149) [spring-cloud-stream-binder-kafka-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
                at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:88) [spring-cloud-stream-binder-kafka-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
                at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:112) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:57) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:124) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:238) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
                at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
                at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
                at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
                at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
                at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          test-updater-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          test-updater-output: 
            producer:
              sync: true
              configuration:
                retries: 0
          tenant-updater-output: 
            producer:
              sync: true
              configuration:
                retries: 100
      default:
        binder: kafka
        contentType: application/json
        group: test-adapter
        consumer:
          maxAttempts: 1       
      bindings:
        test-updater-input: 
          destination: test-tenant-update
          consumer:
            concurrency: 3
            partitioned: true
        test-updater-output: 
          destination: test-tenant-update
          producer:
            partitionCount: 100
        tenant-updater-output:
          destination: tenant-entity-update
          producer:
            partitionCount: 100

我试图改变生产者和消费者配置的顺序,但没有帮助。

编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。
它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。

共有1个答案

田化
2023-03-14

预期的分区数为:100,但已找到3个

本主题的分区不足,无法进行配置。

PartitionCount:100

 类似资料:
  • 我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。

  • 我的使用者绑定到匿名使用者组,而不是我指定的使用者组。 我的春靴应用 我的输入输出通道接口 我的控制台日志-- :在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2

  • 我有一个服务,它从不同的Spring云流通道(绑定到EventHub/Kafka主题)生成和使用消息。有几种设置类似的服务。 配置如下所示 生产者/发布者代码如下所示 类似地,我还有多个其他发布者发布到不同的活动中心/主题。请注意,每个已发布的消息都有一个租户id标头。这是我的多租户应用程序特定于跟踪租户上下文的内容。还请注意,在发送消息时,我正在获取要发布到的频道。 我的消费者代码如下所示 同样

  • 问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线