使用kafka processor API(不是DSL)读取源主题并写入目标主题,对于单个kafka集群设置(也就是说,如果源主题和目标主题都驻留在同一集群上)来说工作很好,但是当源主题和目标主题驻留在不同的kafka集群上时,我将获得目标处理器上下文的NullPointerException
Topology
topology.addSource("mySource", "SourceTopic");
topology.addProcessor("SourceStreamProcessor",()->new SourceStreamProcessor(), "mySource");
topology.addProcessor("TargetProcessor",()->new TargetProcessor(), "Target");
topology.addSink("sink1","OUTPUT_TOPIC1","TargetProcessor");
topology.addSink("sink2","OUTPUT_TOPIC2","TargetProcessor");
Properties sourceProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SourceStreamProcessor"); // Kafka Cluster 1
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_Cluser_xx.org:9092");
Properties targetProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "targetStreamProcessor"); // Kafka Cluster 2
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test_Cluser_xx.org:9092");
我们如何使用kafka streams处理器API从一个集群中的一个主题写到另一个集群中的另一个主题?
Kafka流不支持从一个Kafka集群读取并写入另一个Kafka集群。
您可以在一个集群内处理消息,然后使用Mirror Maker将其复制到另一个集群。
对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记
我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者
我想建立一个多kafka集群,大约有3个zookeeper实例,每个集群中有3个kafka代理,每个kafka经纪人大约有5个主题和5个分区。有什么设置指南可以参考吗? PS:我可以找到带有多个Kafka代理的单个zookeeper实例的信息,但不能找到带有多个zookeeper实例的设置。
在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能
我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于
本教程用于多代理kafka集群。我建立了三个经纪人: 本地主机:9092 本地主机:9093 本地主机:9094 问题是,如果我杀死,我就不能使用以下命令: 我知道端口被杀死了,但是--如何通过通用引导服务器来使它运行?我错过了什么? 编辑1: bin/kafka-console-consumer.sh--bootstrap-server localhost:9092,localhost:9093