当前位置: 首页 > 知识库问答 >
问题:

如何将记录拆分为不同的流,从一个主题到不同的流?

施学
2023-03-14

我有一个包含不同大小记录的单一源CSV文件,它将每个记录推送到一个源主题中。我想将这些记录从源主题拆分成不同的KStreams/KTables。我有一个用于一个表加载的管道,在这里我将记录从源主题以分隔格式推送到stream1中,然后将记录推送到另一个AVRO格式的流中,再将该流推送到JDBC接收器连接器中,该连接器将记录推送到MySQL数据库中。管道需要相同。但我希望将不同表的记录推送到一个源主题中,然后根据一个值将这些记录拆分为不同的流。这可能吗?我试着想办法做到这一点,但做不到。我是否可以改进管道,或者使用KTable而不是KStreams或任何其他修改?

我有一个不同的MySQL表test2具有不同的模式,该表的记录也存在于source.csv文件中。由于模式不同,我无法遵循test1的当前管道将数据插入test2表。

示例-在CSV源文件中,

线路1-9,atm,mun,ronaldo线路2-10,atm,mun,bravo,num2线路3-11,atm,sign,bravo,Sick

如果col4==C罗,如果col4==bravo和col3==mun,则转到表1,如果col4==bravo和col3==sign转到表3

我对Kafka很陌生,从前一周开始开发Kafka。

共有1个答案

洪知
2023-03-14

您可以编写一个分离的Kafka Streams应用程序,使用KStream#branch()运算符将记录从输入主题拆分到不同的KStream或输出主题:

KStream<K, V>[] branches = streamsBuilder.branch(
        (key, value) -> {filter logic for topic 1 here},
        (key, value) -> {filter logic for topic 2 here},
        (key, value) -> true//get all messages for this branch
);

// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3

或者可以像这样手动分支KStream:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));

inputKStream
        .filter((key, value) -> {filter logic for topic 1 here})
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> {filter logic for topic 2 here})
        .to("your_2nd_output_topic");
...
 类似资料:
  • 我们有两个不同的ASP.NET应用程序启用了Log4net日志记录。它们都有相同的log4net1.2.10.0版本。

  • 如何在SL4J中配置日志记录?我的项目有很多类:class1、class2、Class3....我想做两件事:将所有类记录到一个名为FILE1的文件追加器中,并具有警告级别(class1、class2、class3...)将一个名为class1的类记录到具有调试级别的名为FILE2的文件追加器中。 问题是,当我将class1的记录器配置为具有WARN级别的FILE1 appender时,我不知道如

  • 我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。

  • 问题内容: 我有一个普通的INFO级别应用日志。我需要的是另外记录所有错误级别的事件,以单独的错误日志。我正在使用这样的配置: 此配置仅记录错误。如果我首先放置信息级别记录器,则它将仅记录到一般附加程序,但错误记录器将无法工作。我想让他们两个都工作。 问题答案: 您需要做的是只有一个定义了INFO级别的定义,但是在您的两个追加器定义中,您需要相应地设置其阈值,例如 然后,将两个追加程序添加到记录器

  • 问题内容: 如何配置Logback以将记录器的不同级别记录到不同的目的地? 例如,给定以下Logback配置,Logback会将消息记录到并将消息记录到吗? (请注意,此示例是第3章:Logback配置中所示示例的变形)。 问题答案: 更新:有关使用Groovy的基于所有配置的方法,请参见DeanHiller的答案。 好的,这是我最喜欢的xml方法。我为Eclipse版本执行此操作,因此我可以 单

  • 我有这样一份清单: 如何将此列表拆分为三个变量,每个变量分别保持不变