当前位置: 首页 > 知识库问答 >
问题:

如何配置spring-kafka以忽略格式错误的消息?

罗伟兆
2023-03-14

我们的一个Kafka主题有一个问题,该主题被DefaultKafkaConsumerFactory&ConcurrentMessageListenerContainer组合使用,这里用工厂使用的JSONDeserializer描述。不幸的是,有人有点热情,并发表了一些无效的消息到主题。看来斯普林-Kafka没有处理这些信息中的第一个。有可能让spring-kafka记录一个错误并继续吗?查看记录的错误消息,似乎Apache kafka-clients库应该处理这样的情况:在迭代一批消息时,其中一个或多个消息可能无法解析?

下面的代码是说明这个问题的一个示例测试用例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

/**
 * @author jfreedman
 */
public class TestSpringKafka {
    private static final String TOPIC1 = "spring.kafka.1.t";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, TOPIC1);

    @Test
    public void submitMessageThenGarbageThenAnotherMessage() throws Exception {
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = createListener(TOPIC1);
        final KafkaTemplate<String, JsonObject> objectTemplate = createPublisher("json", new JsonSerializer<JsonObject>());

        sendAndVerifyMessage(records, objectTemplate, "foo", new JsonObject("foo"), 0L);

        // push some garbage text to Kafka which cannot be marshalled, this should not interrupt processing
        final KafkaTemplate<String, String> garbageTemplate = createPublisher("garbage", new StringSerializer());
        final SendResult<String, String> garbageResult = garbageTemplate.send(TOPIC1, "bar","bar").get(5, TimeUnit.SECONDS);
        assertEquals(1L, garbageResult.getRecordMetadata().offset());

        sendAndVerifyMessage(records, objectTemplate, "baz", new JsonObject("baz"), 2L);
    }

    private <T> KafkaTemplate<String, T> createPublisher(final String label, final Serializer<T> serializer) {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "TestPublisher-" + label);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 2);
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass());
        final DefaultKafkaProducerFactory<String, T> pf = new DefaultKafkaProducerFactory<>(producerProps);
        pf.setValueSerializer(serializer);
        return new KafkaTemplate<>(pf);
    }

    private BlockingQueue<ConsumerRecord<String, JsonObject>> createListener(final String topic) throws Exception {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TestConsumer");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        final DefaultKafkaConsumerFactory<String, JsonObject> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        cf.setValueDeserializer(new JsonDeserializer<>(JsonObject.class));
        final KafkaMessageListenerContainer<String, JsonObject> container = new KafkaMessageListenerContainer<>(cf, new ContainerProperties(topic));
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, JsonObject>) records::add);
        container.setBeanName("TestListener");
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        return records;
    }

    private void sendAndVerifyMessage(final BlockingQueue<ConsumerRecord<String, JsonObject>> records,
                                      final KafkaTemplate<String, JsonObject> template,
                                      final String key, final JsonObject value,
                                      final long expectedOffset) throws InterruptedException, ExecutionException, TimeoutException {
        final ListenableFuture<SendResult<String, JsonObject>> future = template.send(TOPIC1, key, value);
        final ConsumerRecord<String, JsonObject> record = records.poll(5, TimeUnit.SECONDS);
        assertThat(record, hasKey(key));
        assertThat(record, hasValue(value));
        assertEquals(expectedOffset, future.get(5, TimeUnit.SECONDS).getRecordMetadata().offset());
    }

    public static final class JsonObject {
        private String value;

        public JsonObject() {}

        JsonObject(final String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        public void setValue(final String value) {
            this.value = value;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) { return true; }
            if (o == null || getClass() != o.getClass()) { return false; }
            final JsonObject that = (JsonObject) o;
            return Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "JsonObject{" +
                    "value='" + value + '\'' +
                    '}';
        }
    }
}

共有1个答案

冯霖
2023-03-14

我有一个解决方案,但我不知道它是不是最好的,我扩展了JSONDeserializer,如下所示,这会导致null值被spring-kafka消耗,并需要进行必要的下游更改来处理这种情况。

class SafeJsonDeserializer[A >: Null](targetType: Class[A], objectMapper: ObjectMapper) extends JsonDeserializer[A](targetType, objectMapper) with Logging {
  override def deserialize(topic: String, data: Array[Byte]): A = try {
    super.deserialize(topic, data)
  } catch {
    case e: Exception =>
      logger.error("Failed to deserialize data [%s] from topic [%s]".format(new String(data), topic), e)
      null
  }
}
 类似资料:
  • 我在Spring Boot(1.5.7.release)中添加了带有JWT身份验证的Spring Security性(5.0.0.release),但是CORS似乎不起作用。我添加了这里描述的CORS配置。我还尝试将@CrossOrigin添加到控制器中,但似乎没有改变任何东西。 和Spring Security调试日志: 下面是无效CORS请求的日志:

  • 我已经编写了许多通过RESTAPI调用进行通信的服务。这些服务可以配置为使用HTTP或HTTPS。任何给定的客户端都具有定义到服务器的连接的安全配置。“默认”配置属性由应用程序中的值设置。yml在这一点上效果很好。 然而,我逐渐意识到,这在更现实的情况下并不适用。问题是,我试图设置特定的参数,例如启动客户端时的服务器主机/端口,而我设置的值被忽略。 例如: 服务A(客户端)将出于某种目的与服务B(

  • 我正在使用Kafka(与雅虎Kafka经理) 我想为重置消息设置一个规则,或者他们如何称呼它:“分区偏移量的总和” 在server.properties上是否有滚动kafka偏移量的参数? (即:我想重置或删除所有影响邮件保留的参数) 谢谢。

  • QueryList遵循将业务与错误分离的原则, HTTP请求传输过程中如果出现的错误,QueryList将会抛出异常。 QueryList的HTTP客户端基于GuzzleHttp,它提供了丰富的HTTP异常类型,用户可以自行设计根据不同的异常类型做不同的处理。 如果你觉得麻烦,并不想每次都去处理HTTP异常,选择直接忽略,让程序继续运行下去,做法可以参考下面方式: 对内置的get()进行封装: u

  • 我想创建一个SpringWebClient,它忽略一个特定的HTTP错误。来自WebClient的文档。检索(): 默认情况下,4xx和5xx响应会导致WebClientResponseException。要自定义错误处理,请使用ResponseSpec。onStatus(谓词、函数)处理程序。 我希望通过WebClient实例的所有调用都忽略特定的HTTP错误。这就是为什么onState()对我

  • 像这种类型错误,发现是因为扩展版本的问题导致的错误,打算不去修复,想进行忽略,但是发现像 <!-- @ts-ignore --> 这样的注释没效果,这个怎么办好?