更新06/04这里是消费者出厂设置。它是Spring-Kafka-1.3.1。Kafka经纪人合流版
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
ConsumerFactory<String, ListingMessage> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(listingConsumerFactory);
factory.setConcurrency(1);
factory.setAutoStartup(false);
factory.setBatchListener(true);
return factory;
}
注意:容器工厂已将自动启动设置为false。这是在加载大文件时手动启动/停止使用者。
运行大约1小时后(时间不同),使用者停止使用来自其主题的消息,即使该主题有许多可用消息。Consumer方法中有一个log语句,用于停止在日志中打印。
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
我们如何设计Spring-Kafka消费者,使其在停止消费的情况下重新启动消费者?是否有一个侦听器可以记录消费者停止时的准确点?这是因为将并发设置为1吗?我必须将并发设置为1的原因是,如果该使用者具有更多的并发性,则会有其他使用者被减慢。
我刚刚运行了一个带有30秒max.poll.interval.ms=30000
的测试,挂起了侦听器,30秒后恢复了它;我在原木上看到了这个...
2018-06-04 18:35:59.361 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
2018-06-04 18:37:07.347 ERROR 4191 --- [ foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2018-06-04 18:37:07.350 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
您可以看到,在重新平衡之后,消费者被重新添加,相同的消息被重新传递;这正是我所期待的。
我得到同样的结果;即使是1.3.1。
我们有一个具有Ha all策略的2节点RabbitMQ集群。我们在应用程序中使用Spring AMQP与RabbitMQ对话。生产者部分工作正常,但消费者工作了一段时间并暂停。生产者和消费者作为不同的应用程序运行。更多关于消费者部分的信息。 我们将与一起使用,使用手动模式和默认 在我们的应用程序中,我们创建队列(按需)并将其添加到侦听器中 当我们从10个和20个开始时,消费大约持续15个小时并暂停
Firebase在大约一个小时后停止连接到数据库并停止工作。如果我退出并重新登录,它的工作原理很好。这是使用谷歌和Facebook登录(目前为Facebook)。 我已经登录了,大约一个小时后,出现了这个: W/PersistentConnection:pc_0-提供的身份验证凭据无效。这通常表示FirebaseApp实例未正确初始化。确保你的谷歌服务。json文件具有正确的firebase_ur
触发spring boot REST服务后,该服务可以正常运行数小时,所有请求都可以正常工作,没有任何问题。发生的是,一段时间后,它随机地停止了。在查看日志时,我没有发现任何错误,除了应用程序已被销毁的信息。 一段时间后的日志 Maven依赖项 对于为什么spring boot REST API可能会停止有什么想法吗?我的maven依赖关系是根据演示的--而且它正在成功运行--这就是为什么服务在随
问题内容: 在我的组织中,我们有许多Redis工作人员来完成我们的关键任务。通常,一天一次或两次,我们的工人会停止处理队列。 该代码基本上如下所示: 如果看到的话,就代码而言,发生的事情并不多,但是每隔一段时间,队列就会开始建立,并且工作程序不会从队列中弹出任何项目。为设置超时根本没有用,因为我们假设问题出在Redis客户端连接上。 目前,我们已经建立了一些侦听器,这些侦听器会在队列建立时提醒我们
我有4个Kafka和debezium一起运行。经过几天的良好运行后,三台kafka机器脱离网络一段时间,在< code > connect distributed . out 日志文件中,我收到了许多包含以下错误的消息: 我有4台Kafka机器,经纪人从0到3 动物园管理员: <代码>192.168.240.70 关注我的 - 除了之外,有相同的 指向安装 Kafka 的计算机的相同 IP,并且
我有6个集装箱在码头群中运行。Kafka Zookeeper、MongoDB、A、B、C和接口。接口是来自公共的主要访问点-只有这个容器发布端口-5683。接口容器在启动期间连接到A、B和C。我使用docker组合文件docker堆栈部署,每个服务都有一个名称,用作接口的主机。一切都开始顺利,运转良好。过了一段时间(20分钟、1小时……),我无法向接口提出请求。接口接收到我的请求,但应用程序与服务