当前位置: 首页 > 知识库问答 >
问题:

spring-kafka-当来自不同分区的多个失败记录时,seekToCurrentErrorHandler行为随机,使用者离开代理

金旺
2023-03-14
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) throws IOException{
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(processingFailedErrorRecoverer(),new FixedBackOff(0L,5L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) throws IOException{
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(new RetryTemplate()); // 3 retries by default
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            processingFailedErrorRecoverer().accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(new FixedBackOff(0L,3L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }

使用者断开连接时的日志语句。

2.3.3-发布->2020-08-10 10:51:23.881 23[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1]信息org.apache.kafka.clients.Consumer.internals.abstractcordinator-[Consumer clientid=consumer-1,groupid=xyz-consumer-group]成员consumer-1-1a0978c4-9ae6-45b9-8d9d-f3dee081df9向协调器(ID:2147482644 rack:null)发送离开组请求

2.5.4-发行版->2020-08-10 14:34:20.902 36[kafka-coordinator-heartbeat-thread xyzconsumer-group]INFO org.apache.kafka.clients.Consumer.internals.abstractcoordinator-[Consumer clientid=consumer-xyz-consumer-group-1,groupid=xyz-consumer-group]成员consumer-xyz-consumer-group-1-8324f4e3-4ec3-4b34-b6af-c4ff01a0aa01由于Consumer轮询这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。

src/main/java

package com.orgname.gtb.cmng.kafka;
/**
 * @param <V> Original message type.
 * @param <T> Message type to be published.
 */
@Slf4j
public abstract class AbstractErrorRecoverer<V,T> implements BiConsumer<ConsumerRecord<?, ?>, Exception> {
    private static final String LOGGER_NAME="ERRORHANDLER";
    private static final Logger LOGGER = LoggerFactory.getLogger(LOGGER_NAME);       
    private final KafkaTemplate<String, T> kafkaTemplate; 
    private final KafkaTemplate<String, byte[]> deserializationErrorRecoveryKafkaTemplate;

    protected AbstractErrorRecoverer(KafkaTemplate<String, T> kafkaTemplate,KafkaTemplate<String, byte[]> deserializationErrorRecoveryKafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        this.deserializationErrorRecoveryKafkaTemplate=deserializationErrorRecoveryKafkaTemplate;
        log.info("Recoverer initialized with alertDispatcher and kafkaTemplate.");
    }
    

    @SuppressWarnings("unchecked")
    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
        V original = (V) consumerRecord.value();
        // TODO Do other common things, like alerting etc. 
        List<Header> headers = this.enhanceHeaders(consumerRecord, e);
        
        DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(consumerRecord,
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER,  new LogAccessor(LOGGER_NAME));
        
        if(deserEx!=null){  
            ProducerRecord<String, byte[]> deserilizationErrorRecord = new ProducerRecord<>(getDeserializationErrorRecoveryTopic(), consumerRecord.partition(),
                    (String) consumerRecord.key(), deserEx.getData(), headers);
            if (deserializationErrorRecoveryKafkaTemplate.isTransactional() && !deserializationErrorRecoveryKafkaTemplate.inTransaction()) {
                deserializationErrorRecoveryKafkaTemplate.executeInTransaction(kafkaOperations -> {
                    this.publishDeserializationError(deserilizationErrorRecord, kafkaOperations);
                    return null;
                });
            } else {
                publishDeserializationError(deserilizationErrorRecord, deserializationErrorRecoveryKafkaTemplate);
            }
        }
        else {
            T objectToPublish=messageToPublish(consumerRecord,e.getCause());
            ProducerRecord<String, T> pr = new ProducerRecord<>(getErrorTopic(), consumerRecord.partition(),
                    (String) consumerRecord.key(), objectToPublish, headers);

            if (kafkaTemplate.isTransactional() && !kafkaTemplate.inTransaction()) {
                kafkaTemplate.executeInTransaction(kafkaOperations -> {
                    this.publish(pr, kafkaOperations);
                    return null;
                });
            } else {
                publish(pr, kafkaTemplate);
            }
        }
    }

    private void publish(ProducerRecord<String, T> record, KafkaOperations<String, T> ops) {
        try {   
            ops.send(record).addCallback(stringTSendResult -> {
                log.debug("Successfully published message to dead letter topic");
            }, ex -> {
                log.error("error publishing to ERROR-Topic", ex);
            });
        } catch (Exception e) {
            log.error("Error publishing to error-topic.", e);
        }
    }
    
    private void publishDeserializationError(ProducerRecord<String, byte[]> record, KafkaOperations<String, byte[]> ops) {
        try {
            System.out.println("before pub to recovery topic");
            ops.send(record).addCallback(stringTSendResult -> {
                log.debug("Successfully published message to deserialization recovery topic.");
            }, ex -> {
                log.error("error publishing to deserialization recovery topic.", ex);
            });
        } catch (Exception e) {
            log.error("Error publishing to deserialization recovery topic.", e);
        }
    }
    
    
    private List<Header> enhanceHeaders(ConsumerRecord<?, ?> record, Exception exception) {
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION, ByteBuffer.allocate(4).putInt(record.partition()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET, ByteBuffer.allocate(8).putLong(record.offset()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, ByteBuffer.allocate(8).putLong(record.timestamp()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN, exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE, exception.getMessage().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, this.getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
        Header valDeserExceptionheader  =record.headers().lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
        if (valDeserExceptionheader != null) {
            headers.add(valDeserExceptionheader);
        }
        return headers;
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

    /**
     * @return  The error topic to which the notification should be sent.
     */
    protected abstract String getErrorTopic();
    
    /**
     * 
     * @return The error topic to which deserialization error should be sent.
     */
    protected abstract String getDeserializationErrorRecoveryTopic();
    
    /**
     * This method receives the original consumer record and throwable that was thrown by the listener 
     * Override this method to publish a different message (e.g. an enriched message to errorTopic).
     * By default the original message is returned which is published.
     * @param originalConsumerRecord The original consumer record. Same as that received by listener
     * @param t Throwable thrown by listner.
     * @return The expected message to be published.
     */
    protected T messageToPublish(ConsumerRecord<?, ?> originalConsumerRecord,Throwable t){
        return (T)originalConsumerRecord.value();
    }

}

SRC/测试/Java

package com.orgname.gtb.cmng.config;
@EnableKafka
@Configuration
@Slf4j
public class IntegrationTestConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // start of config for kafkatemplate that publishes a message 
    @Bean
    public Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProps(), new StringSerializer(), new StringSerializer());
    }


    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    //end of config for kafkatemplate that publishes a message
    
    // start of config for kafkatemplate that recovers deserialiazation error
    @Bean
    public Map<String, Object> deserializationErrorProducerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, byte[]> deserializationErrorProducerFactory() {
        return new DefaultKafkaProducerFactory(deserializationErrorProducerProps());
    }


    @Bean 
    public KafkaTemplate<String, byte[]> deserializationErrorRecoveryKafkaTemplate() {
        return new KafkaTemplate<>(deserializationErrorProducerFactory());
    }
   // end of config for kafkatemplate that recovers deserialiazation error
    
    // config for kafkatemplate that publishes to deadlettertopic.
    @Bean
    public KafkaTemplate<String, String> deadLetterKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    // consumers config
    @Bean
    public Map<String, Object> getConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    DefaultKafkaConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(
                getConsumerProps(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    // config for the error handler and its publisher to the dead letter topic

    @Bean   // the error recoverer
    public StringErrorRecovererImplementation processingFailedErrorRecoverer() {
        return new StringErrorRecovererImplementation(deadLetterKafkaTemplate(),deserializationErrorRecoveryKafkaTemplate());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(processingFailedErrorRecoverer(),new FixedBackOff(0L,5L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }
    
    // config for the listener on the happy topic
    @Bean
    @Primary
    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry =
                new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;
    }

    // the listener
    @Bean
    public IntegrationTestMessageListener simpleStringMessageListener() {
        return new IntegrationTestMessageListener(kafkaListenerEndpointRegistry());
    }

SRC/测试/Java

 package com.orgname.gtb.cmng.kafka.integrationtest;
    @RunWith(SpringRunner.class)
    @TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
    @EmbeddedKafka(partitions = 3, topics = {"${topics.happy}", "${topics.deadLetter}"})
    @SpringBootTest(classes = {IntegrationTestConfig.class})
    public class ErrorRecovererIntegrationTest {
    
         private static final String BAD_MESSAGE = "Poison message";
    
        @Value("${topics.happy}")
        private String happyTopic;
    
        @Value("${topics.deadLetter}")
        private String deadLetterTopic;
    
        @Autowired
        private EmbeddedKafkaBroker embeddedKafka;
    
        @Autowired
        private ConsumerFactory<String, String> consumerFactory; // will use the deadLetterConsumer factory in the TestKafkaConfig
    
    
        @Autowired
        protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @Autowired
        private IntegrationTestMessageListener listener;
    
        private Consumer<String, String> deadLetterConsumer;
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Before
        public void setUp() {
            embeddedKafka.brokerProperty("controlled.shutdown.enable", true);
    
            for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
                log.debug("Listener container props:\n{}",messageListenerContainer.getContainerProperties().toString());
                ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());
            }
            deadLetterConsumer = consumerFactory.createConsumer();
            deadLetterConsumer.subscribe(Collections.singleton(deadLetterTopic));
            deadLetterConsumer.poll(Duration.ofMillis(0));
        }
    
        @After
        public void tearDown() {
            listener.clear();
        }
    
        @Test
        @DirtiesContext
        public void given_bad_message_should_publish_to_dead_letter_topic() throws Exception {
            IntStream.range(0, 6).forEach(i -> kafkaTemplate.send(happyTopic, i % 3,i+"", BAD_MESSAGE));
            Thread.sleep(5000);
            ConsumerRecords<String, String> consumerRecords= KafkaTestUtils.getRecords(deadLetterConsumer);
            assertEquals(6,consumerRecords.count());
        }

SRC/测试/Java

package com.db.orgname.cmng.kafka.integrationtest;
/**
 * This listener will listen for "poison messages" and throw a runtime exception so the exception handling can be done.
 */
@Service
@Slf4j
public class IntegrationTestMessageListener {

    @Getter
    private final KafkaListenerEndpointRegistry registry;

    @Getter
    private Map<String,String> messages = new HashMap<>();

    public void clear() {
        messages.clear();
    }

    @Autowired
    public IntegrationTestMessageListener(KafkaListenerEndpointRegistry registry) {
        log.debug("Created simple listener");
        this.registry = registry;
    }

    @KafkaListener(topics = "${topics.happy}")
    public void listen(@Payload String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        log.info("Simple listener received message --  key: {}, value: {}", key, value);
        if (value.toLowerCase().startsWith("poison")) {
            throw new RuntimeException("failed");
        } else {
            messages.put(key, value);
        }

    }
package com.orgname.gtb.cmng.kafka.integrationtest;
@Getter
@Service
public class StringErrorRecovererImplementation extends AbstractErrorRecoverer<String,String> {

    public StringErrorRecovererImplementation(KafkaTemplate<String, String> kafkaTemplate,KafkaTemplate<String, byte[]> deserializationErrorRecoveryKafkaTemplate) {
        super(kafkaTemplate,deserializationErrorRecoveryKafkaTemplate);
    }

    @Override
    protected String getErrorTopic() {
        return "T-ERROR-TOPIC";
    }

    @Override
    protected String messageToPublish(ConsumerRecord<?, ?> orginal, Throwable t) {
        String originalString=(String)orginal.value();
        return originalString + t.getMessage();
    }

    @Override
    protected String getDeserializationErrorRecoveryTopic() {
        return "T-DESERIALIZATION-ERROR-TOPIC";
    }
topics:
  happy: T-HAPPY-TOPIC
  deadLetter: T-ERROR-TOPIC
  deserializationError: T-DESERIALIZATION-ERROR-TOPIC
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
    producer:
      acks: all

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.orgname.gtb.cmng</groupId>
    <artifactId>nextgen-commons-error-handler</artifactId>
    <version>0.1.1-SNAPSHOT</version>
    <name>nextgen-commons-error-handler</name>
    <description>nextgen commons error handler</description> <!--fixme: Add proper description-->

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <sonar.language>java</sonar.language>
        <lombok.version>1.18.8</lombok.version>

        <!--Test Dependencies-->
        <confluent.version>5.4.0</confluent.version>
        <mockito-core.version>2.9.0</mockito-core.version>
        <mockito-all.version>1.9.5</mockito-all.version>
        <junit.version>4.13</junit.version>
        <assertj-core.version>3.13.2</assertj-core.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.4.RELEASE</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>             
        </dependency>
        <!--Test Dependencies-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.5.4.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
            <exclusions>
              <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
              </exclusion>
              <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions> 
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-config</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-utils</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Test dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.2.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj-core.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>report</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                        <!--TODO-changeme: Change the exclusions based on individual project requirements-->
                        <configuration>
                            <excludes>
                                <exclude>**/entities/*.class</exclude>
                                <exclude>**/avro/*.class</exclude>
                            </excludes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>sonar-maven-plugin</artifactId>
                <version>3.7.0.1746</version>
            </plugin>
        </plugins>
    </build>

</project>

共有1个答案

潘刚洁
2023-03-14

请提供一个展示这种行为的项目;我不能复制它;此应用程序的一切工作都与预期的一样:

@SpringBootApplication
public class So63349172Application {

    public static void main(String[] args) {
        SpringApplication.run(So63349172Application.class, args);
    }

    @KafkaListener(id = "so63349172", topics = "so63349172")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    ErrorHandler eh() {
        return new SeekToCurrentErrorHandler(
                (rec, ex) -> System.out.println("Recovered " + ListenerUtils.recordToString(rec, true)),
                        new FixedBackOff(0, 2)) {

            @Override
            public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
                    Consumer<?, ?> consumer, MessageListenerContainer container) {

                System.out.println("Failed " + ListenerUtils.recordToString(records.get(0), true));
                super.handle(thrownException, records, consumer, container);
            }

        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63349172").partitions(3).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 6).forEach(i -> template.send("so63349172", i % 3, null, "foo"));
        };
    }

}

我在你的配置中没有看到任何会导致重新平衡的东西。

以下是我测试的结果(3个代理集群)...

$ egrep '^(Failed|Recovered)' ../tmp/gg
Failed so63349172-1@0
Failed so63349172-2@0
Failed so63349172-0@0
Failed so63349172-1@0
Failed so63349172-2@0
Failed so63349172-0@0
Failed so63349172-1@0
Recovered so63349172-1@0
Failed so63349172-2@0
Recovered so63349172-2@0
Failed so63349172-0@0
Recovered so63349172-0@0
Failed so63349172-1@1
Failed so63349172-2@1
Failed so63349172-0@1
Failed so63349172-1@1
Failed so63349172-2@1
Failed so63349172-0@1
Failed so63349172-1@1
Recovered so63349172-1@1
Failed so63349172-2@1
Recovered so63349172-2@1
Failed so63349172-0@1
Recovered so63349172-0@1
 类似资料:
  • 我已经用实现了一个Kafka消费者。我的使用者应该使用事件,然后为每个事件向其他服务发送REST请求。只有当REST服务关闭时,我才想重试。否则,我可以忽略失败的事件。 我的容器工厂配置如下: 我使用来设置异常和相应的重试策略。 如果我有机会了解中的哪个记录失败了,那么我将创建的自定义实现,以检查失败的消息是否可重试(通过使用字段)。如果它是不可重试的,那么我将从列表中删除它以重新查找。 关于如何

  • 我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。

  • 我们正在使用Spring kafka来消费消息。我们已经为每个分区创建了接收消息的接收器。现在我们需要多个接收者从单个分区接收消息。 对于例如。假设我们有一个分区0。目前,我们只有一个接收器(接收器1)从这个分区接收消息。现在我想为同一个分区(分区0)添加另一个接收器(接收器2)。 因此,如果生产者向这个分区发送100条消息,接收器1应该接收50条消息,其余50条消息应该在接收器2中接收。我不希望

  • 我的消费者并不是每次都能收到信息。我有3个代理(3个服务器)的Kafka集群,有3个主题和复制因子3的分区。 我有Java中的消费者,我将最大轮询记录设置在50000获取字节上,配置在50MB上。应用程序每分钟都进行轮询。当我向主题“my-topic”发送10条消息时,consumer不会给我所有的消息,而是只给我其中的一部分,其余的将在下一次运行中给我。消息是在applicatin睡眠期间由脚本

  • 有没有任何选项或配置可以方便单个Kafka消费者同时消费来自两个不同集群的消息?在创建生产者和消费者时,我将两个集群都提到逗号分隔。我一直在观察消费者只消费来自单个集群的消息。 请参阅下面的说明:消费者C1被配置为监听集群:集群-1:Zooker-1 with Broker-1集群-2:Zooker-2 with Broker-2 我正在寻找一种解决方案,其中消费者C1可以同时消费来自集群1和集群

  • 我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。