当前位置: 首页 > 知识库问答 >
问题:

Spring Stream kafka活页夹测试自定义标题

华睿识
2023-03-14

我试图弄清楚如何在Spring消息中包含自定义标头

我觉得我遗漏了一些东西,因为我似乎可以使用TestChannelBinder使其正常工作,例如。

import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
@Slf4j
public class BaseStream implements Function<Message<String>, String> {
    @Override
    public String apply(Message<String> transactionMessage) {
        log.debug("Converted Message: {} ", transactionMessage);
        return transactionMessage.getPayload();
    }

}

带测试粘合剂的测试等级:


import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;


@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
@Import(TestChannelBinderConfiguration.class)
public class TestForStream {


    @Autowired
    InputDestination inputDestination;
    @Autowired
    OutputDestination outputDestination;

    @Test
    void contextLoads() {
        inputDestination.send(MessageBuilder
                .withPayload("Test Payload")
                .setHeader("customHeader", "headerSpecificData")
                .build());
    }
}

测试tream.properties

spring.cloud.function.definition=baseStream
spring.cloud.stream.bindings.baseStream-in-0.destination=test-in
spring.cloud.stream.bindings.baseStream-out-0.destination=test-out
spring.cloud.stream.bindings.baseStream-in-0.group=test-group-base

运行时记录:

Converted Message: GenericMessage [payload=Test Payload, headers={id=5c6d1082-c084-0b25-4afc-b5d97bf537f9, customHeader=headerSpecificData, contentType=application/json, timestamp=1639398696800, target-protocol=kafka}]

这就是我想要做的。但当我试着测试Kafka拜德时,它似乎包含了

我只是想知道,如果有人能看到我在测试中的错误,因为我已经尝试了各种方法来实现这一点,并且看到它与测试活页夹一起工作,我会假设它适用于Kafka活页夹。

Kafka活页夹测试的测试类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestPropertySource;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092"})
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
public class TestForStream {

    public static CountDownLatch latch = new CountDownLatch(1);
    @Autowired
    public EmbeddedKafkaBroker broker;

    @Test
    void contextLoads() {
        sleep(5);//Included this as it takes some time to init>

        sendMessage("test-in", MessageBuilder
                .withPayload("Test Payload")
                .setHeader("customHeader", "headerSpecificData")
                .build());


    }

    public <T> ProducerFactory<String, T> createProducerFactory() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(broker));
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //Is JsonSerializer correct for a message?
        return new DefaultKafkaProducerFactory<>(configs);
    }

    public <T> void sendMessage(String topic, T listObj) {
        try {
            KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
            kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void sleep(long time){
        try {
            latch.await(time, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}


消息的kafka活页夹测试日志:

Converted Message: GenericMessage [payload={"payload":"Test Payload","headers":{"customHeader":"headerSpecificData","id":"d540a3ca-28db-b137-fc86-c25cc4b7eb8b","timestamp":1639399810476}}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-in, target-protocol=kafka, kafka_offset=0, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@79580279, id=1cf2d382-df29-2672-4180-07da99e58244, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1639399810526, contentType=application/json, __TypeId__=[B@24c79350, kafka_groupId=test-group-base, timestamp=1639399810651}]

因此,在这里,消息已包含在有效负载中,并且kafka标头已按预期包含在标头中。

我已经尝试了spring.cloud.stream.kafka.binder.headersHeaderMode,看看他们是否会改变什么,但没有用。

编辑:

使用springCloudVersion=2020.0.3


共有1个答案

江飞白
2023-03-14

我正在使用:

public <T> void sendMessage(String topic, T listObj) {
    try {
        KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
        kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
    }catch (Exception e){
        e.printStackTrace();
    }
}

发送将消息作为值的消息。

我应该使用的内容:

public void sendMessage(String topic, Message<?> listObj) {
    try {
        KafkaTemplate<String, Message<?>> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
        kafkaTemplate.setDefaultTopic(topic);
        kafkaTemplate.send(listObj);
    }catch (Exception e){
        e.printStackTrace();
    }
}
 类似资料:
  • 我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册: 活页夹实现: 配置类: Spring活页夹文件: application.yml 我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • YDoc 的页面可支持 .md、.jsx、.html 三种类型。我们推荐大部分的文档内容 使用 markdown 编写,少数个性化页面使用 html 或 jsx 实现。 Markdown 规则 YDoc 会根据 markdown 内容获取网站标题和描述信息,如下所示,YDoc 会将当前页面标题设置为 “示例”, 页面描述信息设置为 “这是一个示例。”。 # 示例 这是一个示例。 ## 章节1

  • 自定义页面 您可以将网页添加到您的网站,而不是作为标准文档或博客 markdown 文件的一部分。 你可以通过在 website/pages 目录中添加 .js 文件来实现。 这些文件是 React 组件,并调用 render() 来创建它们,由CSS类等支持。 自定义主页 开始自定义主页的最简单方法是使用运行 Docusaurus 初始化脚本 时 创建 的示例网站。 你可以 启动 你的本地服务器

  • 我有一个自定义任务定义来运行每个测试具有特殊设置的特定测试文件。我的任务定义如下: 现在,此设置中的一些测试是不可靠的,我尝试再次运行它们,如下所示: 我编写了一个测试类,第一次总是失败,第二次总是成功: 不幸的是,测试只执行一次,整个测试套件失败。我可以使用中所述的自定义规则成功运行测试https://stackoverflow.com/a/55178053/6059889 有没有办法将测试重试