当前位置: 首页 > 知识库问答 >
问题:

Spring-kafka-test使用自定义反序列化测试JSON消息

华誉
2023-03-14

我基本上成功地使用EmbeddedKafka设置了一个测试,它生成并使用一条消息。下面是测试的工作版本:

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class KafkaFlowTest {

    @Autowired
    EmbeddedKafkaBroker broker;

    public static final String EMAIL_TOPIC = "email-service";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    private Consumer<String, KafkaEmailMessageWrapper> consumer;
    private Producer<String, KafkaEmailMessageWrapper> producer;

    @Before
    public void setUp() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "true", broker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        DefaultKafkaConsumerFactory<String, KafkaEmailMessageWrapper> cf =
                new DefaultKafkaConsumerFactory<>(
                        consumerProps,
                        new StringDeserializer(),
                        new JsonDeserializer<>(KafkaEmailMessageWrapper.class)
                );
        consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton(EMAIL_TOPIC));
        consumer.poll(Duration.ZERO);

        DefaultKafkaProducerFactory<String, KafkaEmailMessageWrapper> pf =
                new DefaultKafkaProducerFactory<>(producerProps);
        producer = pf.createProducer();
    }

    @Test
    public void testNoErrorMessageFlow() {
        KafkaEmailMessageWrapper wrapperMessage = KafkaEmailMessageWrapper.builder()
                .emailRequest(
                    EmailMessage.builder()
                        .body("body")
                        .from("me@me.com")
                        .to(new String[]{"you@you.com"})
                        .subject("hey!")
                        .build()
                )
                .build();

        producer.send(new ProducerRecord<>(EMAIL_TOPIC, "whatever", wrapperMessage));
        producer.flush();

        ConsumerRecord<String, KafkaEmailMessageWrapper> record = KafkaTestUtils.getSingleRecord(consumer, EMAIL_TOPIC);
        KafkaEmailMessageWrapper receivedMessage = record.value();
        assertEquals(wrapperMessage.getEmailRequest().getSubject(), receivedMessage.getEmailRequest().getSubject());
    }

}

除非在to字段上添加assertequal,否则这是正常的,在JSON中,该字段是一个电子邮件地址列表,由分隔;在类中是一个字符串数组。因此,在EmailMessage类中,我有to字段,该字段用@JSONDeserializer注释,该字段指向沿着拆分的自定义反序列化器;等。

我试图修改上面的代码,将生产者 producer 更改为私有生产者 producer ,然后在测试中发送JSON而不是kafkaeMailMessageWrapper实例,但我得到了一个异常:

java.lang.ClassCastException: java.lang.String cannot be cast to com.test.model.KafkaEmailMessageWrapper

因此,由于某种原因,从JSON字符串到模型的反序列化似乎没有在本测试场景中发生。我希望测试尽可能接近真实的用例,因此它应该产生一个字符串消息,然后对我的模型类执行反序列化。不确定为什么这不发生,任何帮助理解为什么这将是感激的!

为了完成,这是消息侦听器的定义

@KafkaListener(topics = "email-service")
public void receive(@Payload KafkaEmailMessageWrapper message)

编辑

    null

kafkatemplate.senddefault(“key”,“ ”);

我得到一个:

Cannot handle message; nested exception is  org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.test.model.KafkaEmailMessageWrapper] for GenericMessage

但是,当我发送KafkaEmailMessageWrapper实例时:

@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] to;
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] cc;

因此,当发送实例时,to和cc字段总是空的,因为字段中已经是字符串数组,所以沿着拆分的反序列化逻辑不会起任何作用。这是反序列化方法:

public String[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
    ObjectMapper mapper = (ObjectMapper) p.getCodec();
    JsonNode node = mapper.readTree(p);
    return node.asText().split(";");
}

共有1个答案

戚良弼
2023-03-14

您需要显示堆栈跟踪,但我猜测,因为您使用的是@springboottest,所以真正的使用者(侦听器方法)也会得到消息,这就是为什么您会得到异常的原因。

不清楚您在这里(使用您的testconsumer)对象测试的是什么。

更好的测试是将mailsender服务注入到您的侦听器bean中,并在测试用例中注入一个模拟mailsender来验证它都能正常工作。

 类似资料:
  • 问题内容: 我正在使用Flickr API 。调用该方法时,默认的JSON结果为: 我想将此响应解析为Java对象: JSON属性应按以下方式映射: 不幸的是,我无法找到一种使用Annotations做到这一点的好方法。到目前为止,我的方法是将JSON字符串读入a 并从中获取值。 但是我认为,这是有史以来最不优雅的方式。有没有简单的方法,可以使用注释还是自定义反序列化器? 这对我来说将是很明显的,

  • 问题内容: 我在使用Gson解析JSON响应时遇到问题。 JSON字串: 我有这两个课程: 但是当使用Gson解析这个时我有一个异常。我知道这是因为响应数组的第一个元素不是对象,而是整数。 所以问题是,我能以某种方式解决它吗? 问题答案: 您必须编写一个 自定义反序列化器 。我会做这样的事情: 首先,您需要包括一个新的类,而不是已有的两个类: 然后,您需要一个自定义解串器,类似于以下内容: 然后,

  • 我试图用Jackson库创建复杂类的对象。每个对象都有一个模式,反序列化器需要使用该模式来解释JSON。我的问题是如何向反序列化器提供模式? 反序列化程序扩展了类JSONDeserializer,该类具有无参数构造函数和必须重写的抽象方法反序列化(解析器、上下文)。我想改用另一种方法反序列化(解析器、上下文、值),其中值是部分构造的对象,其中包括模式。也就是说,反序列化方法可以调用value。sc

  • 问题内容: 我有一堂课 我想将下面的JSON数据反序列化到上面的类/对象中 我的想法是在JSON中是一个对象,但我只想获取(在JSON中)在反序列化期间将像在类中那样传递。 如何使用Json.NET实现该目标? 我相信我可以使用CustomJsonConverter完成它。但是我很困惑。docs中的示例仅用于,但不适用。 问题答案: 我只是使用上面在问题中提到的方法解决了我的问题。在我完整的代码下

  • 我正在构建一个简单的项目与Spring boot和sping-kafka,我不能配置它,使其工作,它是一个简单的应用程序,生成笔记(作者,内容,createddatetime,lastmodefieddatetime)和发送基于笔记的事件,当他们被创建。 我已经玩了两天了,但我想我还没学会。 这是我的配置,我很确定它有很多锅炉板,但我已经用了几个例子来使我的工作。 我有2个生产者和消费者工厂,因为

  • 问题内容: 我正在使用spring-webflux WebClient (内部版本20170502.221452-172)访问Web应用程序,该Web应用程序生成Entry对象流(application / stream + json),如下所示: 尽管Entry对象的反序列化对于使用标准通用类型(包括Java时间(JSR-310)数据类型,如java.time.Instant)的POJO都可以正