Question:

Spring Kafka message consumer delay

堵毅然
2023-03-14

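I am using Spring Kafka 1.0.3 to consume Kafka messages. There are 2 Kafka topics, each with 1 partition. In the Java code there are 2 @KafkaListener methods, one consuming each topic. The concurrency of the ConcurrentKafkaListenerContainerFactory is set to 1. But messages are sometimes delayed by more than 20 seconds.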

@Configuration
@EnableKafka
public class KafkaConfig {

@Value("${kafka.brokers}")
private String brokers;

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}


@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "server-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    return props;
}
}

// Listener methods (in the MessageListener class, per the log output below)
@KafkaListener(id = "serverInChannel", topics = CommonContants.KAFKA_TOPIC.SERVER_IN_CHANNEL)
public void consumeInMessage(@Payload String data,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                             Acknowledgment ack) {
    logger.info("[serverInMessage]data=" + data);
    ack.acknowledge();
}

@KafkaListener(id = "webOutChannel", topics = CommonContants.KAFKA_TOPIC.WEB_OUT_CHANNEL)
public void consumeOutMessage(@Payload String data,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                              Acknowledgment ack) {
    logger.info("[webOutMessage]data=" + data);
    ack.acknowledge();
}

2016-09-26 16:16:42,777 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-26 16:16:44,101 [webOutChannel-0-kafka-listener-6] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:16:45,551 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-26 16:16:45,562 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-26 16:16:45,663 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0]
2016-09-26 16:16:45,715 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-26 16:16:45,781 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-26 16:16:45,805 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:16:45,870 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:17:01,099 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-26 16:17:02,054 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-26 16:17:02,108 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-26 16:17:02,120 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0]
2016-09-26 16:17:02,133 [serverInChannel-0-kafka-listener-5] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc

Does anyone know why?

I added debug logging. The delay does not happen every time; sometimes messages are consumed without delay:

2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=179, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 179, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:33:58,623 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker
2016-09-27 15:33:58,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0]
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {}
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {}
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:14,599 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-27 15:34:14,599 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-27 15:34:15,276 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=174, metadata=''}}
2016-09-27 15:34:15,286 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:34:15,516 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0]
2016-09-27 15:34:15,516 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 3 records
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=174, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 174, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=175, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 175, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=176, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 176, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}}
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}}
2016-09-27 15:34:15,795 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Woken up during commit
2016-09-27 15:34:15,796 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 2 records
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=180, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 180, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=181, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 181, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}}
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}}

1 answer

尤博达
2023-03-14

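The container is reacting correctly to a partition rebalance initiated by the broker.

You will have to check the server logs to determine the reason for the rebalance.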
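To see how much of the delay lines up with a rebalance, one option is to measure the time between a partition being revoked and being assigned again in the container log. The following is only a minimal sketch: the class name `RebalanceGap` and its methods are hypothetical, it uses nothing but the JDK, and it assumes the log layout shown in the question (a 23-character timestamp at the start of each line, followed by the `partitions revoked:` / `partitions assigned:` messages). The four sample lines are copied from the debug log above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: measures how long a
// partition stayed unassigned according to the container's log lines.
class RebalanceGap {

    // Layout of the timestamp that starts every log line, e.g. "2016-09-27 15:33:59,330".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Four container log lines copied from the debug log in the question.
    static final List<String> SAMPLE = Arrays.asList(
        "2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0]",
        "2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]",
        "2016-09-27 15:34:14,599 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]",
        "2016-09-27 15:34:15,516 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0]");

    // Parses the leading 23-character timestamp of a log line.
    static LocalDateTime timestampOf(String line) {
        return LocalDateTime.parse(line.substring(0, 23), TS);
    }

    // Milliseconds between the first "partitions revoked" line that names the
    // partition and the next "partitions assigned" line that names it,
    // or -1 if no such pair exists in the given lines.
    static long gapMillis(List<String> lines, String partition) {
        LocalDateTime revokedAt = null;
        for (String line : lines) {
            if (!line.contains(partition)) {
                continue; // e.g. "partitions assigned:[]" does not name the partition
            }
            if (revokedAt == null && line.contains("partitions revoked:")) {
                revokedAt = timestampOf(line);
            } else if (revokedAt != null && line.contains("partitions assigned:")) {
                return Duration.between(revokedAt, timestampOf(line)).toMillis();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String partition = "server_message_in_channel-0";
        System.out.println(partition + " unassigned for "
                + gapMillis(SAMPLE, partition) + " ms");
        // prints "server_message_in_channel-0 unassigned for 16186 ms"
    }
}
```

For these lines the partition is unassigned for about 16 seconds, which is the window in which no records can be delivered to the listener. This only quantifies the gap; why the rebalance was triggered still has to come from the broker logs.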

 Related material:
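  • I am using a Spring Kafka consumer that fetches messages from a topic and saves them to a database. If a failure condition is met, e.g. the DB is unavailable, does the Kafka consumer library provide a retry mechanism? If so, is there a way to set different retry intervals, e.g. the 1st retry after 5 minutes, the 2nd after 30 minutes, the 3rd after 1 hour, and so on?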

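  • When using a Spring Kafka consumer, I sometimes get the following error message. As shown in the code snippet, I have implemented at-least-once semantics. 1) My question is: am I missing any messages in the consumer? 2) Do I need to handle this error? Caused by org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. My Spring Kafka consumer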

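  • I need to process Kafka messages using the consume-process-produce pattern. I have configured the Spring Kafka listener container with a Kafka transaction manager and also set a transaction ID prefix to enable Kafka transactions. I am using the batch ack mode and am trying to understand when offsets are committed within a transaction in this mode. The documentation seems to indicate that the batch ack mode commits offsets once all records from the poll have been consumed; is that also true in a transactional context, i.e. one transaction per poll? Or, when using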

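  • I have two cluster setups: cluster-1 at site 1 (3 brokers) and cluster-2 at site 2 (3 brokers). I am using a spring kafka (1.3.6) consumer (one machine) and listening for messages via the @KafkaListener annotation. How can we instantiate multiple KafkaListenerContainerFactory instances, one per cluster (c1 and c2), and listen for data from both clusters at the same time? My listener should consume messages from both clusters simultaneously.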

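  • I am using an asynchronous message consumer with ActiveMQ. My producer works fine and sends messages to the queue. My asynchronous message consumer is now waiting for onMessage() to be called, but that never happens. So the problem is: the asynchronous consumer does not consume messages. A snapshot of the ActiveMQ log also shows many messages just piling up in the pending state: I can't figure out where exactly the problem is. The count: toPageIn 78 just keeps increasing, and the messages still cannot be delivered to the consumer. Is this a server-side problem or a client-side problem?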

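  • I have a simple Kafka setup. A producer is producing messages at a high rate to a single topic with a single partition. A single consumer is consuming messages from this partition. During this process, the consumer may pause processing messages several times. The pauses can last a few minutes. After the producer stops producing messages, all queued messages are processed by the consumer. It seems that messages produced by the producer are not seen immediately by the consumer. I am using Kafka 0.10.1.0. What could be happening here? Here is the part of the code that consumes the messages: All configuration on the broker is left at the Kafka default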