我创建了一个Spring Boot应用程序,它向Kafka主题发送消息。我正在使用springSpring-Integration-Kafka
:KafkaProducerMessageHandler
订阅到一个通道(subscribableChannel
),并将接收到的所有消息推送到一个主题。应用程序运行良好。我看到消息通过console consumer(本地Kafka)到达Kafka。
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {
mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );
}
//...
@Test
public void endToEnd() throws Exception {
// ...
ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );
StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );
}
问题是我没有看到任何记录。我不确定我的KafkaEmbedded设置是否正确。但是消息由信道接收。
这对我管用。试试看
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {
private static String SENDER_TOPIC = "testTopic";
@ClassRule
// By default it creates two partitions.
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
//If you wish to send it to partitions other than 0 and 1,
//then you need to specify number of paritions in the declaration
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
// Make sure you set the offset as earliest, because by the
// time consumer starts, producer might have sent all messages
consumerProps.put("auto.offset.reset", "earliest");
final List<String> receivedMessages = Lists.newArrayList();
final CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
try {
while (true) {
ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
records.iterator().forEachRemaining(record -> {
receivedMessages.add(record.value());
latch.countDown();
});
}
} finally {
kafkaConsumer.close();
}
});
latch.await(10, TimeUnit.SECONDS);
assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
}
}
我使用倒计时锁存是因为producer.send(..)
是一个异步操作。所以我在这里做的是,在一个无限循环中,每100毫秒轮询Kafka,如果有新记录,如果有,将它添加到未来断言的列表中,然后减少倒计时。我将总共等待10秒以确定。
您也可以使用一个简单的循环,然后在几分钟后退出。(如果您不想使用CountdownLatch和ExecutorService之类的东西)
我在网上读了一些讨论。他们说,我们不应该对私有方法进行单元测试或检查私有状态,因为这是实现细节,是糟糕设计的标志。但就我而言,我真的不知道如何做得更好。 下面是一个示例代码(我的实际代码是使用factory编写的,但我尝试使用纯js创建一个相同的案例,这样每个人都更容易理解,因为相同的原因是闭包): 在我真正的应用程序中,我可以注入和模拟本地存储,但这是一个问题。我的问题是如何测试方法是否设置了其
问题内容: 我有以下指令来自动聚焦字段: 我将如何对此进行单元测试?我尝试了以下选择器之类的几种方法,但是它们都返回错误或false: 我的单元测试设置如下: 问题答案: 我想通了,实际上这很明显。 我的问题有两个: 我没有调用超时刷新功能,所以没有发生超时,并且 我试图查看元素的focus属性,而仅关注focus()函数的调用更像是单元测试。focus属性确实属于e2e测试领域。
我是spring的新手,目前我正在尝试为我的项目执行单元测试。我已经用hibernate配置了spring,现在我想检查创建的类的方法是否工作。例如,假设我有: 如何正确测试这些方法?最好的选择是什么?我习惯在JUnit测试中使用assertTrue和assertFalse,但我担心这在我的情况下不是一个好的选择。谢谢你。
问题内容: 我正在使用带有eclipse的junit编写功能测试。 当运行单个测试时,它将按照我在类中设置它们的顺序运行。 例如。 但是,当我将此测试作为套件的一部分运行时(在包中),顺序是随机的。 例如,它将执行验证,然后删除用户,然后删除joinuserToRoom然后创建用户。 我在套件中的测试并不相互依赖。但是,测试中的每个单独测试都取决于它们以正确的顺序运行。 有什么办法可以实现? 谢谢
问题内容: 我有一个简单的功能要测试: 但是,如何测试该函数实际发送到标准输出的内容呢?Test :: Output在Perl中实现了我想要的功能 但这对于每个测试来说都是很多额外的工作。我希望有一种更标准的方法,或者也许是一个抽象库来处理此问题。 问题答案: 还需要记住的一件事是,没有什么可以阻止您编写避免编写样板代码的函数。 例如,我有一个使用的命令行应用程序,我编写了以下函数: 然后像这样使
我需要在我的应用程序中进行单元测试JMSXGroupID。有人能告诉我是否有示例文章或参考文献来实现这一点吗?我试图在ActiveMQ门户中查看参考文献,但看起来下面的Hudson链接坏了。我正在使用Spring JMS进行编程。 http://activemq.apache.org/junit-reports.html