当前位置: 首页 > 知识库问答 >
问题:

流处理器(低级API)源处理器如何从主题中获取数据?

郦昆
2023-03-14

我是Kafka流处理器的新手,接触到了“拓扑”的关键概念。

我创建了源处理器,它从如下“源主题”中读取:

Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic");

上面的代码片段将创建(如果我的理解正确的话)一个名为“source”的源流处理器,并将侦听Kafka主题“source topic”。

我没有为这个“SOURCE”流处理器编写任何代码,它是如何从kafka主题中获取消息的?它是由kafka stream API本身照顾的“特殊”类型的流处理器吗?

谁能帮我弄明白这一点?

共有2个答案

濮阳赞
2023-03-14
topology.addSource("SOURCE", "source-topic");

“SOURCE”上方只是一个名称。您可以在此处参考拓扑类以获取更多详细信息。

其内部工作原理是-创建拓扑时,需要使用以下方法定义源、接收器和处理器:

addSource()addSink()和addProcessor()

这些方法在拓扑中添加处理器节点,并在内部调用InternalTopologyBuilder类来构建拓扑图。

streams.start()方法上,它调用拓扑的处理。

孙京
2023-03-14

拓扑从一个源节点开始,从Kafka获取数据,然后包含一堆处理器节点来执行转换,最后以一个接收器节点结束,将转换后的数据写入Kafka。

addSource()将在拓扑中创建源节点。源节点使用来自指定主题的记录,并将其传递给拓扑中的下一个节点。它没有任何其他逻辑。在封面下,源节点将启动Kafka消费者来获取记录。

Kafka Streams API使您能够专注于您的逻辑(在处理器中),而不是与消费者和生产者打交道。

 类似资料:
  • 我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka

  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 我正在尝试重新实现malloc,我需要理解对齐的目的。据我所知,如果内存对齐,代码将执行得更快,因为处理器不必采取额外步骤来恢复被剪切的内存位。我想我理解64位处理器读取64位逐64位内存。现在,让我们想象一下,我有一个有序的结构(没有填充):一个char、一个short、一个char和一个int。为什么short会错位?我们有区块中的所有数据!为什么地址必须是2的倍数。整数和其他类型的问题是一样

  • 我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,