Question:

Spring Cloud Stream Kafka enable.auto.commit

杜思远
2023-03-14

I have a project that I upgraded to the latest versions of Spring Boot and Spring Cloud, and I noticed some unexpected behavior.

Spring Boot: 2.5.2, Spring Cloud: 2020.0.3

It's also worth mentioning that I have downgraded spring-kafka because of this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>

I have logging turned on:

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="info"/>

My application YAML is as follows:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: myapp.serde.MyCustomDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: myapp.serde.MyCustomSerializer
    properties:
      security:
        protocol: PLAINTEXT
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2

When starting the application, I see the following output:

[2021-07-09 21:41:42,310] [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-a-custom-group-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = a-custom-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class myapp.serde.MyCustomDeserializer

[2021-07-09 21:41:42,371] [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-a-custom-group-2
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = a-custom-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class myapp.serde.MyCustomDeserializer

[2021-07-09 21:41:42,542] [main] INFO  com.me.MyApplication: Started MyApplication in 3.164 seconds (JVM running for 3.875)

[2021-07-09 21:41:42,579] [kafka-binder-health-1] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-null-3
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = null
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class myapp.serde.MyCustomDeserializer

[2021-07-09 21:41:52,391] [pool-2-thread-1] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-a-custom-group-4
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = a-custom-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class myapp.serde.MyCustomDeserializer

If you look at the client.id property in the logs above, you will see it is indexed 1, 2, 3 and 4. Note also the thread names captured in the logs, e.g. [main].

Question: in my configuration I have enable-auto-commit: true, yet the logs clearly show enable.auto.commit = false for consumers 1 and 2. Consumer 3 (the binder health indicator) and consumer 4 (on thread pool-2-thread-1) have it set to true.

Before the upgrade I was on Boot 2.1.7 and Cloud Greenwich.SR1, and its output showed enable.auto.commit consistently set to true, matching my configuration. Is this a bug in the logging, or something more serious?

1 Answer

桂高义
2023-03-14

enable.auto.commit=true is not recommended; the listener container commits the offsets in a more deterministic manner, after each record, or each batch of records, returned by the poll.

Since version 2.3 of Spring for Apache Kafka, it is set to false by default (unless explicitly set by a property).

https://github.com/spring-projects/spring-kafka/issues/945
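
If you really do want to restore the old client-level auto-commit behavior, the Kafka binder exposes a per-binding configuration map whose entries are passed through to the Kafka client; explicitly setting the property there should stop the container from defaulting it to false. A minimal, untested sketch, assuming the myInboundRoute binding from the question and a binder version that supports this map:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          myInboundRoute:
            consumer:
              configuration:
                # Explicitly setting the raw client property means the
                # listener container no longer forces it to false.
                enable.auto.commit: true

That said, container-managed commits are the recommended path; if you want per-record rather than per-batch commits, the binder's ackEachRecord consumer property (where available in your binder version) covers that without re-enabling client auto-commit.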
