当前位置: 首页 > 知识库问答 >
问题:

KStream with Testbinder-Spring Cloud Stream Kafka

鲍俊杰
2023-03-14

我最近开始为Kafka研究Spring Cloud Stream,并且一直在努力使TestBinder与Kstreams一起工作。这是一个已知的限制,还是我忽略了什么?

这很好:

字符串处理器:

@StreamListener(TopicBinding.INPUT)
@SendTo(TopicBinding.OUTPUT)
public String process(String message) {
    return message + " world";
}

字符串测试:

  @Test
  @SuppressWarnings("unchecked")
  public void testString() {
    Message<String> message = new GenericMessage<>("Hello");
    topicBinding.input().send(message);
    Message<String> received = (Message<String>) messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
  }

但当我试图在流程中使用KStream时,我无法让TestBinder正常工作。

Kstream处理器:

  @SendTo(TopicBinding.OUTPUT)
  public KStream<String, String>  process(
      @Input(TopicBinding.INPUT) KStream<String, String> events) {
    return events.mapValues((value) -> value + " world");
  }

KStream测试:

@Test
@SuppressWarnings("unchecked")
public void testKstream() {
    Message<String> message = MessageBuilder
      .withPayload("Hello")
      .setHeader(KafkaHeaders.TOPIC, "event.sirism.dev".getBytes())
      .setHeader(KafkaHeaders.MESSAGE_KEY, "Test".getBytes())
      .build();
    topicBinding.input().send(message);
    Message<String> received = (Message<String>) 
    messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
 }

正如您可能已经注意到的,我从Kstream处理器中省略了@StreamListener,但是如果没有它,testbinder似乎无法找到处理程序。(但有了它,启动应用程序时就不起作用了)

这是一个已知的缺陷/限制,还是我只是在做一些愚蠢的事情?

共有1个答案

厍光霁
2023-03-14

测试活页夹仅适用于基于MessageChannel的活页夹(AbstractMessageChannel活页夹的子类)。KStreamBinder不使用MessageChannels。

您可以使用Spring-kafka-test模块提供的真正的活页夹和嵌入式kafka代理进行测试。

也看看这个问题。

 类似资料:

相关问答

相关文章

相关阅读