当前位置: 首页 > 知识库问答 >
问题:

带有KTable绑定器配置的Spring Cloud Stream-抛出java.lang.IllegalArgumentException:方法必须是声明性的

孟昆
2023-03-14

需要将KTable与Spring Kafka活页夹配置一起使用,以下是用于从主题中读取数据并在控制台上打印的示例代码。

但应用程序以Throw java终止。lang.IllegalArgumentException:方法在应用程序启动时必须是声明性的。

引用Spring Cloud Stream Kafka-方法必须是声明式的,结果仍然保持不变

Java-11

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.4</version>
</parent>
<properties>
    <java.version>11</java.version>
    <spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

活页夹:

public interface WordCountBinding {

    @Input(value = " data-input-channel")
    public KTable<String, String> readDataStream();
}

监听器

@Service
@EnableBinding(value = WordCountBinding.class)
public class WordCountListener {

    @StreamListener(value = "data-input-channel")
    public void listen(KTable<String, String> data) {

        KStream<String, String> wordStream = data.filter((key,value) -> key.contains("SRS")).toStream();
        wordStream.foreach((key, value) -> System.out.println("Key: " + key + " Value: " + value));
    }
}

控制台输出:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-03-19 21:26:53.598 ERROR 1912 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
    at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:434) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:161) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:232) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:202) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:336) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:118) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:963) ~[spring-beans-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.shasr.streamwordcount.StreamWordcountApplication.main(StreamWordcountApplication.java:10) ~[classes/:na]

2021-03-19 21:26:53.613  INFO 1912 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'

Process finished with exit code 1

共有1个答案

宰父淳
2023-03-14

您可以通过使用@Input使方法参数声明性来摆脱该特定异常,如下所示。

@StreamListener
    public void listen(@Input("data-input-channel") KTable<String, String> data) {

更重要的是,建议不要使用StreamListener,因为从3.1开始就不推荐使用它。xSpring云流发布。推荐的方法是使用功能性风格。有关更多详细信息,请参阅参考文档。

 类似资料:
  • 我已经配置了一个基于Spring启动的应用程序与Spring云流。我试图在KStream上工作,但我不断得到错误"java.lang.IllegalArgumentExc的:方法必须是声明性的"。有人能帮我了解如何配置这个吗?我查找了StreamListener留档,但我无法让它工作。 https://docs.spring.io/spring-cloud-stream/docs/Elmhurst

  • 断面 映射器配置概述 讨论了 Mapper construct,它是一种结构,用于定义如何将特定的用户定义类映射到数据库表或其他SQL构造。下面的部分描述了关于声明性系统如何构造 Mapper . 使用声明性定义映射属性 中给出的示例 具有声明性的表配置 说明针对表绑定列的映射;单个列到ORM类属性的映射在内部由 ColumnProperty 构造。还有许多其他类型的映射器属性,最常见的是 rel

  • 正如在 声明性映射 ,声明式样式包括生成映射 Table 对象,或容纳 Table 或其他 FromClause 直接对象。 以下示例假定声明性基类为: from sqlalchemy.orm import declarative_base Base = declarative_base() 下面的所有示例都说明了从上面继承的类 Base . 装饰风格 使用修饰符的声明性映射(无声明基) 下面的

  • 所以我正在编写一个javafx程序来操作字节中的各个位。我为每个位都有一个文本字段。我想在文本字段上实现一个变更列表,这样除了0或1之外不能输入任何东西。如果字段为空并且用户尝试输入一个字母,它可以正常工作,但如果其中已经有0或1,它会引发异常,我不明白为什么。 这是我的代码:

  • 问题内容: 如果我有抛出未检查的异常的方法,例如: 明确声明该方法引发异常有什么好处,即 与(或除了)描述javadoc中的行为相反: 我之所以声称拥有无效的原因是: 没有提供有关在什么情况下会引发异常的信息,只是提供了 可能 引发异常的信息; 因为它是未经检查的异常,所以我不必在调用代码中处理该异常。我只会真正知道,如果我去看看的实现,可能会抛出该错误; 的主体可能会调用引发其他类型的未经检查的

  • 问题内容: 我尝试编译以下内容,但在m16h(x)周围得到以下内容: 不知道为什么。我已经尝试过各种方法,但是看来我做得对。 问题答案: 方法的签名表明很容易引发Exception。 这意味着异常之一: 必须由呼叫者处理 } catch (Exception e) { e.printStackTrace(); } 必须由呼叫者重新抛出