当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka-方法必须是声明性的

蒋正平
2023-03-14

我已经配置了一个基于Spring启动的应用程序与Spring云流。我试图在KStream上工作,但我不断得到错误"java.lang.IllegalArgumentExc的:方法必须是声明性的"。有人能帮我了解如何配置这个吗?我查找了StreamListener留档,但我无法让它工作。

https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RC2/api/org/springframework/cloud/stream/annotation/StreamListener.html

配置

spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.default.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.concurrency=3
spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=myGroup

应用

/**
 * This works
 */
@StreamListener(Sink.INPUT)
public void process (String event) {

   ...
}

/**
 * This doesn't work
 */
@StreamListener(Sink.INPUT)
public void process (KStream<String, String> event) {

   ...
}

错误

java.lang.IllegalArgumentException: Method must be declarative
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:503) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:162) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_172]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at com.abc.xyz.Application.main(Application.java:43) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.2.RELEASE.jar:2.1.2.RELEASE]

编辑1:添加Pom.xml

砰的一声

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    <spring-cloud-stream.version>Fishtown.RELEASE</spring-cloud-stream.version>
</properties>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

编辑2:添加@Input后添加更改

@StreamListener
public void process (@Input(Sink.INPUT) KStream<String, String> event) {

    System.out.println(event);
}

错误

java.lang.IllegalStateException: java.lang.ClassCastException: org.springframework.cloud.stream.messaging.DirectWithAttributesChannel cannot be cast to org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapper
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:308) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:164) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_172]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at abc.xyz.apps.Application.main(Application.java:43) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.2.RELEASE.jar:2.1.2.RELEASE]
Caused by: java.lang.ClassCastException: org.springframework.cloud.stream.messaging.DirectWithAttributesChannel cannot be cast to org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapper
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:268) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    ... 21 more

共有2个答案

爱唯
2023-03-14

Classpath with具有不同版本的Spring库。请更正您的依赖项以摆脱此异常

漆雕正奇
2023-03-14

正如它在您指向的javadocs中的声明模式下所说,您需要参数上的@Input

@StreamListener
public void process(@Input(MySink.INPUT) KStream<String, String> event) {

   ...
}

.

interface MySink {

    @Input("input")
    KStream<?, ?> input();

}
 类似资料:
  • 需要将KTable与Spring Kafka活页夹配置一起使用,以下是用于从主题中读取数据并在控制台上打印的示例代码。 但应用程序以Throw java终止。lang.IllegalArgumentException:方法在应用程序启动时必须是声明性的。 引用Spring Cloud Stream Kafka-方法必须是声明式的,结果仍然保持不变 Java-11 活页夹: 监听器 控制台输出:

  • 我正在使用SODA for Java存储和检索Oracle12c DB中的文档。我正在效仿这个例子。 这一例外背后的原因将是什么?

  • 问题内容: 这是我的代码,假设可以在按下按钮时更改一些文本:- 用下划线标记,它给我一个错误“类必须声明为抽象或实现抽象方法”。该代码大部分是从互联网上复制的,并且可以正常工作。可能仅是Android Studio错误。我如何使它工作? 问题答案: 必须实现该函数,否则您的类应该是抽象的,以便可以在某些子类中实现您的函数。但是在您的情况下,您犯了一个拼写错误。应该代替;

  • 问题内容: 有时我们有几个类,这些类的某些方法具有相同的签名,但是与声明的Java接口不对应。例如,和(在中的其他几个 )中都有一个方法 现在,假设我希望对具有该方法的对象进行一些操作。然后,我想有一个接口(或者自己定义),例如 这样我可以写: 但是,可悲的是,我不能: 此演员表将是非法的。编译器 知道 这 是不是 一个,因为类没有宣布实现该接口...... 然而“实际上”实现它 。 有时这会带来

  • 所以我一直有这个错误,我不知道发生了什么。我在谷歌上搜索了一下,读了一些关于这个错误的问题,但什么都没用。我只想运行它,这样我就可以完成我的项目。 错误:开始低库存;结束;*第1行错误:ORA-06550:第1行,第9列:PLS-00201:标识符“LOWINVENTORY”必须声明ORA-06550:第1行,第7列:PL/SQL:忽略语句 编辑: 表创建: