我们有一个Kafka制作人,偶尔会制作一些信息。
我写了一个消费者来消费这些消息。问题是,只有当两个消息叠加时,它们才会被使用。例如,如果消息是在13:00产生的,消费者不做任何事情。如果另一条消息是在13:01生成的,则消费者会使用这两条消息。在kafkaTool中,在消费者属性中有一个名为LAG的列,当消息未被消费时,该列为1。我缺少的这个东西有什么配置吗?
消费者配置:
16:43:04,472 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (http--0.0.0.0-8180-1) ConsumerConfig values:
request.timeout.ms = 180001
check.crcs = true
retry.backoff.ms = 100
ssl.truststore.password = null
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
session.timeout.ms = 180000
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [mtxbuctra22.prod.orange.intra:9092]
client.id =
fetch.max.wait.ms = 180000
fetch.min.bytes = 1024
key.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
auto.offset.reset = earliest
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
ssl.endpoint.identification.algorithm = null
max.partition.fetch.bytes = 1048576
ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
auto.commit.interval.ms = 1000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
group.id = ifd_006
enable.auto.commit = true
metric.reporters = []
ssl.truststore.type = JKS
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
heartbeat.interval.ms = 3000
16:43:04,493 INFO [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
max.schemas.per.subject = 1000
specific.avro.reader = true
schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]
16:43:04,498 INFO [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
max.schemas.per.subject = 1000
specific.avro.reader = true
schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]
想出来了。在kafka 0.9.0.1的文档中,声明fetch.min.bytes是1。但我有kafka的0.9.0.0。默认值是1024。因此,只有在2条消息之后,这个值才被传递。将fetch.min.bytes更改为1,现在可以正常工作了。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用
我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。
有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以: