我在“提交”主题中放置了一个json对象。我想使用Kafka流使用消息,但出现了一个错误
@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());
return new KafkaStreamsConfiguration(props);
}
@Bean
public Serde<Commit> commitSerde() {
return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
@Bean
public KStream<String, Commit> kStream(StreamsBuilder builder) {
KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));
KTable<String, Long> commitsCount = stream
.mapValues(Commit::getAuthorName)
.selectKey((key, word) -> word)
.groupByKey()
.count(Materialized.as("Counts"));
commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));
return stream;
}
}
日志显示:
线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”组织中出现异常。阿帕奇。Kafka。流。错误。StreamsException:无法配置值serde类组织。阿帕奇。Kafka。常见的序列化。Serdes$包装器Serde
原因:组织。阿帕奇。Kafka。常见的KafkaException:找不到组织的公共无参数构造函数。阿帕奇。Kafka。常见的序列化。Serdes$包装器Serde
原因:java。lang.NoSuchMethodException
您的问题是StreamsConfig的注册。默认值\u SERDE\u CLASS\u配置。首先,在您的示例中不需要这样做,因为您在创建KStream时使用的消费代码中指定了值serde。您可以省略默认的serde。
如果将一个类注册为默认serde,Kafka Streams将在某个时候通过反射创建该类的实例。调用该类的默认(无参数)构造函数。在您的示例中,是一个组织。阿帕奇。Kafka。常见的序列化。Serdes$WrapperSerde类将从Serdes使用。serdeFrom(新型JsonSerializer
如果要为提交类型注册默认serde,需要将其包装到一个小类中:
public class CommitSerde extends WrapperSerde<Commit> {
public CommitSerde() {
super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
}
这个类应该适合使用props注册为默认值serde。put(StreamsConfig.DEFAULT\u VALUE\u SERDE\u CLASS\u CONFIG,CommitSerde.CLASS.getName()) 在您的示例中。
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
我试图使用kafka流库只使用一次kafka的功能。我只将proessing.guarantee配置为exactly_once。与此同时,需要将事务状态存储在内部主题(__transaction_state)中。 我的问题是,如何定制主题的名称?如果kafka集群由多个消费者共享,那么每个消费者是否需要不同的事务管理主题? 谢谢你,墨蒂
我正在IIS上运行新版本的ckeditor(4.6)(这是一个经典的asp站点,并不重要)。与ckfinder的集成很好。我可以上传图片,浏览服务器等。。我唯一剩下的问题是链接函数itslef:当我单击“浏览”时,我一直被重定向到:/ckeditor/plugins/imageuploader/imgbrowser。php?CKEditor=内容 我的配置。js文件是 等 我的问题是为什么“浏览”
问题内容: 我正在IDE Eclipse Indigo(最新版本)上使用JSF 2.0开发Web应用程序。 但是在 Project- > Properties-> Project Facets上,选项 JavaServer Faces 在版本1.2上配置,当我尝试将版本更改为2.0时,出现错误消息 无法将项目构面JavaServer Faces的版本更改为2.0。 奇怪的是,我项目中的JSF库都是
我有一个Spring启动项目,它作为一个库(打包的jar文件)到其他一些项目。我试图哟配置咖啡因缓存,将异步刷新请求后,向服务。 pom.xml(包括): 我的配置类: DAO层(此处需要缓存): DAO层(这里也需要缓存): 运行此安装程序时,我遇到以下错误堆栈: 不确定设置中缺少什么?
> 出了什么问题:配置项目“:反应原生矢量图标”时出现问题。 无法解析配置“:反应本机矢量图标:类路径”的所有文件。找不到 com.android.tools.build:gradle:2.3 的任何匹配项。 因为没有可用的com.android.tools.build:gradle版本。在以下位置搜索: https://jcenter.bintray.com/com/android/tools/