当前位置: 首页 > 知识库问答 >
问题:

使用Kafka流处理器API进行用户主题管理

孙绍辉
2023-03-14

我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题?

简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka的流API是建立在Kafka的生产者和消费者客户之上”。

共有2个答案

阎智
2023-03-14

您可以使用JXL(JavaExcel API)编写一个生产者,该生产者从Excel文件写入kafka主题。然后创建一个kafka流应用程序来使用该主题并生成另一个主题。您可以使用context.getTopic()来获取处理器正在接收的主题。然后设置多个if语句来在Process()函数中调用该主题的流程逻辑。

张兴旺
2023-03-14

是的,您必须使用KafkaProducer为提供KStream的源主题生成输入。

但是,中间主题可以通过

  • KafkaStreams#to
  • 通过
 类似资料:
  • 当前设置:Spark流作业处理timeseries数据的Kafka主题。大约每秒就有不同传感器的新数据进来。另外,批处理间隔为1秒。通过,有状态数据被计算为一个新流。一旦这个有状态的数据穿过一个treshold,就会生成一个关于Kafka主题的事件。当该值后来降至treshhold以下时,再次触发该主题的事件。 问题:我该如何避免这种情况?最好不要切换框架。在我看来,我正在寻找一个真正的流式(一个

  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 注意:主题创建是在代理级别启用的。另外,这个主题正在被创建,但是它是用分区1创建的。 但是,我可以使用Kafka管理客户端API的

  • 我需要一些有关Azure API管理服务的帮助。 目前,我们有一个单页应用程序,它使用Azure上托管的两个后端服务(WebApi.NETCore)。为了对用户进行身份验证和授权,我们使用IdentityServer(也作为服务托管在Azure上)SubscriptionService。在这里,IdSrv对用户进行身份验证,并定义webapp可以访问哪些api。如果用户拥有给定API的权限,Sub

  • 我运行的是一个分布式Kafka Broker,其中使用SASL/SSL设置了代理间通信。为此,我修改了这里给出的JAAS配置。完成的文件如下所示: 我注意到“KafkaServer”部分有两个管理员用户。我也艰难地了解到我需要两者,但为什么会这样?我有一种感觉,几个月前我已经读过(但忘记了)原因,但我似乎再也找不到了。

  • 对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记