Question:

Spring Kafka request-reply partition pattern: offsets cannot be committed after processing messages

薛烨
2023-03-14

I am implementing a synchronous request-reply pattern with Spring Kafka. The stack:

org.springframework.cloud:spring-cloud-dependencies:2020.0.2

org.springframework.kafka:spring-kafka

io.confluent:kafka-avro-serializer:6.2.0

Java 11

I have a request topic with 5 partitions and a reply topic with 8 partitions.

My reply-consumer configuration is below. For brevity, the producer configuration is not shown:

    @Bean
    public ReplyingKafkaTemplate<String, PhmsPatientSearchRequest, PhmsPatientSearchResponse> replyKafkaTemplate(ProducerFactory<String, PhmsPatientSearchRequest> pf,
                                                                                                                 KafkaMessageListenerContainer<String, PhmsPatientSearchResponse> container) {
        final ReplyingKafkaTemplate<String, PhmsPatientSearchRequest, PhmsPatientSearchResponse> repl = new ReplyingKafkaTemplate<>(pf, container);
        repl.setMessageConverter(new StringJsonMessageConverter());
        return repl;
    }

    @Bean
    public KafkaMessageListenerContainer<String, PhmsPatientSearchResponse> replyContainer(ConsumerFactory<String, PhmsPatientSearchResponse> replyConsumerFactory) {
        TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(replyTopic, replyPartition);
        ContainerProperties containerProperties = new ContainerProperties(topicPartitionOffset);
        final KafkaMessageListenerContainer<String, PhmsPatientSearchResponse> msgListenerContainer = new KafkaMessageListenerContainer<>(replyConsumerFactory, containerProperties);
        return msgListenerContainer;
    }

    @Bean
    public ConsumerFactory<String, PhmsPatientSearchResponse> replyConsumerFactory() {
        final DefaultKafkaConsumerFactory<String, PhmsPatientSearchResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
        return consumerFactory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ResponseConsumer");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 40000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, replyDeserializerTrustedPkg);
        props.put(SCHEMA_REGISTRY_URL, schemRegistryUrl);
        props.put(SPECIFIC_AVRO_READER, true);
        return props;
    }
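
A side note on these values: heartbeat.interval.ms (30 s) is barely below session.timeout.ms (40 s), while Kafka recommends keeping the heartbeat at or below one third of the session timeout, and a max.poll.interval.ms of 30 s leaves the poll loop very little headroom. (JsonDeserializer.TRUSTED_PACKAGES also has no effect here, since the value deserializer is KafkaAvroDeserializer.) A minimal sketch of more conservative timings, following the remedy the exception message below itself suggests; the numbers are illustrative, not taken from the question:

        // illustrative values only: give the poll loop headroom and keep the
        // heartbeat at or below one third of the session timeout
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // client default is 5 minutes
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);        // client default is 500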

My request-reply code:


        ProducerRecord<String, PhmsPatientSearchRequest> patientSearchRequestRecord = new ProducerRecord<>(requestTopic, phmsPatientSearchRequest);
        // set message key in header
        patientSearchRequestRecord.headers().add(new RecordHeader(KafkaHeaders.MESSAGE_KEY, messageKey.getBytes()));
        //patientSearchRequestRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
        //patientSearchRequestRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
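        // Note: the two commented-out header lines above should not be needed;
        // ReplyingKafkaTemplate sets KafkaHeaders.REPLY_TOPIC itself, and also
        // KafkaHeaders.REPLY_PARTITION when its reply container is assigned a
        // single TopicPartitionOffset, as it is here.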
       
        // post in kafka topic
        RequestReplyFuture<String, PhmsPatientSearchRequest, PhmsPatientSearchResponse> sendAndReceive = replyingKafkaTemplate.sendAndReceive(patientSearchRequestRecord);

        // get consumer record

        ConsumerRecord<String, PhmsPatientSearchResponse> consumerRecord = sendAndReceive.get();

I receive the reply message on the correct partition, but the offset is not committed. The stack trace below is observed every time the reply consumer reads a message. I do not think this is caused by a slow poll loop.


org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1606) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.4.jar:2.7.4]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1256) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1163) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1173) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1148) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1005) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2656) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2651) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2637) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2451) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.4.jar:2.7.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.4.jar:2.7.4]
    ... 3 common frames omitted


If I do not use a TopicPartitionOffset, my consumer listens on all partitions of the reply topic and there is no problem.
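
One difference may matter here: with a TopicPartitionOffset the reply container uses Consumer.assign() rather than subscribe(), so it never actually joins the consumer group; it only uses the group.id for offset commits. If any other consumer subscribes with the same group.id ("ResponseConsumer") and triggers a rebalance, the broker rejects the standalone consumer's commits with exactly this CommitFailedException, whereas the subscribe()-based variant commits with a valid group generation. A sketch of one possible mitigation (an assumption, not a confirmed fix) is to isolate the reply container in its own group, e.g. in consumerConfigs():

        // hypothetical tweak: a unique group id per instance, so the reply
        // container's commits cannot collide with a rebalancing group
        // (requires java.util.UUID)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ResponseConsumer-" + UUID.randomUUID());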

I would appreciate any help with this.

1 Answer

奚光霁
2023-03-14

I just copied your code (but using Strings) and it works as expected...

@SpringBootApplication
public class So68461640Application {

    public static void main(String[] args) {
        SpringApplication.run(So68461640Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so68461640").partitions(5).replicas(1).build();
    }

    @Bean
    public NewTopic reply() {
        return TopicBuilder.name("so68461640.replies").partitions(8).replicas(1).build();
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(ProducerFactory<String, String> pf,
            KafkaMessageListenerContainer<String, String> container) {
        final ReplyingKafkaTemplate<String, String, String> repl = new ReplyingKafkaTemplate<>(
                pf, container);
//      repl.setMessageConverter(new StringJsonMessageConverter());
        return repl;
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(
            ConsumerFactory<String, String> replyConsumerFactory) {

        TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset("so68461640.replies", 3);
        ContainerProperties containerProperties = new ContainerProperties(topicPartitionOffset);
        final KafkaMessageListenerContainer<String, String> msgListenerContainer = new KafkaMessageListenerContainer<>(
                replyConsumerFactory, containerProperties);
        return msgListenerContainer;
    }

    @Bean
    public ConsumerFactory<String, String> replyConsumerFactory() {
        final DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerConfigs());
        return consumerFactory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ResponseConsumer");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 40000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "earliest");
        return props;
    }


    @KafkaListener(id = "so68461640", topics = "so68461640")
    @SendTo
    public String listen(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

    @Bean
    KafkaTemplate<String, String> replyTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate,
            KafkaTemplate<String, String> replyTemplate,
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        factory.setReplyTemplate(replyTemplate);

        return args -> {
            RequestReplyFuture<String, String, String> future =
                    replyKafkaTemplate.sendAndReceive(new ProducerRecord<>("so68461640", 0, null, "test"));
            future.getSendFuture().get(10, TimeUnit.SECONDS);
            ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
            System.out.println(reply.value());
        };
    }

}
test
TEST
% kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group ResponseConsumer

Consumer group 'ResponseConsumer' has no active members.

GROUP            TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
ResponseConsumer so68461640.replies 3          1               1               0               -               -               -
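
The CURRENT-OFFSET/LOG-END-OFFSET/LAG columns above confirm that the reply container committed the offset for partition 3 of so68461640.replies after receiving the reply.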