我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedKafka真的支持Kafka DSL吗?
我正在使用Elmhurst。释放
@SpringBootApplication
@EnableBinding(MyBinding.class)
public class So43330544Application {
public static void main(String[] args) {
SpringApplication.run(So43330544Application.class, args);
}
@StreamListener
@SendTo(MyBinding.OUTPUT)
public KStream<String,String> process(@Input(MyBinding.INPUT) KStream<String, String> in) {
return in.peek((k,v) -> System.out.println("Received value " +v ))
.mapValues(v -> v.toUpperCase());
}
}
interface MyBinding {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
KStream<String, String> messagesIn();
@Output(OUTPUT)
KStream<String, String> messagesOut();
}
更新
如以下示例所示,当我使用Spring Cloud Stream通用语法编写事件处理器时,此答案中提出的方法对我有效,但当我使用Kafka DSL(KStreams)时,该方法无效。要查看行为的差异,只需切换到SpringBootTest注释中的ExampleAppWorking或ExampleAppNotWorking即可:
@RunWith(SpringRunner.class)
@SpringBootTest(classes=ExampleKafkaEmbeddedTest.ExampleAppNotWorking.class)
@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD)
public class ExampleKafkaEmbeddedTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, "so0544in","so0544out");
@Autowired
private KafkaTemplate<Integer, byte[]> template;
@Autowired
private KafkaProperties properties;
private static Consumer consumer;
@BeforeClass
public static void setup() throws Exception{
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("server.port","0");
System.setProperty("spring.jmx.enabled" , "false");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "so0544out");
}
@After
public void tearDown() {
if (consumer != null){
consumer.close();
}
}
@Test
public void testSendReceive() {
template.send("so0544in", "foo".getBytes());
Map<String, Object> configs = properties.buildConsumerProperties();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "so0544out");
System.out.println("Contenu chaine resultat : " + cr.value());
assertEquals(cr.value(), "FOO");
}
@SpringBootApplication
@EnableBinding(Processor.class)
public static class ExampleAppWorking {
public static void main(String[] args) {
SpringApplication.run(ExampleAppWorking.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String receive(String in) {
return in.toUpperCase();
}
}
@SpringBootApplication
@EnableBinding(MyBinding.class)
public static class ExampleAppNotWorking {
public static void main(String[] args) {
SpringApplication.run(ExampleAppNotWorking.class, args);
}
@StreamListener
@SendTo(MyBinding.OUTPUT)
public KStream<Integer,byte[]> toUpperCase (@Input(MyBinding.INPUT) KStream<Integer,byte[]> in){
return in.map((key, val) -> KeyValue.pair(key, new String(val).toUpperCase().getBytes()));
}
}
public interface MyBinding {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
KStream<Integer, String> messagesIn();
@Input(OUTPUT)
KStream<Integer, String> messagesOut();
}
}
嵌入式Kafa应与Kafka Streams配合使用。所有这些测试都使用嵌入式Kafa进行测试。您可以按照这些测试中使用的模式作为自己测试的模板。
查看了您在下面的注释中提供的代码。您需要在install
方法中添加此属性。
System.set属性("spring.cloud.stream.kafka.streams.binder.brokers",嵌入dKafka.getBrokersAsString());
主Spring Boot应用程序期望Kafka代理在本地主机上可用,并且它不知道测试正在运行嵌入式代理。我们需要通过在测试中设置该属性来明确这一事实,以便主引导应用程序正确检测嵌入的kafka代理。
一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.
在探索如何对Kafka流进行单元测试时,我遇到了,不幸的是,这个类似乎被版本(KAFKA-4408)破坏了 对于KTable的问题,是否有一个解决方案? 我看到了“mocked streams”项目,但首先它使用的是,而我使用的是,其次它是Scala,而我的测试是Java/Groovy。 这里的任何关于如何在不需要引导zookeeper/kafka的情况下对流进行单元测试的帮助都将非常棒。 注意:
<代码>list.stream()。 列表中的每个项目都将从数据库中删除。 假设列表中有3个项目,如何进行单元测试: 删除被调用了3次。 删除被称为“按顺序/顺序”,即列表中的元素顺序?
问题内容: 我有一个与此类似的简单带注释的控制器: 我想用这样的单元测试来测试它: 问题是AnnotationMethodHandlerAdapter.handler()方法引发异常: 问题答案: 从Spring 3.2开始,有一种适当的方法可以轻松,优雅地进行测试。您将可以执行以下操作: 有关更多信息,请访问http://blog.springsource.org/2012/11/12/spri
我对使用Spring控制器进行单元测试的概念是新的。我正在遵循我在网上找到的一些示例,并尝试实现他们的测试策略。这是我的基本控制器: 这是我的单元测试: 看起来很简单,但我得到了以下错误: 它完成了这项工作,但它没有像我之前尝试的那样使用任何Spring注释…这种方法是不好的,所以试图弄清楚为什么每当我在测试文件中包含注释时,总是会出现错误。 我的POM:
问题内容: 如何在单元测试中测试 hashCode()函数? 问题答案: 每当我覆盖equals和hash代码时,我都会按照Joshua Bloch在“ Effective Java”第3章中的建议编写单元测试。我确保equals和hash代码是自反的,对称的和可传递的。我还确保“不等于”对所有数据成员均正常工作。 当我检查对equals的调用时,我还要确保hashCode的行为符合预期。像这样: