当前位置: 首页 > 知识库问答 >
问题:

从docker compose使用应用程序定义的Kafka服务器从Spring启动Kafka Junit

蒯华彩
2023-03-14

我有一个带有docker compose的Spring Boot应用程序,我正在从中配置kafka和其他必需的应用程序。

我的应用程序中有Kafka制作人和消费者(@kafkaListener)。我从@Bean(不是从application.yml文件)获得了Kafka和JPA存储库配置。

我正在为kafka生产者和消费者编写一个junit测试,但我想使用应用程序定义的kafka(在docker compose中定义)。

所以在我的pom中。xml我有exec目标,首先启动应用程序,然后运行测试收集报告,最后停止应用程序。

有人能帮我用应用程序定义的Kafka为Kafka制作人和消费者编写Junit吗。我尝试了@RunWith(SpringRunner.class)、@SpringBootTest、@EmbeddedKafka,但总有例外。

  <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.1.1</version>
            <executions>
                <execution>
                    <id>Create IT environment</id>
                    <phase>pre-integration-test</phase>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                    <configuration>
                        <executable>${basedir}/start-services.sh</executable>
                    </configuration>
                </execution>
                <execution>
                    <id>Generate Cobertura Reports</id>
                    <phase>integration-test</phase>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                    <configuration>
                        <skip>${skip.coverage}</skip>
                        <executable>${basedir}/coverage/report.sh</executable>
                    </configuration>
                </execution>
                <execution>
                    <id>Clean up environment</id>
                    <phase>post-integration-test</phase>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                    <configuration>
                        <executable>${basedir}/stop-services.sh</executable>
                    </configuration>
                </execution>
            </executions>
        </plugin>

样本测试-

 @RunWith(SpringRunner.class)
 @SpringBootTest
 @EmbeddedKafka(partitions = 1, brokerProperties = {  "listeners=PLAINTEXT://localhost:9092", "port=9092" })
 public class KafkaProcessorITCase {

    private static final String TOPIC = "topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private MessageConsumer consumer;

    @Autowired
    private MessageProducer producer;

    @Test
    public void shouldProcessMessage() throws  Exception {

        String message = "Sample message";

        producer.sendMessage(message, TOPIC);
        
        //Assertions
 }
}

制作人配置

  @Bean
  public ProducerFactory<String, String> producerConfig() {
     Map<String, Object> configMap = new HashMap<>();
     configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,      System.getenv("KAFKA_BROKERS"));
     configMap.put(ProducerConfig.ACKS_CONFIG, producerAcks);
     configMap.put(ProducerConfig.RETRIES_CONFIG, retries);
     configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,    StringSerializer.class.getName());
     configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

     return new DefaultKafkaProducerFactory<>(configMap);
  }

错误:

    java.lang.IllegalStateException: Failed to load ApplicationContext
at org.springframework.cloud.configuration.CompositeCompatibilityVerifier.verifyDependencies(CompositeCompatibilityVerifier.java:47)
at org.springframework.cloud.configuration.CompatibilityVerifierAutoConfiguration.compositeCompatibilityVerifier(CompatibilityVerifierAutoConfiguration.java:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:638)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:338)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124)
at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:248)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray2(ReflectionUtils.java:208)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:158)
at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:86)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:95)

共有1个答案

邰建业
2023-03-14

编辑:您得到的异常意味着Spring无法为所有注释为@autowmed的对象找到提供程序。您需要确保所有自动生成的对象都可以由提供程序方法或bean实例化。

我绝不是Kafka的专家,但看看这篇帖子(抱歉,这只是俄文的,但看看代码吧),他们有一个Kafka模板的bean,你用的不同。查看他们的代码:

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }

然后他们在KafkaProducer中将其注释为@autowed。

你有以下变量的提供者吗?

java prettyprint-override">    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private MessageConsumer consumer;

    @Autowired
    private MessageProducer producer;

顺便说一下,你可以考虑使用Mockito作为消费者和生产者。我不知道这对Kafka是否有效。

 类似资料:
  • 我使用的是一个Java应用程序,它让消费者开始阅读Kafka主题。每次我需要启动消费者应用程序时,我都必须使用cmd中的命令启动Zookeeper和Kafka服务器。是否可以用小型Java程序启动/停止它们?非常感谢。

  • 在尝试运行Spring Temple项目时收到此错误消息不知道为什么?有什么建议吗? 这是来自控制台的完整日志 我对代码进行了任何修改,只是试图运行从Spring模板下载的应用程序

  • 我面临的问题是,有一个服务,我必须调用,这是一个传统的Spring启动应用程序,而不是反应性的! 下面是一个示例endpoint,它接近上述遗留系统的想法: 我知道我不能用这个来实现真正的反应性善,有没有一个快乐的非阻塞和阻塞的媒介我可以在这里实现? 谢谢

  • 问题内容: 我正在寻找一种从Matlab中启动应用程序的方法。问题是,我的Matlab脚本将一些结果保存到文件中,然后应在关联的应用程序中打开(在这种情况下为Blender)。 我熟悉类似的命令 要么 以及其他一些方法,但实际上,该应用程序是从Matlab PATH启动的,因此它在Matlab目录中查找所需的各种库。例如: 是否有某种方法可以启动使用全局(系统)PATH的应用程序? 不久前,我以为

  • 问题内容: 我可以使用Maven编译并启动Spring项目: 但是,当我使用(包括)将所有jar组合到一个文件中时,在执行过程中总是会得到一个: 我还尝试将架构定义(即等)直接附加到类路径,但是没有成功。 问题答案: Spring命名空间处理程序使用文件和解析。由于具有这些名称的文件存在于不同的Spring jar中,因此可能只有其中一个保留在目标jar之后。 也许您可以手动合并这些文件,然后以某

  • 问题内容: 我有一个GUI应用程序,需要在后台静默运行并继续收集信息。它首先需要显示UI以输入凭据,然后偶尔显示错误。 我了解我无法将GUI应用程序作为服务运行,因此我想创建服务只是为了启动GUI应用程序。因此,第一个程序作为服务运行,它仅启动GUI应用程序,然后继续监听来自GUI应用程序的任何信息。以下是启动GUI应用程序的第一个应用程序的代码。 当我运行此程序时,将启动GUI应用程序,但在5-