试图弄清楚我是否可以使用spring kafka和spring kafka测试为@KafkaListener编写单元测试。
我的听众课。
public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;
@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}
我的测试类别:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {
@Autowired
private MyMessageProcessor myMessageProcessor;
@Value("${kafka.topic.01}")
private String TOPIC_01;
@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;
@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
我的测试配置类:
@Configuration
@EnableKafka
public class TestKafkaConfig {
@Bean
public MyMessageProcessor myMessageProcessor() {
return mock(MyMessageProcessor.class);
}
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
}
//Consumer
@Bean
public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageConsumerFactory());
return factory;
}
//Producer
@Bean
public ProducerFactory<String, MyMessage> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, MyMessage> messageProducer() {
return new KafkaTemplate<>(producerFactory());
}
}
有什么简单的方法可以做到这一点吗?
或者我应该以其他方式测试@KafkaListener?在单元测试中,如何确保在Kafka中收到新消息时调用@KafkaListener。
这是我根据您的代码为消费者提供的工作解决方案。谢谢你:-)
配置如下:
@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {
private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
@Primary
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
log.info("Consumer TEST config = {}", props);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
log.info("Producer TEST config = {}", props);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<String>());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
return pf;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.setConcurrency(2);
return factory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
return kafkaListenerEndpointRegistry;
}
}
将测试中需要包含的所有bean放在不同的类中:
@TestConfiguration
@Profile("kafka_test")
@EnableKafka
public class KafkaBeansConfig {
@Bean
public MyProducer myProducer() {
return new MyProducer();
}
// more beans
}
我创建了一个BaseKafkanConsumerTest类来重用它:
@ExtendWith(SpringExtension.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {
@Autowired
protected EmbeddedKafkaBroker embeddedKafka;
@Value("${spring.embedded.kafka.brokers}")
private String brokerAddresses;
@Autowired
protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
protected KafkaTemplate<String, String> senderTemplate;
public void setUp() {
embeddedKafka.brokerProperty("controlled.shutdown.enable", true);
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
System.err.println(messageListenerContainer.getContainerProperties().toString());
ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());
}
}
@AfterAll
public void tearDown() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
messageListenerContainer.stop();
}
embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
}
}
扩展基类以测试消费者:
@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {
private static Logger log = LoggerFactory.getLogger(PaymentMethodsKafkaConsumerTest.class);
@Autowired
private MyConsumer myConsumer;
// mocks with @MockBean
@Configuration
@ComponentScan({ "com.myfirm.kafka" })
static class KafkaLocalTestConfig {
}
@BeforeAll
public void setUp() {
super.setUp();
}
@Test
public void testMessageIsReceived() throws Exception {
//mocks
String jsonPayload = "{\"id\":\"12345\","cookieDomain\":"helloworld"}";
ListenableFuture<SendResult<String, String>> future =
senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);
Thread.sleep(10000);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("successfully sent message='{}' with offset={}", jsonPayload,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("unable to send message='{}'", jsonPayload, ex);
}
});
Mockito.verify(myService, Mockito.times(1))
.update(Mockito.any(MyDetails.class));
}
正如我在其他帖子中所读到的,不要以这种方式测试业务逻辑。只是打电话而已。
您可以在测试用例中包装侦听器。
鉴于
@SpringBootApplication
public class So52783066Application {
public static void main(String[] args) {
SpringApplication.run(So52783066Application.class, args);
}
@KafkaListener(id = "so52783066", topics = "so52783066")
public void listen(String in) {
System.out.println(in);
}
}
然后
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate<String, String> template;
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void test() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
.getListenerContainer("so52783066");
container.stop();
@SuppressWarnings("unchecked")
AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
.getContainerProperties().getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
container.start();
template.send("so52783066", "foo");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}
如何确保在Kafka中收到新消息时调用@KafkaListener。
好的,这本质上是测试这种功能的框架责任。在您的情况下,您只需要专注于业务逻辑和单元测试,而不是在框架中编译的代码。此外,测试KafkaListener方法也没有什么好处,它只记录传入消息。要找到测试用例验证的钩子肯定太难了。
另一方面,我真的相信您的@KafkaListener
方法中的业务逻辑比您展示的要复杂得多。因此,验证从该方法调用的自定义代码(例如DB插入、一些其他服务调用等)可能真的更好,而不是尝试准确地找出myMessageListener()
的挂钩。
您使用mock(MyMessageProcessor.class)所做的操作确实是业务逻辑验证的好方法。代码中唯一的错误是嵌入Kafka的重复:您使用注释,并且在配置中声明了一个Bean。您应该考虑删除其中一个。虽然还不清楚您的生产代码在哪里,但它实际上没有嵌入Kafka。否则,如果一切都在测试范围内,我看不到您的消费者和生产者工厂配置有任何问题。您肯定对KafkaListener和KafkaTemplate进行了尽可能少的配置。您只需要删除嵌入的Kafka,不要启动代理两次。
问题内容: 我有一个Java课。如何进行 单元测试? 就我而言,我有课做一个二进制和。它需要两个数组,将它们求和,然后返回一个新的二进制数组。 问题答案: 使用正确的输入定义正常情况下的预期和期望输出。 现在,通过声明一个类来实现测试,将其命名为任何东西(通常是类似TestAddingModule之类的东西),并向其添加testAdd方法(即,类似于下面的方法): 编写一个方法,并在其上方添加@T
我一直在学习使用Jest库编写JavaScript/TypeScript代码的单元测试。下面是一个我不知道如何处理的例子。它是用TypeScript输入的——只有两个公共方法和一个构造函数需要service1参数。 我想我需要测试两种情况: > 如果 函数为空。我没有在代码中看到它的任何实现,也不知道它是如何工作的。我应该把它作为参数传递给这个类的实例吗? 我很困惑,在这个特定的例子中,我应该使用
我不会告诉你有关后台任务的单元测试的任何内容,因为Hangfire没有添加任何特定方法 (除了 IJobCancellationToken 接口参数)去改变任务。使用您最喜爱的工具,并照常写入单元测试。本节介绍如何测试创建的后台任务。 所有的代码示例都使用静态 BackgroundJob 类来告诉你如何做这个或那些东西,只是出于简单演示的目的。但是当你想测试调用的静态方法时,会变得很痛苦。 不用担
我想为上面的内容编写单元测试,以测试我正在使用的注释的sampleURL,比如如果我给出任何应该与regex模式匹配的URL。我浏览了以下链接:如何在spring中进行单元测试验证注释,如何使用JUnit测试类的验证注释?但它们没有多大帮助,我也有setSampleURL函数。那么,如何为sampleURL变量编写测试呢。基本上,我想为regex模式编写测试,即我给sampleURL的值是否与re