当前位置: 首页 > 知识库问答 >
问题:

如何实例化junit测试的模拟Kafka主题?

史涵育
2023-03-14

澄清一下:我选择使用主题的一个实际的嵌入式实例,以便用一个实际的实例进行测试,而不是在mockito中嘲弄hand off。这样我就可以测试我的自定义编码器和解码器是否能够正常工作,并且当我使用一个真实的kafka实例时不会失败。

共有1个答案

乜昆
2023-03-14

https://gist.github.com/asmaier/6465468#文件-KafkaProducerTest-Java

此示例已更新为可以在新的0.8.2.2版本中使用。下面是包含maven依赖项的代码片段:

pom.xml:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.2</version>
      <classifier>test</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.2</version>
    </dependency>
</dependencies>
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;

/**
 * For online documentation
 * see
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
 */
public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"};
        // create topic
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

        List<KafkaServer> servers = new ArrayList<KafkaServer>();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);

        // setup simple consumer
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

        // send message
        KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // deleting zookeeper information to make sure the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // starting consumer
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        if(iterator.hasNext()) {
            String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println(msg);
            assertEquals("test-message", msg);
        } else {
            fail();
        }

        // cleanup
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <classifier>test</classifier>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.2.0-incubating</version>
    <scope>test</scope>
</dependency>
@Before
public void startZookeeper() throws Exception {
    zkTestServer = new TestingServer(2181);
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));
}

@After
public void stopZookeeper() throws IOException {
    cli.close();
    zkTestServer.stop();
}
 类似资料:
  • 我们有一个消息调度程序,它在将消息属性放入带有密钥的Kafka主题队列之前,从消息属性生成一个散列密钥。 这样做是为了消除重复。但是,我不确定如果不实际设置本地集群并检查它的运行是否符合预期,如何能够测试重复数据删除。

  • 问题内容: 我有一个Java命令行程序。我想创建JUnit测试用例以进行模拟。因为当我的程序运行时,它将进入while循环并等待用户输入。如何在JUnit中模拟呢? 问题答案: 从技术上讲,可以进行切换,但是总的来说,不直接在代码中调用它,而是添加一层间接层,这样输入源就可以从应用程序的某个位置进行控制,这样会更健壮。确切地讲,这是实现的详细信息-依赖项注入的建议很好,但是你不一定需要引入第三方框

  • 本节介绍与JUnit Framework相关的各种模拟测试。 您可以在本地计算机上下载这些示例模拟测试,并在方便时离线解决。 每个模拟测试都提供一个模拟测试密钥,让您自己验证最终得分和评分。 JUnit Mock Test I 问题1 - 以下哪项描述正确测试? A - 测试是检查应用程序功能的过程,是否按照要求运行。 B - 测试是单个实体(类或方法)的测试。 C - 以上两者。 D - 以上都

  • 我有一些遗留代码,并对我在该代码上所做的增强进行了编写测试。我有一个类SiteSession,并提取了一个接口ISiteSession,以便将依赖项注入到调用类中。 调用类有一个构造函数,在该构造函数中,依赖项被注入到正在测试的控制器CustomerDetails中 现在,我的测试方法已经嘲弄了依赖关系,并且我对为这个控制器或代码的任何其他部分创建的任何测试都没有问题。但是,当调用该控制器上的测试

  • 我无法访问代码,无法更改它。如何模拟/实例化? 类和代码测试:

  • 我已经编写了示例CRUD方法,我已经为服务组件编写了JUnit测试用例,但得到的结果是“地址id没有找到…”当我做测试的时候。 @test public void updateAddressTest()引发ResourceNotFoundException{