当前位置: 首页 > 知识库问答 >
问题:

需要关于Spring Boot Kafka Stream Binder应用程序错误的建议

濮阳赞
2023-03-14

我试图运行一个Spring boot Kafka流示例https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_usage_2地点

我能够成功地构建它。但在运行时,出现如下所示的错误(java.lang.IllegalStateException:org.springframework.cloud.stream.function.FunctionConfiguration上的错误处理条件)。有人能帮忙找出代码中的问题吗?

ReplicateKafkaStreamApplication。Java语言

package com.example.kafka.replicateKafka;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.util.Arrays;
import java.util.Date;

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ReplicateKafkaStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReplicateKafkaStreamApplication.class, args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

}

class WordCount {

    private String word;

    private long count;

    private Date start;

    private Date end;

    public WordCount(String word, long count, Date start, Date end) {
        this.word = word;
        this.count = count;
        this.start = start;
        this.end = end;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public Date getStart() {
        return start;
    }

    public void setStart(Date start) {
        this.start = start;
    }

    public Date getEnd() {
        return end;
    }

    public void setEnd(Date end) {
        this.end = end;
    }
}

pom。xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.kafka</groupId>
    <artifactId>replicateKafkaStream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>replicateKafkaStream</name>
    <description>Demo project for replicating kafka</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.0.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kstream</artifactId>
            <version>1.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

错误

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)

2020-08-19 18:48:26.339  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : Starting ReplicateKafkaStreamApplication on DESKTOP-NS4292D with PID 13724 (C:\JavaReskillGit\replicateKafkaStream\target\classes started by BishwajeetNaik in C:\JavaReskillGit\replicateKafkaStream)
2020-08-19 18:48:26.344  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : No active profile set, falling back to default profiles: default
2020-08-19 18:48:27.786 ERROR 13724 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.stream.function.FunctionConfiguration
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader$TrackedConditionEvaluator.shouldSkip(ConfigurationClassBeanDefinitionReader.java:469) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:131) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:120) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:331) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:236) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:280) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:96) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:707) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:533) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.collectBeanNamesForType(OnBeanCondition.java:238) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:231) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:221) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchingBeans(OnBeanCondition.java:169) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchOutcome(OnBeanCondition.java:119) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    ... 17 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderFactoryBean
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 33 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.core.KStreamBuilderFactoryBean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 37 common frames omitted

2020-08-19 18:48:27.801  WARN 13724 --- [           main] o.s.boot.SpringApplication               : Unable to close ApplicationContext

java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:620) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:612) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1243) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderFactoryBean
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.core.KStreamBuilderFactoryBean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 25 common frames omitted


Process finished with exit code 1

共有1个答案

贡烨烁
2023-03-14

您需要使用匹配的spring版本才能正确加载依赖项

真正的错误是NoClassDefFoundError

删除依赖项部分中的所有版本并更新您的父级以使用

<artifactId>spring-cloud-starter-stream-kafka</artifactId>

并从依赖项中删除相同的内容

 类似资料:
  • 这是我的组件代码。 这是我的Info.plist代码“NSAPPTransportSecurity” 当我跑的时候 模拟器显示错误 怎么修?(使用RN 0.40版本)

  • 嘿,所以我为ExecutorService的java文档选择了这个例子。我想确认这个代码的流程,执行者。newFixedThreadPool将创建一个线程池(我猜)。所以serversocket将等待连接,一旦获得连接,就会启动一个线程,所以现在池大小减少1。一旦线程完成执行,池大小将再次增加1,不是吗?线程会放弃它使用的资源吗?

  • 我们希望使用App Engine Admin API(REST)和Golang自动创建项目ID并安装我们的ULAPPH Cloud Desktop应用程序。https://cloud.google.com/appengine/docs/admin-api/?hl=en_US 我们能够得到一个令牌,但是当我们试图创建一个项目ID时,我们得到了下面的错误。 我们只是使用REST API调用。如上所示,

  • 基于 ASP 的应用程序是 ASP 页和 ActiveX 组件的集合。当用户定义应用程序时,将使用 Internet 服务管理器指定用户的 Web 站点中应用程序启动点的目录。在用户的 Web 站点中每个位于启动点目录下的文件和文件夹都被认为是应用程序的一部分,直到发现另外的启动点目录为止。这样,用户就可以使用目录作为边界定义应用程序的作用域。每个 Web 站点可以有多个应用程序,而每个应用程序的

  • 飞镖码 我希望这段代码画一个黑色的矩形屏幕,但它没有。我正在使用flutter框架,我想知道不flutter支持做这样的应用程序或我的代码中的任何错误。我只是想在没有内置flutter框架库的情况下运行flutter代码 }

  • 问题内容: 我有一个使用PHP(使用Kohana框架)编写的现有应用,并且想进行长时间轮询。从某些方面看,似乎不建议使用PHP进行长时间轮询,而使用诸如nodejs之类的方法是更好的选择。我的问题是将Node.js(或其他一些适合长时间轮询的工具)与现有应用程序集成的最佳方法是什么? 为了澄清起见,我的应用程序基本上是一个浏览器插件,您可以使用该插件将数据发送给其他人。发送该数据时,我希望收件人(