当前位置: 首页 > 知识库问答 >
问题:

Java中的Spark流单元测试

劳嘉实
2023-03-14

我正在尝试为我的Spark流作业编写单元测试。我的spark streaming作业使用来自MQ的消息,并将其推入Kafka主题。

我的方法是

    null
public class StreamingJobTest {

private static KafkaConsumer<String, String> consumer;

@BeforeClass
public static void setUpClass()  {

    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9090");
    properties.put("subscribe", "topic1");
    properties.put("startingOffsets", "earliest");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer<String, String>(properties);



}


@Test
public void create_test() {
    String[] arguments = new String[]{};
    ConsumerRecords<String, String> records;

    Thread thread = new Thread(() -> StreamingJob.main(arguments));
    thread.start();

     //send a message to MQ.

    MqSender mqSender = new MqSender();
    mqSender.mqPushMsg("TestMsg");

    //keep polling the kafka topic.

    while(true){
        System.out.println("Polling...");
        records = consumer.poll(100);

        if(!records.isEmpty()){

            thread.interrupt();
            break;
        }

    assertNotNull(records);

    }


}

共有1个答案

诸葛绍元
2023-03-14

我自己想通了。我需要在一个单独的行订阅主题。我把它添加到我的属性中。而且groupid在Kafka中是强制性的,我错过了。它现在对我很好。下面是订阅主题的代码。

consumer.subscribe(Arrays.asList("topic1")); 
 类似资料:
  • 本文向大家介绍浅谈Java 中的单元测试,包括了浅谈Java 中的单元测试的使用技巧和注意事项,需要的朋友参考一下 单元测试编写 Junit 单元测试框架 对于Java语言而言,其单元测试框架,有Junit和TestNG这两种, 下面是一个典型的JUnit测试类的结构 测试结果如图所示 层次性表达测试用例 测试用例较多的情况下,为了层次性表达测试用例,使用Junit的Nested注解有层次的表达测

  • 我试图讨论下面SonarQube通过代码覆盖的if条件。SonarQube说2个条件中的1个都被覆盖了。如果有人能帮我把这两件事都盖上,我将不胜感激。 我已经声明了变量,并拥有了setter和getter。 下面是我要介绍的代码: 下面是测试2个条件中的1个条件的代码:

  • 我有一个文件在 /src/main/java有以下代码行: 相应的资源文件位于/src/main/resources。。。但我们正在考虑取消这项测试。json来自classpath(src/main/resources)文件夹,我们将再包含一个jar依赖项,该jar将包含此测试。json。。。我已经用jar依赖性进行了测试,它在类路径中运行良好。。但问题是我们有一些测试用例。。如果我们不包括jar

  • 问题内容: 我想知道什么是对Servlet进行单元测试的最佳方法。 只要测试内部方法不引用servlet上下文,就不会有问题,但是测试doGet / doPost方法以及引用上下文或使用会话参数的内部方法又如何呢? 有没有一种方法可以简单地使用经典工具(例如JUnit或最好是TestNG)来做到这一点?我是否需要嵌入tomcat服务器或类似的东西? 问题答案: 尝试使用HttpUnit,尽管您最终

  • 需要对项目的控制器部分进行单元测试,但却得到了错误。我相信ModelAndVIew部分导致了这个问题,尽管我曾经嘲弄过它并返回ModelAndVIew,因为它是方法的返回类型。然而,它并不起作用。pom.xml没有任何问题,因此没有添加它。ProjectController: java.lang.IllegalStateException:找不到@SpringBootConfiguration,您