当前位置: 首页 > 知识库问答 >
问题:

Camel-Kafka ConsumerCount从定义值降至1(默认值)

蓬兴国
2023-03-14

CamelConsumer配置

kafka.consumersCount=15
kafka.consumerStreams=15

我从日志中看到,当使用者启动时,有15个使用者线程(假设在1个节点上),这是很好的配置。

INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

如果服务器不可用时由于网络问题或任何其他情况而停止响应。所有的kafka消费者都开始退订,这又是一个预期的行为(到目前为止还不错)

INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ListOfDefinedServers]
check.crcs = true
client.id = 
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = XYZ-GroupId-12345
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 5000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator servername (id: 2147482644 rack: null)
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously assigned partitions []
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with generation 1
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6]

共有1个答案

冷宏茂
2023-03-14

我们遇到了同样的问题。

设置如下:

  • spring-boot:2.2.x
  • 骆驼版本:2.24.1
  • 主题有3个分区。
    • kafka.consumerscount=3(每个分区1个)
    • kafka.consumersstreams=10[默认值]
      null
      null
      doStart(){
          // create executor with worker thread count = consumerStreams 
          for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
               KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
               // pre-initialize task during startup so if there is any error we have it thrown asap
               task.preInit();
               executor.submit(task);  
               tasks.add(task);
          }    
      }
    
    doRun(){
       clientConsumer.subscribe(topic);
       while( isRunAllowed() && !reConnect && !isStoppingOrStopped() && !isSuspendingOrSuspended()){
          clientConsumer.poll();
       }
       clientConsumer.unsubscribe(topic) 
    } 
    

    >

  • 因为executor.submit(),没有任何延迟地触发doRun(),所以KafkaConsumer状态仍然停止,while循环被跳过,并且kafka使用者客户端结束取消对主题的订阅。

    一旦doStart()的for循环完成,KafKaconsumer的状态被标记为stopped=false,started=true

    因此,对于最后一个(KafkaFetchRecord)任务的doRun()调用,输入while循环,消费者继续轮询消息

      null

 类似资料:
  • 当我在JAX-RS中添加字符串的默认值时,它不接受该值。它保持为null或空。 当我将状态传递为空或或未定义时,它将保持为空或或未定义。它不会将默认值视为已确认。

  • 问题内容: 如何在Doctrine 2中设置默认值? 问题答案: 数据库默认值不受“便携式”支持。使用数据库默认值的唯一方法是通过mapping属性,在该属性中为字段所映射的列指定代码段(包括原因在内)。 您可以使用: 最好使用PHP级别的默认值,因为这些值也可以在新创建和持久保存的对象上正确使用(持久保存新对象以获取默认值后,Doctrine不会返回数据库)。

  • 问题内容: 是否可以定义在操作失败时将返回的默认值? 例如,这样: 将返回默认值而不是抛出错误吗? 问题答案: CAST没有默认值: 类型转换指定从一种数据类型到另一种数据类型的转换。PostgreSQL接受两种等效的类型转换语法: 除了要转换的表达式和所需的目标类型以外,语法中没有余地。 但是,您可以通过简单的功能手动完成此操作: 然后,您可以说出类似的话并得到。 PostgreSQL还允许您创

  • 注:内容翻译自官网文档 Language Guide (proto3) 中的 Default Values 一节 当消息被解析时, 如果被编码的消息没有包含特定的简单元素, 被解析的对象对应的字段被设置为默认值. 默认值是和类型有关的: 对于strings, 默认值是空字符串(注, 是"", 而不是null) 对于bytes, 默认值是空字节(注, 应该是byte[0], 注意这里也不是null)

  • 问题内容: 有没有一种方法可以指定与struct一起使用的默认dtype ? 我特别想成为和成为。相反,我得到和 问题答案: 默认值取决于您的系统。在64位系统上,默认类型为64位。在32位系统上,默认类型为32位。无法使用其他系统C标头更改默认值,即重新编译numpy。 您当然可以明确指定dtype,例如 编辑:正如kazemakase在下面提到的,以上内容仅适用于int32 / int64。在

  • 问题内容: 我想有一个可选参数,如果仅存在未指定值的标志,则默认为一个值,但是存储用户指定的值,而不是如果用户指定一个值,则存储默认值。是否已经有可用于此的措施? 一个例子: 我可以创建一个动作,但想查看是否存在执行此操作的方法。 问题答案: 表示0或1参数 当参数为0时设置默认值 将参数转换为int 如果即使未指定,也要设置为1 ,则包括。也就是说, 然后