当前位置: 首页 > 知识库问答 >
问题:

如何在骆驼路线上模拟Kafka消费者endpoint?

穆文斌
2023-03-14

我有一个驼峰endpoint,基本上是Kafka消费者从一个主题中读取信息并将其发送到数据库。它工作得很好,但是,我很难对它进行单元测试,因为我无法模拟Kafkaendpoint。有谁能帮我在骆驼路线上嘲笑Kafka的消费者吗?

java prettyprint-override">@Override
public void configure() {    
    from(kafka:eph?brokers=localhost:9092...).routeId("KafkaConsumer")
        .to(direct:updateDatabase)
}

共有2个答案

季阳朔
2023-03-14

只需在属性中外部化endpointURI(例如,使用Spring属性工具)

from(consumerEndpoint).routeId("KafkaConsumer")

然后在生产配置中,使用真正的endpoint

consumerEndpoint=kafka:eph?brokers=localhost:9092...

而在测试配置中,使用直接endpoint

consumerEndpoint=direct:consumer

这一个很容易从骆驼路线测试中触发

producer.sendBody("direct:consumer", myMessageBody);
公冶高峯
2023-03-14

要对路线进行单元测试,可以使用标准的驼峰Spring引导测试。在测试期间,Kafka制作人(在Camel看来)可以与直接组件交换,模拟消息可以在那里传递。要查看路由是否正确处理了这些消息,可以使用模拟endpoint。

//Route definition
@Component
public class KafkaRoute extends RouteBuilder {

    public static final String KAFKA_ROUTE_NAME = "kafka-route";

    @Override
    public void configure() throws Exception {
        from("kafka:eph?brokers=localhost:9092").routeId(KAFKA_ROUTE_NAME)
                .log(LoggingLevel.INFO, "Message: ${body}  received on the topic: ${headers[kafka.TOPIC]} ")
                .to("direct:updateDatabase");

        from("direct:updateDatabase").log(LoggingLevel.INFO, "DB Updated.");

    }

}

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.CamelSpringBootRunner;
import org.apache.camel.test.spring.MockEndpoints;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;

@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@MockEndpoints("direct:*")
public class KafkaRouteTest {

    @Autowired
    CamelContext camelContext;

    @Produce
    ProducerTemplate mockKafkaProducer;

    @EndpointInject("mock:direct:updateDatabase")
    MockEndpoint finalSink;

    @Test
    public void testKafkaRoute() throws Exception {

        //Here we swap the FROM component in the KafkaRoute.KAFKA_ROUTE_NAME with a direct component, direct:kafka-from
        AdviceWithRouteBuilder.adviceWith(camelContext, KafkaRoute.KAFKA_ROUTE_NAME, routeBuilder -> {
            routeBuilder.replaceFromWith("direct:kafka-from");
        });

        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaConstants.TOPIC, "testTopic");

        //Send mock message to the route
        mockKafkaProducer.sendBodyAndHeaders("direct:kafka-from", "test-body", headers);


        //Assertions. You may do additional assertions with the likes of Mockito
        finalSink.expectedBodiesReceived("test-body");
        finalSink.expectedHeaderReceived(KafkaConstants.TOPIC, "testTopic");
        finalSink.assertIsSatisfied();

    }

}

Camel Kafka组件已经经过单元测试,在代码库中复制所有这些测试是没有意义的。但是,如果你真的想对一个真正的Kafka实例进行测试,你可以使用测试容器。这里有一个完整的例子,来自Camel存储库本身,使用测试容器。

 类似资料:
  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c

  • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

  • 我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里

  • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?

  • 我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?