当前位置: 首页 > 知识库问答 >
问题:

为Kafka流配置Serdes时出现问题

易弘亮
2023-03-14

我在“提交”主题中放置了一个json对象。我想使用Kafka流使用消息,但出现了一个错误

@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());

        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public Serde<Commit> commitSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }

    @Bean
    public KStream<String, Commit> kStream(StreamsBuilder builder) {
        KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));

        KTable<String, Long> commitsCount = stream
                .mapValues(Commit::getAuthorName)
                .selectKey((key, word) -> word)
                .groupByKey()
                .count(Materialized.as("Counts"));

        commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));

        return stream;
    }
}

日志显示:

线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”组织中出现异常。阿帕奇。Kafka。流。错误。StreamsException:无法配置值serde类组织。阿帕奇。Kafka。常见的序列化。Serdes$包装器Serde

原因:组织。阿帕奇。Kafka。常见的KafkaException:找不到组织的公共无参数构造函数。阿帕奇。Kafka。常见的序列化。Serdes$包装器Serde

原因:java。lang.NoSuchMethodException

共有1个答案

盖玉石
2023-03-14

您的问题是StreamsConfig的注册。默认值\u SERDE\u CLASS\u配置。首先,在您的示例中不需要这样做,因为您在创建KStream时使用的消费代码中指定了值serde。您可以省略默认的serde。

如果将一个类注册为默认serde,Kafka Streams将在某个时候通过反射创建该类的实例。调用该类的默认(无参数)构造函数。在您的示例中,是一个组织。阿帕奇。Kafka。常见的序列化。Serdes$WrapperSerde类将从Serdes使用。serdeFrom(新型JsonSerializer

如果要为提交类型注册默认serde,需要将其包装到一个小类中:

public class CommitSerde extends WrapperSerde<Commit> {

    public CommitSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }
}

这个类应该适合使用props注册为默认值serde。put(StreamsConfig.DEFAULT\u VALUE\u SERDE\u CLASS\u CONFIG,CommitSerde.CLASS.getName()) 在您的示例中。

 类似资料:
  • 我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。

  • 我试图使用kafka流库只使用一次kafka的功能。我只将proessing.guarantee配置为exactly_once。与此同时,需要将事务状态存储在内部主题(__transaction_state)中。 我的问题是,如何定制主题的名称?如果kafka集群由多个消费者共享,那么每个消费者是否需要不同的事务管理主题? 谢谢你,墨蒂

  • 我正在IIS上运行新版本的ckeditor(4.6)(这是一个经典的asp站点,并不重要)。与ckfinder的集成很好。我可以上传图片,浏览服务器等。。我唯一剩下的问题是链接函数itslef:当我单击“浏览”时,我一直被重定向到:/ckeditor/plugins/imageuploader/imgbrowser。php?CKEditor=内容 我的配置。js文件是 等 我的问题是为什么“浏览”

  • 问题内容: 我正在IDE Eclipse Indigo(最新版本)上使用JSF 2.0开发Web应用程序。 但是在 Project- > Properties-> Project Facets上,选项 JavaServer Faces 在版本1.2上配置,当我尝试将版本更改为2.0时,出现错误消息 无法将项目构面JavaServer Faces的版本更改为2.0。 奇怪的是,我项目中的JSF库都是

  • 我有一个Spring启动项目,它作为一个库(打包的jar文件)到其他一些项目。我试图哟配置咖啡因缓存,将异步刷新请求后,向服务。 pom.xml(包括): 我的配置类: DAO层(此处需要缓存): DAO层(这里也需要缓存): 运行此安装程序时,我遇到以下错误堆栈: 不确定设置中缺少什么?

  • > 出了什么问题:配置项目“:反应原生矢量图标”时出现问题。 无法解析配置“:反应本机矢量图标:类路径”的所有文件。找不到 com.android.tools.build:gradle:2.3 的任何匹配项。 因为没有可用的com.android.tools.build:gradle版本。在以下位置搜索: https://jcenter.bintray.com/com/android/tools/