当前位置: 首页 > 知识库问答 >
问题:

如何用Kafka流单元测试Spring云流

轩辕季同
2023-03-14

一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。

我在这里制作了一个存储库来演示这一点。

基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.class”时,测试是成功的

但是当使用使用自定义绑定的“DemoApplication ationTest. example pleAppNotWorking.class”时失败

第二个用例中的错误是:

[No records found for topic] 
Expecting:
 <false>
to be equal to:
 <true>
but was not.
org.opentest4j.AssertionFailedError: [No records found for topic] 
Expecting:
 <false>
to be equal to:
 <true>
but was not.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:155)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:138)
    at com.example.demo.DemoApplicationTest.testSendReceive(DemoApplicationTest.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:40)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:71)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:102)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:82)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:78)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:132)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
    at java.lang.Thread.run(Thread.java:748)

我错过了什么?

共有1个答案

程飞星
2023-03-14

最终使用新的“功能风格”实现了这一点。我怀疑这是因为我在使用spring cloud-stream@3.0.0.M3带着Spring的云-stream@2.2.1.RELEASE参考指南。

我添加了一个分支,其中包含一个说明解决方案的工作测试,为了简单起见,删除了所有未破坏测试的内容:https://github.com/avif/spring-cloud-stream-kafka-embeddedkafka/tree/functional-working

 类似资料:
  • 我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedK

  • 在探索如何对Kafka流进行单元测试时,我遇到了,不幸的是,这个类似乎被版本(KAFKA-4408)破坏了 对于KTable的问题,是否有一个解决方案? 我看到了“mocked streams”项目,但首先它使用的是,而我使用的是,其次它是Scala,而我的测试是Java/Groovy。 这里的任何关于如何在不需要引导zookeeper/kafka的情况下对流进行单元测试的帮助都将非常棒。 注意:

  • 我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。

  • 我已经使用Spring云流启动了一个小型微服务。 我只有两个流绑定,如下所示: 我用Serenity开发了组件测试,我将通道注入到我想要发送测试消息的地方: 哪里: 只是定义为字符串常量: 组件测试模块导入依赖项: 我发送的信息如下: 快乐流工作正常。但是,我想在侦听器无法处理消息时测试错误流。 这是一个监听器的例子: 从try/catch引发异常时,错误由服务激活器处理: 在没有Spring-C

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重