需要将KTable与Spring Kafka活页夹配置一起使用,以下是用于从主题中读取数据并在控制台上打印的示例代码。
但应用程序以Throw java终止。lang.IllegalArgumentException:方法在应用程序启动时必须是声明性的。
引用Spring Cloud Stream Kafka-方法必须是声明式的,结果仍然保持不变
Java-11
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
</parent>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
活页夹:
public interface WordCountBinding {
@Input(value = " data-input-channel")
public KTable<String, String> readDataStream();
}
监听器
@Service
@EnableBinding(value = WordCountBinding.class)
public class WordCountListener {
@StreamListener(value = "data-input-channel")
public void listen(KTable<String, String> data) {
KStream<String, String> wordStream = data.filter((key,value) -> key.contains("SRS")).toStream();
wordStream.foreach((key, value) -> System.out.println("Key: " + key + " Value: " + value));
}
}
控制台输出:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-03-19 21:26:53.598 ERROR 1912 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:434) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:161) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:232) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:202) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:336) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:118) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:963) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.shasr.streamwordcount.StreamWordcountApplication.main(StreamWordcountApplication.java:10) ~[classes/:na]
2021-03-19 21:26:53.613 INFO 1912 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Process finished with exit code 1
您可以通过使用@Input
使方法参数声明性来摆脱该特定异常,如下所示。
@StreamListener
public void listen(@Input("data-input-channel") KTable<String, String> data) {
更重要的是,建议不要使用StreamListener,因为从3.1开始就不推荐使用它。xSpring云流发布。推荐的方法是使用功能性风格。有关更多详细信息,请参阅参考文档。
我已经配置了一个基于Spring启动的应用程序与Spring云流。我试图在KStream上工作,但我不断得到错误"java.lang.IllegalArgumentExc的:方法必须是声明性的"。有人能帮我了解如何配置这个吗?我查找了StreamListener留档,但我无法让它工作。 https://docs.spring.io/spring-cloud-stream/docs/Elmhurst
断面 映射器配置概述 讨论了 Mapper construct,它是一种结构,用于定义如何将特定的用户定义类映射到数据库表或其他SQL构造。下面的部分描述了关于声明性系统如何构造 Mapper . 使用声明性定义映射属性 中给出的示例 具有声明性的表配置 说明针对表绑定列的映射;单个列到ORM类属性的映射在内部由 ColumnProperty 构造。还有许多其他类型的映射器属性,最常见的是 rel
正如在 声明性映射 ,声明式样式包括生成映射 Table 对象,或容纳 Table 或其他 FromClause 直接对象。 以下示例假定声明性基类为: from sqlalchemy.orm import declarative_base Base = declarative_base() 下面的所有示例都说明了从上面继承的类 Base . 装饰风格 使用修饰符的声明性映射(无声明基) 下面的
所以我正在编写一个javafx程序来操作字节中的各个位。我为每个位都有一个文本字段。我想在文本字段上实现一个变更列表,这样除了0或1之外不能输入任何东西。如果字段为空并且用户尝试输入一个字母,它可以正常工作,但如果其中已经有0或1,它会引发异常,我不明白为什么。 这是我的代码:
问题内容: 如果我有抛出未检查的异常的方法,例如: 明确声明该方法引发异常有什么好处,即 与(或除了)描述javadoc中的行为相反: 我之所以声称拥有无效的原因是: 没有提供有关在什么情况下会引发异常的信息,只是提供了 可能 引发异常的信息; 因为它是未经检查的异常,所以我不必在调用代码中处理该异常。我只会真正知道,如果我去看看的实现,可能会抛出该错误; 的主体可能会调用引发其他类型的未经检查的
问题内容: 我尝试编译以下内容,但在m16h(x)周围得到以下内容: 不知道为什么。我已经尝试过各种方法,但是看来我做得对。 问题答案: 方法的签名表明很容易引发Exception。 这意味着异常之一: 必须由呼叫者处理 } catch (Exception e) { e.printStackTrace(); } 必须由呼叫者重新抛出