我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。
代码当前类似于
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
val branches: Array<KStream<String, SomeObject>> = stream.branch(
{ _, value -> isValidRawData(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transform(value) }.to(outputTopicName)
branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
val topics = listOf("topic1", "topic2")
val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
{ _, value -> isRecordFromTopic1(value)},
{ _, value -> isRecordFromTopic2(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}
由于如第二段代码所示,有一组主题API,所以我想说这两个变体都是有效的,也是有意义的。其他一切都只是个人喜好。我会选择第一个,因为从技术上来说,最终所有的东西都将在同一个流引擎上工作。第一种解决方案在将来引入第三种记录类型时更容易支持。或者您可能有特定流的额外逻辑。您可能有一个公共流来读取所有主题,并通过该条件和分支分发它们。其余的逻辑可以通过他们自己的中间主题在他们各自的流中完成。但仍然:只是我的观点...
在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能
我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf
我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?
我有一个Kafka Streams拓扑,其中我加入了5个表,每个表都是在一个主题上创建的,该主题由一些Kafka连接器填充,这些连接器产生KeyValue事件,其中Key是针对相同的Avro模式产生的,但在我的拓扑中,当我加入这些表时,Key似乎不一样,如果它们是Java等于事件。所有这些背后的原因是什么? 它与Confluent Schema Registry集成。 我们已经使用了调试器,并且在
使用kafka processor API(不是DSL)读取源主题并写入目标主题,对于单个kafka集群设置(也就是说,如果源主题和目标主题都驻留在同一集群上)来说工作很好,但是当源主题和目标主题驻留在不同的kafka集群上时,我将获得目标处理器上下文的NullPointerException 我们如何使用kafka streams处理器API从一个集群中的一个主题写到另一个集群中的另一个主题?