我有一个来自Kafka主题的大型json,我正在将其转换为Java对象,以提取find DB中所需的值。一些记录中会有一系列缺陷,我需要捕获这些缺陷并将其发送到不同的主题,这样它们就可以在数据库中自己的表中结束。使用接收器连接器将值插入数据库,这就是为什么我们使用多个主题。
我发现了分支和拆分,但这似乎更适合于确定一条已消费的记录应该转到哪个主题,而不是将记录的不同部分发送到不同的主题。有没有办法做到这一点,或者我需要在某个地方改变我的架构。
@Autowired
void buildPipeline(StreamsBuilder builder) throws Exception{
KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
logger.info("started consumer");
System.out.println(messageStream.toString());
KStream<String, String> auditRecords = messageStream
.map((key, value) -> {
try {
return new KeyValue<>("", mapStringToAuditOutput(value));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
auditRecords.to(outputTopic);
}
public String mapStringToAuditOutput(String input) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
AuditOutput auditResults = null;
try {
auditResults= mapper.readValue(input, AuditOutput.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//check for multiple defects
if(auditResults.getCaseObject().getDefects() != null){
//send defects to separate topic
}
String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
System.out.println(auditJson);
return auditJson;
}
发现了分支和分裂,但看起来这更多的是为了确定消费记录应该转到哪个主题
对的在分支之前,您需要过滤map/MapValue,以便将部分/整个事件发送到不同的主题
更具体地说,创建中间KStream实例并多次使用to()
我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。
我有一个包含不同大小记录的单一源CSV文件,它将每个记录推送到一个源主题中。我想将这些记录从源主题拆分成不同的KStreams/KTables。我有一个用于一个表加载的管道,在这里我将记录从源主题以分隔格式推送到stream1中,然后将记录推送到另一个AVRO格式的流中,再将该流推送到JDBC接收器连接器中,该连接器将记录推送到MySQL数据库中。管道需要相同。但我希望将不同表的记录推送到一个源主
我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来 t1将包含消息的前半部分,t2将包含消息的后半部分 例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么 t1将有“a1b1”和“a2b2” t2将有“c1d1”和“c2d2” 并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2” ,因为消息不会按顺序存储在KStream store中,
问题内容: 我有两个卡夫卡喷口,我要将其值发送到同一螺栓。 可能吗 ? 问题答案: 是的,有可能: 您也可以使用任何其他分组。 更新: 为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性: 1)您可以使用操作员ID(如@ user-4870385所建议): 2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名流,而螺栓需要通过流名称连接到喷口
null preprocessor--为配置的请求拆分请求,在传递给HttpClientProcessor之前将它们放在列表中。 postprocessor-是否基于内容类型执行以下操作 xml-从单个响应中删除“xml”标记,并组合在一个根元素下形成一个响应 JSON-在一个父数组元素下组合json响应。 将所有响应组合成一个主响应(XML或JSON),作为路由的输出。 请指教。
我有两个有效载荷,希望将它们合并为一个JSON对象(流连接)。在一些地方,人们建议使用AttributesToJSON,但由于其中一个JSON没有固定的属性集,我想这是不可能的。 第一个有效载荷是 第二个是, 我想要输出为