我是Kafka流处理器的新手,接触到了“拓扑”的关键概念。
我创建了源处理器,它从如下“源主题”中读取:
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic");
上面的代码片段将创建(如果我的理解正确的话)一个名为“source”的源流处理器,并将侦听Kafka主题“source topic”。
我没有为这个“SOURCE”流处理器编写任何代码,它是如何从kafka主题中获取消息的?它是由kafka stream API本身照顾的“特殊”类型的流处理器吗?
谁能帮我弄明白这一点?
topology.addSource("SOURCE", "source-topic");
“SOURCE”上方只是一个名称。您可以在此处参考拓扑类以获取更多详细信息。
其内部工作原理是-创建拓扑时,需要使用以下方法定义源、接收器和处理器:
addSource()
、addSink()和
addProcessor()
这些方法在拓扑中添加处理器节点,并在内部调用InternalTopologyBuilder类来构建拓扑图。
在
streams.start()
方法上,它调用拓扑的处理。
拓扑从一个源节点开始,从Kafka获取数据,然后包含一堆处理器节点来执行转换,最后以一个接收器节点结束,将转换后的数据写入Kafka。
addSource()
将在拓扑中创建源节点。源节点使用来自指定主题的记录,并将其传递给拓扑中的下一个节点。它没有任何其他逻辑。在封面下,源节点将启动Kafka消费者来获取记录。
Kafka Streams API使您能够专注于您的逻辑(在处理器中),而不是与消费者和生产者打交道。
我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka
我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。
我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正
我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?
我正在尝试重新实现malloc,我需要理解对齐的目的。据我所知,如果内存对齐,代码将执行得更快,因为处理器不必采取额外步骤来恢复被剪切的内存位。我想我理解64位处理器读取64位逐64位内存。现在,让我们想象一下,我有一个有序的结构(没有填充):一个char、一个short、一个char和一个int。为什么short会错位?我们有区块中的所有数据!为什么地址必须是2的倍数。整数和其他类型的问题是一样
我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,