我正在使用kafka在spring boot中开发一个异步邮件服务器。
我已经用嵌入式Kafka编写了测试,它在一个随机端口中启动自己的Kafka主题,并使用它进行测试。
当我开始这个应用程序上下文正在加载,它期望在我的本地kafka集群。我需要停止应用程序上下文的加载。我从https://github.com/code-not-found/spring-kafka/blob/master/spring-kafka-unit-test-classrule/src/test/java/com/codenotfound/kafka/producer/SpringKafkaSenderTest.java复制了代码,这绝对很好。当我在我的项目中遵循相同的风格时,我可以看到实际的应用程序开始了。
SpringKafkaSenderTest. java
package com.mailer.embeddedkafkatests;
import static org.junit.Assert.assertTrue;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import com.mailer.model.Mail;
import com.mailer.producer.KafkaMessageProducer;
import com.mailer.serializer.MailSerializer;
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class SpringKafkaSenderTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(SpringKafkaSenderTest.class);
private static String SENDER_TOPIC = "sender.t";
@Autowired
private KafkaMessageProducer sender;
private KafkaMessageListenerContainer<String, Mail> container;
private BlockingQueue<ConsumerRecord<String, Mail>> records;
@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
new EmbeddedKafkaRule(1, true, SENDER_TOPIC);
@Before
public void setUp() throws Exception {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties =
KafkaTestUtils.consumerProps("sender", "false",
embeddedKafka.getEmbeddedKafka());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MailSerializer.class);
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, Mail> consumerFactory =
new DefaultKafkaConsumerFactory<String, Mail>(
consumerProperties);//, new StringDeserializer(), new JsonDeserializer<>(Mail.class));
// set the topic that needs to be consumed
ContainerProperties containerProperties =
new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory,
containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container
.setupMessageListener(new MessageListener<String, Mail>() {
@Override
public void onMessage(
ConsumerRecord<String, Mail> record) {
LOGGER.debug("test-listener received message='{}'",
record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
}
@After
public void tearDown() {
// stop the container
container.stop();
}
@Test
public void testSend() throws InterruptedException {
// send the message
Mail mail = new Mail();
mail.setFrom("vinoth@local.com");
sender.sendMessage(mail);
Thread.sleep(4000);
// check that the message was received
ConsumerRecord<String, Mail> received =
records.poll(10, TimeUnit.SECONDS);
// Hamcrest Matchers to check the value
assertTrue(received.value().getFrom().equals(mail.getFrom()));
System.out.println(received.value().getFrom());
// assertThat(received, hasValue(mail));
// AssertJ Condition to check the key
// assertThat(received).has(key(null));
}
}
为什么要停止加载spring上下文?这个junit的目的不是测试您的spring应用程序吗?
在任何情况下,只需删除@SpringBootTest
注释,spring上下文将不会加载。
我需要启动一个嵌入式cassandra实例,通过单元测试在cassandra键空间上执行一些操作。编程语言是Java。启动嵌入式cassandra有哪些选项? 我使用了mojo maven插件,但是在使用以下命令启动实例之后,我没有看到cassandra实例在localhost上的默认端口9042启动。插件:http://www.mojohaus.org/cassandra-maven-plugi
所以我用了这个嵌入Kafka的例子,还有这个 我对这个示例做了一点更改,并用一些数据库(如h2db)更新了kafka侦听器。 现在在我的单元测试中,当我想检查数据在数据库中是否可用时,我得到NULL。另外,我不确定如何手动检查数据库,因为h2是一个内存基础数据库。 这是更新的部分:在接收器类中 在单元测试中: 但 dt 始终为空。此外,我也无法检查数据库,因为它在测试停止后停止。有人知道如何使它可
尝试完成Spring Boot教程:https://Spring.io/guides/gs/spring-boot/#initial 以下是我的课程: 和pom.xml: 当我尝试使用以下命令运行它时:“mvn package&&java-jar target/gs-spring-boot-0.1.0.jar”,我得到的是: “-dmaven.home=C:\program files\jetbr
当我尝试运行此代码时,spring-boot应用程序会运行,但junit测试用例永远不会运行。但是如果我删除“@SpringBootTest(webEnvironment=webEnvironment.RANDOM_PORT)”,那么Junit测试会运行,但我的spring-boot应用程序不会运行。在我早期的项目中,这从来不是一个问题。
我试图使用Intellij启动一个Spring应用程序,但我得到了下面的错误。我尝试将Tomcat依赖项放在pom.xml中,但这并不奏效(我后来也发现不需要这样做)。 应用程序应该可以正常工作,因为它已经在一个同事的电脑上工作了,所以我不太确定为什么我会得到这个错误。我已经尝试了几个修复,我在这里遇到了,但没有一个奏效。下面是我的: