当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者仅在两条消息堆叠时读取消息

楚冷勋
2023-03-14

我们有一个Kafka制作人,偶尔会制作一些信息。

我写了一个消费者来消费这些消息。问题是,只有当两个消息叠加时,它们才会被使用。例如,如果消息是在13:00产生的,消费者不做任何事情。如果另一条消息是在13:01生成的,则消费者会使用这两条消息。在kafkaTool中,在消费者属性中有一个名为LAG的列,当消息未被消费时,该列为1。我缺少的这个东西有什么配置吗?

消费者配置:

16:43:04,472 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (http--0.0.0.0-8180-1) ConsumerConfig values:
        request.timeout.ms = 180001
        check.crcs = true
        retry.backoff.ms = 100
        ssl.truststore.password = null
        ssl.keymanager.algorithm = SunX509
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.key.password = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.provider = null
        sasl.kerberos.service.name = null
        session.timeout.ms = 180000
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [mtxbuctra22.prod.orange.intra:9092]
        client.id =
        fetch.max.wait.ms = 180000
        fetch.min.bytes = 1024
        key.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        auto.offset.reset = earliest
        value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        ssl.endpoint.identification.algorithm = null
        max.partition.fetch.bytes = 1048576
        ssl.keystore.location = null
        ssl.truststore.location = null
        ssl.keystore.password = null
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        security.protocol = PLAINTEXT
        auto.commit.interval.ms = 1000
        ssl.protocol = TLS
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.trustmanager.algorithm = PKIX
        group.id = ifd_006
        enable.auto.commit = true
        metric.reporters = []
        ssl.truststore.type = JKS
        send.buffer.bytes = 131072
        reconnect.backoff.ms = 50
        metrics.num.samples = 2
        ssl.keystore.type = JKS
        heartbeat.interval.ms = 3000

16:43:04,493 INFO  [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
        max.schemas.per.subject = 1000
        specific.avro.reader = true
        schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]

16:43:04,498 INFO  [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
        max.schemas.per.subject = 1000
        specific.avro.reader = true
        schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]

共有1个答案

姬烨磊
2023-03-14

想出来了。在kafka 0.9.0.1的文档中,声明fetch.min.bytes是1。但我有kafka的0.9.0.0。默认值是1024。因此,只有在2条消息之后,这个值才被传递。将fetch.min.bytes更改为1,现在可以正常工作了。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以: