当前位置: 首页 > 知识库问答 >
问题:

测试Spring-Cloud-Stream服务的问题

师谦
2023-03-14

我在测试我的Spring Cloud stream服务(写入Kafka流)时遇到问题。它基于以下baeldung介绍

这是服务代码(具体省略)

@Service
@EnableBinding(Source.class)
public class KafkaWriterService {

    @SendTo(Source.OUTPUT)
    public String write(String str) {
        return str;
    }

}

这是测试

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaWriterServiceTest {

    @Autowired
    private Source source;

    @Autowired
    private MessageCollector collector;

    @Autowired
    private KafkaWriterService service;

    @Test
    public void testMessages() {
        BlockingQueue<Message<?>> messages = collector.forChannel(source.output());
        service.write("FooBar");

        Object payload = messages.poll().getPayload();
        System.out.println(payload);
    }

非常直接,但是在执行测试时,我得到一个NullPointerException,因为轮询返回一个Null。

知道问题是什么吗?

谢谢

共有1个答案

邹海荣
2023-03-14

更新:

如果您只是在寻找Kafka生产者示例,请对您的服务进行以下更改:

@Service
@EnableBinding({Source.class})
public class KafkaWriterService {

    private final MessageChannel output;

    public KafkaWriterService(@Qualifier("output") MessageChannel output) {
        this.output = output;
    }

    public void write(String str) {
        output.send(MessageBuilder.withPayload(str).build());
    }
}

发送到需要通过StreamListener提供消息源。它用于转换输入消息。

在您的测试程序中,没有生成任何消息。

将代码更改为以下内容,以便在测试中生成消息:

@Service
@EnableBinding({Source.class, Sink.class})
public class KafkaWriterService {

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String write(String str) {
        return "Transformed: " + str;
    }
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaWriterServiceTest {

    @Autowired
    private Sink sink;

    @Autowired
    private Source source;

    @Autowired
    private MessageCollector collector;

    @Autowired
    private KafkaWriterService service;

    @Test
    public void testMessages() {
        BlockingQueue<Message<?>> messages = collector.forChannel(source.output());
        sink.input().send(MessageBuilder.withPayload("Hello World!").build());

        Object payload = messages.poll().getPayload();
        System.out.println(payload);
    }
}
 类似资料:
  • 我已经用Rabbitmq绑定器设置了一个Spring Cloud stream。我想用Spring Cloud stream做性能测试。有什么方法可以用它做性能测试吗?

  • 场景:我有3个Spring Cloud流媒体应用程序 1'st:将XML有效负载解组为JAXB对象 2'nd:将JAXB有效负载转换为我们的域POJO 3'rd:验证域对象 我正在尝试测试第三个应用程序。我已将第一个和第二个应用程序作为测试依赖项。我添加了: 现在,我有大约20个xml文件,其中包含各种验证场景。第一个测试运行良好。我能够通过以下方式获取频道的预期消息: 运行的第二个测试是我有问题

  • null 我正在尝试为SQS编写一个集成测试。 我有一个本地运行的localstack docker容器,其中SQS在上运行 在我的测试代码中,我定义了一个endpoint设置为Local4576的SQS客户机,它可以成功地连接和创建队列、发送消息和删除队列。我还可以使用SQS客户机接收消息并拾取我发送的消息。 我的问题是,如果我为了允许另一个组件获得消息而删除了正在手动接收消息的代码,似乎什么也

  • 本节将详细介绍如何使用Spring Cloud Stream。它涵盖了创建和运行流应用程序等主题。

  • 我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA

  • 我目前正在尝试使用Spring云配置,并且我无法让Spring云配置服务器从我的github存储库加载属性。我一直在关注这里的文档: http://cloud.spring.io/spring-cloud-config/spring-cloud-config.html#_spring_cloud_config_server 这是我的Mavenpom.xml文件: 这是我的应用课程: 这是我的app