Question:

Spring Cloud Stream Kafka - Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde

寿鸣
2023-03-14

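I am trying to build a simple Kafka Streams application with the Spring Cloud Stream framework. I can connect to the stream and push raw data for processing. But when I try to process the stream by key to count events, I get a "Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde" exception when running the application. I checked the libraries included in my project and the Serde class is there; it is not missing. I am not sure why it is not being loaded at runtime!

Below are my source files.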

com.pgp.learn.kafka.analytics.AnalyticsApplication

package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class  AnalyticsApplication {

    public static void main(String[] args) {
        SpringApplication.run(AnalyticsApplication.class, args);
    }

    @Component
    public static class PageViewEventSource implements ApplicationRunner {
        private final MessageChannel pageViewOut;
        private final Log log = LogFactory.getLog(getClass());

        public PageViewEventSource(AnalyticsBinding binding) {
            this.pageViewOut = binding.pageViewsOut();
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {
            List<String> names = Arrays.asList("Peter", "Tom", "Ady", "Nency", "George", "Kevin", "Chelsey", "Thomas");
            List<String> pages = Arrays.asList("About", "Contact", "Blogs", "Gallery", "Music", "Site-map", "News");

            Runnable runner = () -> {
                String rName = names.get(new Random().nextInt(names.size()));
                String rPage = pages.get(new Random().nextInt(pages.size()));

                PageViewEvent event = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10L : 1000L);

                Message<PageViewEvent> message = MessageBuilder
                        .withPayload(event)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, event.getUserId().getBytes())
                        .build();

                try {
                    this.pageViewOut.send(message);
                    log.info("Sent: " + message.toString());
                } catch (Exception e) {
                    log.error(e);
                }
            };

            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runner, 1, 1, TimeUnit.SECONDS);
        }
    }

    @Component
    public static class PageViewEventProcessor {
        private final Log log = LogFactory.getLog(getClass());

        @StreamListener
        @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
        public static KStream<String, Long> process(
                @Input(AnalyticsBinding.PAGE_VIEW_IN_CHANNEL) KStream<String, PageViewEvent> events) {
            return events
                    .filter((s, pageViewEvent) -> pageViewEvent.getDuration() > 10)
                    .map((s, pageViewEvent) -> new KeyValue<>(pageViewEvent.getPage(), 0L))
                    .groupByKey()
                    // Also tried using the line below, but no luck:
                    // .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                    .count(Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
                    .toStream();
        }
    }
}

com.pgp.learn.kafka.analytics.AnalyticsBinding

package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface AnalyticsBinding {

    String PAGE_VIEW_OUT_CHANNEL = "pvout";
    String PAGE_VIEW_IN_CHANNEL = "pvin";
    String PAGE_COUNT_MV = "pcmvc";
    String PAGE_COUNT_OUT = "pcout";
    String PAGE_COUNT_IN = "pcin";

    @Input (PAGE_VIEW_IN_CHANNEL)
    KStream<String, PageViewEvent> pageViewsIn();

    @Output (PAGE_VIEW_OUT_CHANNEL)
    MessageChannel pageViewsOut();

    @Output (PAGE_COUNT_OUT)
    KStream<String, Long> pageCountOut();

}

application.properties

# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serde$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serde$StringSerde
spring.cloud.stream.kafka.binder.brokers=54.173.206.255
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
spring.cloud.stream.bindings.pvout.producer.header-mode=raw
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
spring.cloud.stream.bindings.pvin.consumer.header-mode=raw
#
# page count out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.pgp.learn.kafka</groupId>
    <artifactId>analytics</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>analytics</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RC2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

Exception at runtime:

java.lang.IllegalStateException: java.lang.IllegalStateException: Serde class not found: 
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:308) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:164) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.1.0.RC4.jar:2.1.0.RC4]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_161]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at com.pgp.learn.kafka.analytics.AnalyticsApplication.main(AnalyticsApplication.java:35) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
    at org.springframework.boot.maven.AbstractRunMojo$LaunchRunner.run(AbstractRunMojo.java:558) [spring-boot-maven-plugin-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.IllegalStateException: Serde class not found: 
    at org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver.getKeySerde(KeyValueSerdeResolver.java:177) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver.getInboundKeySerde(KeyValueSerdeResolver.java:74) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:246) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    ... 22 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.Serde$StringSerde
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_161]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_161]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_161]
    at java.lang.Class.forName0(Native Method) ~[na:1.8.0_161]
    at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_161]
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:333) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:322) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver.getKeySerde(KeyValueSerdeResolver.java:171) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RC4.jar:2.1.0.RC4]
    ... 24 common frames omitted

Can anyone help me find what I am doing wrong here? I am new to Kafka and stream processing, so please forgive me if I have made a silly mistake.

1 answer

楚俊迈
2023-03-14

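There is a typo in the configuration property:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serde$StringSerde

The "s" is missing from Serde. It should be ...serialization.Serdes$StringSerde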
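Why the single letter matters: the stack trace shows the configured name being passed to Class.forName, which expects the binary name of a nested class, that is, the outer class, a "$", then the nested class. StringSerde is nested in the Serdes class, not in the Serde interface. The sketch below illustrates the same lookup with JDK classes only, so that it runs without Kafka on the classpath. BinaryNameDemo and canLoad are hypothetical names, and the AbstractMap/Map pair is just a stand-in for Serdes/Serde.

```java
public class BinaryNameDemo {

    // Returns true if the class loader can resolve the given binary name.
    public static boolean canLoad(String binaryName) {
        try {
            Class.forName(binaryName);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // SimpleEntry is nested in the AbstractMap class, so this name resolves.
        System.out.println(canLoad("java.util.AbstractMap$SimpleEntry")); // true

        // The Map interface has no nested SimpleEntry, so the lookup fails,
        // much like Serde$StringSerde fails where Serdes$StringSerde is meant.
        System.out.println(canLoad("java.util.Map$SimpleEntry")); // false
    }
}
```

In the application.properties posted in the question, both default.key.serde and default.value.serde use Serde$StringSerde, so both lines need the same correction. Separately, the key commit.interval.mms in that file looks like it was meant to be commit.interval.ms, the Kafka Streams setting, although that would not cause this exception.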
