当前位置: 首页 > 知识库问答 >
问题:

哪个版本的spring cloud stream binder kstream与Kafka 1.0.0兼容

方谦
2023-03-14

当尝试运行稍微修改过的word count示例版本时,我遇到了一个错误,即“没有符合条件的'org.apache.kafka.streams.kstream.KStreamBuilder'类型的bean”。在我的POM中,我使用了Spring-Cloud-Stream依赖项:Elmhurst。M3导入依赖项,其中导入了spring cloud stream绑定器kstream:2.0.0。立方米。

然而,我不认为我做了什么特别的事情:

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class KafkaExampleSpringcloud1Application {

       public static void main(String[] args) {
        SpringApplication.run(KafkaExampleSpringcloud1Application.class, 
        args);

    }

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<Object, String> input) {

        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serdes.String(), Serdes.String())
                .count(timeWindows, "WordCounts")
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }



    static class WordCount {

        private String word;

        private long count;

        private Date start;

        private Date end;

        WordCount(String word, long count, Date start, Date end) {
            this.word = word;
            this.count = count;
            this.start = start;
            this.end = end;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        public Date getStart() {
            return start;
        }

        public void setStart(Date start) {
            this.start = start;
        }

        public Date getEnd() {
            return end;
        }

        public void setEnd(Date end) {
            this.end = end;
        }
    }

这是我的申请表。yml:

application.name: kafka-example-01
spring.cloud.stream:
  kstream:
    configuration:
      commit.interval.ms: 1000
      key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    timeWindow.length: 5000
    binder:
      brokers: localhost
      zkNodes: localhost
  kafka:
    binder:
      autoCreateTopics: false
  bindings:
    output.destination: word-count-output
    input.destnation : word-count-input

我尝试使用旧版本,但我一直有ClassNotFoundException错误(例如StreamsBuilder类)

共有2个答案

郎健柏
2023-03-14

让它与这些一起工作:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kstream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
翟浩穰
2023-03-14

它目前并不是一个里程碑,只有2.0.0。构建快照。它将在M4中。

在这里promise。

 类似资料:
  • 问题内容: 其实我有点困惑。尽管我阅读了一些有关此的资源。 要使用 ChromeBrowser 测试 Selenium 3, 我们需要一个名为 ChromeDriver 的附加应用。 我从GitHub找到了这段文字: ChromeDriver仅与Chrome 12.0.712.0版或更高版本兼容。如果您需要测试旧版的Chrome,请使用Selenium RC和Selenium支持的WebDrive

  • 运行时,出现以下错误: 有人帮忙吗?

  • 问题内容: 哪个Firefox版本与Selenium 2.53.0兼容?我尝试使用Firefox 45.0,但遇到此异常: 问题答案: 我在使用Firefox 47.0运行Selenium Webdriver 2.53.0时遇到了类似的问题。 Selenium Webdriver 2.53.0 可与 Firefox 46.0一起使用 。您可以在https://support.mozilla.org

  • 我想更新Selenium版本3.6.0,我想知道如果不使用Gecko驱动程序,使用哪个FF版本最好? 如有任何帮助,我们将不胜感激。

  • 我使用的是spring-boot-starter父版本1.4.0.M3和spring-boot-starter data-solr,在pom文件中没有任何版本,solr-solrj也没有任何版本(它使用的是5.5.1版本)。我想包括solr-core,因为我必须提到版本,如果我添加了版本6.0.0或6.1.0,项目将编译,但在运行时,当我尝试创建SolrConfig对象时失败,错误如下 尝试降低s

  • junit-vintage engine是否意味着与JUnit4.12.x之前的版本兼容? 我将异常的来源缩小到VintageTestDescriptor.java中的几行(盯着第86行看) Category在JUnit4.12的实验包中,但似乎在JUnit4.7中没有。我找不到doc上的junit版本的复古引擎是兼容的。