我正在学习QuarkusKafka流教程,不太明白如何启动管道。
在本教程中,org.apache.kafka.streams。StreamsBuilder
用于构建org.apache.kafka.streams。描述管道的拓扑
。构建拓扑的方法用<code>注释为products<code>。在本备忘单中,描述了这足以运行Kafka Streams管道。在本教程中,还公开了httpendpoint。这在我目前正在实现的服务中不是必需的。此外,在本示例中,从未显式调用提供程序方法。当我在没有endpoint的情况下启动应用程序时,管道不会启动。
在本教程中,管道使用拓扑显式实例化。但在这里,属性必须手动设置,并且配置不是从Quarkus.kafka流中获取的。
问题是:如何使用第一个教程中的拓扑生成器来启动它所描述的管道?最佳情况是来自
Quarkus.kafka流的配置。
使用:
Java OpenJDK 11.0.8
Quarkus版本:1.8.0.最终版
匿名用户
解决了问题:无法保证完全正确。只是想分享解决我问题的方法。
最重要的是使用正确的@products
<代码>javax.enterprise.inject。products必须用于拓扑
producting方法javax.ws.rs.products
还可以用于定义输出的媒体类型,但不是强制性的:
@javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
@Produces
@AlternativePriority( 1 )
public Topology buildTopology() {
...
}
Kafka 流的
实例由框架在启动时自动构建。运行管道所需的全部内容如下:
@ApplicationScoped
public class YourApplication {
private final KafkaStreams streams;
public NasDistributorApplication( final KafkaStreams streams ) {
this.streams = streams;
}
public void onStart( @Observes final StartupEvent startupEvent ) {
streams.start();
}
public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
streams.close();
}
}
通过< code > @ alternative priority(1)对< code >拓扑生成方法进行注释,可能有必要告诉< code>quarkus使用此方法来构建< code >拓扑。我不知道框架的内部结构,但我怀疑使用了默认拓扑,并且< code > @ alternative priority 给予自定义方法比默认< code >拓扑更高的优先级
在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能
我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于
是否可以在Spring Cloud中使用@EnableBinding注释的类流或在方法中使用@StreamListener使用交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和process方法中实例化ReadOnlyKeyValueStore,但它始终为空。 我的@StreamListener方法正在监听一组
我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf
现在我想在一个污点中使用Drools,它在LocalCluster中正常工作,但是当我把它放在生产集群中时,它有错误。污点是: 我使用官方文件创建了kiesession。误差为: 也许有些东西没有初始化。但当blot执行时,我创建了一个新的kieservice。有人能帮我吗 谢啦!
> 灵光升起 StormUI启动 我使用的两个工人都起来了 Zookeper已启动 我和暴风一起跑 Storm罐myjar.jar MyClass Nimbus提交拓扑 该拓扑的日志文件不会出现在workers中。 我在supervisor.log上的worker中有以下日志: 所以我确信我与nimbus有连接问题,但是worker中的属性文件是: 错误在哪里,我如何修复它? 谢了!