我有一个SOAP Web服务,它发送一个kafka请求消息,并等待一个kafka响应消息(例如,consumer.poll(10000))。
每次调用web服务时,它都会创建一个新的Kafka生产者和一个新的Kafka消费者。
每次调用web服务时,使用者都会收到相同的消息(例如,具有相同偏移量的消息)。
我使用的是Kafka0.9,启用了自动提交,并且自动提交频率为100毫秒。
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
final Handler handler = new Handler(consumerRecord);
executor.submit(handler);
}
更新0001
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
根据您正在显示的代码。我认为你的问题是新的消费者是单线程的。如果轮询一次,然后不进行另一次轮询,则auto.commit.offset
将无法工作。
尝试将代码放入while循环中,然后查看再次轮询时将提交偏移量。
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:
我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有
Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)