Question:

Spring Boot REST service: Kafka topic commitSync fails

夏飞鹏
2023-03-14

I have a simple Spring Boot service that is invoked on demand and consumes a specified number of messages from a topic. The number of messages to consume is passed as a parameter. The service is called every 30 minutes. Each message is about 1.6 KB, and I get about 1100 or 1200 messages on each call. The topic has a single partition, and this REST service is its only consumer. The service is called like this: http://example.com/messages?limit=2000

private OutputResponse getNewMessages(String limit) throws Exception {

    System.out.println("***** START *****");

    final long start = System.nanoTime();

    int loopCntr = 0;
    int counter = 0;
    OutputResponse outputResponse = new OutputResponse();
    Output output = new Output();
    List<Output> rspListObject = new ArrayList<>();
    Consumer<Object, Object> consumer = null;
    String result = null;

    try {
        Properties p = new Properties();
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);

        // create a consumer in group "my-group-id" and manually assign
        // partition 0 of the topic (no subscribe(), no group membership)
        consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
        consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));

        while (loopCntr < 2) {
            loopCntr++;
            ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(15));

            for (ConsumerRecord<Object, Object> record : consumerRecords) {
                counter++;
                try {
                    // serialize the record value to a JSON string
                    result = mapper.writeValueAsString(record.value());
                    // deserialize the JSON into the Output type
                    output = mapper.readValue(result, Output.class);
                    rspListObject.add(output);
                } catch (Exception e) {
                    logger.error(e);
                    insertToDB(record.value(), record.offset());
                }
            }
        }

        outputResponse.setObjects(rspListObject);

        final long end = System.nanoTime();
        System.out.println("Took: " + ((end - start) / 1000000) + "ms");
        System.out.println("Took: " + (end - start) / 1000000000 + " seconds");

        // commit the offsets of the consumed records to the broker
        if (counter > 0) {
            consumer.commitSync();
        }
    } finally {
        try {
            System.out.println(" >>>>> closing the consumer");
            if (consumer != null) {
                consumer.close();
            }
        } catch (Exception e) {
            // log and swallow; we are closing anyway
        }
    }

    return outputResponse;
}
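
For context, a controller method along these lines would sit in front of getNewMessages() (a hypothetical sketch based on the example URL above; the actual mapping and parameter names are not shown in the question and may differ):

@GetMapping("/messages")
public OutputResponse messages(@RequestParam("limit") String limit) throws Exception {
    // "limit" is passed straight through and ends up as max.poll.records
    return getNewMessages(limit);
}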

This is what I have in application.yml:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: '*'
        max.poll.interval.ms: 300000
      group-id: my-group-id

ConsumerConfig values:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100

This is the error I get on commitSync. I also tried consuming only 5 messages per poll(), and tried setting p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000") (which is why the dump above shows 180000 rather than the 300000 from application.yml), but I get the same error:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
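
For reference, the two knobs the error message names are set on the ad-hoc consumer like this (a minimal sketch with illustrative values; as noted above, tuning them did not make the error go away here, which suggests the root cause lies elsewhere):

Properties p = new Properties();
// give the poll loop more headroom before the coordinator assumes the member is gone
p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // illustrative: 10 minutes
// have each poll() return smaller batches so processing finishes well within that window
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // illustrative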

1 Answer

能可人
2023-03-14

I believe this application emulates your use case, but it does not exhibit the behavior you describe (as I expected). You should never see a rebalance when manually assigning the topic/partition.

I suggest you run both apps side by side and compare the DEBUG logs to figure out what's wrong.

@SpringBootApplication
public class So63713473Application {

    public static void main(String[] args) {
        SpringApplication.run(So63713473Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63713473").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String, String> factory, KafkaTemplate<String, String> template) {
        String msg = new String(new byte[1600]);
        return args -> {
            while (true) {
                System.out.println("Hit enter to run a consumer");
                System.in.read();
                int count = 0;
                try (Consumer<String, String> consumer = factory.createConsumer("so63713473", "")) {
                    IntStream.range(0, 1200).forEach(i -> template.send("so63713473", msg));
                    consumer.assign(Collections.singletonList(new TopicPartition("so63713473", 0)));
                    while (count < 1200) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                        count += records.count();
                        System.out.println("Count=" + count);
                    }
                    consumer.commitSync();
                    System.out.println("Success");
                }
            }
        };
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.fetch-min-size=1920000
spring.kafka.consumer.fetch-max-wait=1000

spring.kafka.producer.properties.linger.ms=50
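
As an aside (my addition, not part of the original answer), one way to surface the consumer DEBUG logs for that comparison is the standard Spring Boot logging property, e.g. in application.properties:

logging.level.org.apache.kafka.clients.consumer=DEBUG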

EDIT

I can reproduce your problem by adding a second (auto-assigned) consumer in the same group:

@KafkaListener(id = "so63713473", topics = "so63713473")
public void listen(String in) {
    System.out.println(in);
}
2020-09-08 16:40:15.828 ERROR 88813 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.demo.So63713473Application.main(So63713473Application.java:25) [classes/:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

You can't mix manual and automatic assignment in the same group.
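
A minimal sketch of the fix that follows from this (my reading; the group name is hypothetical): give the manually assigned REST consumer a group.id of its own, so its commitSync() can never collide with a rebalance in the group your subscribed listeners use:

// dedicated group.id for the manually assigned consumer; with assign() it never
// joins the group protocol, so no rebalance can invalidate its commits as long
// as no subscribed consumer shares this group.id
consumer = consumerFactory.createConsumer("my-rest-reader", null, null, p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));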
