当前位置: 首页 > 知识库问答 >
问题:

使用kafkastreams的json,我想将其拆分并发送到两个不同的主题

都沈浪
2023-03-14

我有一个来自Kafka主题的大型json,我正在将其转换为Java对象,以提取find DB中所需的值。一些记录中会有一系列缺陷,我需要捕获这些缺陷并将其发送到不同的主题,这样它们就可以在数据库中自己的表中结束。使用接收器连接器将值插入数据库,这就是为什么我们使用多个主题。

我发现了分支和拆分,但这似乎更适合于确定一条已消费的记录应该转到哪个主题,而不是将记录的不同部分发送到不同的主题。有没有办法做到这一点,或者我需要在某个地方改变我的架构。

    @Autowired
    void buildPipeline(StreamsBuilder builder) throws Exception{
        KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
                logger.info("started consumer");
        System.out.println(messageStream.toString());

        KStream<String, String> auditRecords = messageStream
                  .map((key, value) -> {
                      try {
                          return new KeyValue<>("", mapStringToAuditOutput(value));
                      } catch (JsonProcessingException e) {
                          e.printStackTrace();
                      }
                      return null;
                  });
     auditRecords.to(outputTopic);

    }
    public String mapStringToAuditOutput(String input) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        AuditOutput auditResults = null;
        try {
            auditResults=  mapper.readValue(input, AuditOutput.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        //check for multiple defects
        if(auditResults.getCaseObject().getDefects() != null){
            //send defects to separate topic

        }
        String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
        System.out.println(auditJson);
        return auditJson;
    }

共有1个答案

法和安
2023-03-14

发现了分支和分裂,但看起来这更多的是为了确定消费记录应该转到哪个主题

对的在分支之前,您需要过滤map/MapValue,以便将部分/整个事件发送到不同的主题

更具体地说,创建中间KStream实例并多次使用to()

 类似资料:
  • 我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。

  • 我有一个包含不同大小记录的单一源CSV文件,它将每个记录推送到一个源主题中。我想将这些记录从源主题拆分成不同的KStreams/KTables。我有一个用于一个表加载的管道,在这里我将记录从源主题以分隔格式推送到stream1中,然后将记录推送到另一个AVRO格式的流中,再将该流推送到JDBC接收器连接器中,该连接器将记录推送到MySQL数据库中。管道需要相同。但我希望将不同表的记录推送到一个源主

  • 问题内容: 我有两个卡夫卡喷口,我要将其值发送到同一螺栓。 可能吗 ? 问题答案: 是的,有可能: 您也可以使用任何其他分组。 更新: 为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性: 1)您可以使用操作员ID(如@ user-4870385所建议): 2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名流,而螺栓需要通过流名称连接到喷口

  • 我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来 t1将包含消息的前半部分,t2将包含消息的后半部分 例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么 t1将有“a1b1”和“a2b2” t2将有“c1d1”和“c2d2” 并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2” ,因为消息不会按顺序存储在KStream store中,

  • null preprocessor--为配置的请求拆分请求,在传递给HttpClientProcessor之前将它们放在列表中。 postprocessor-是否基于内容类型执行以下操作 xml-从单个响应中删除“xml”标记,并组合在一个根元素下形成一个响应 JSON-在一个父数组元素下组合json响应。 将所有响应组合成一个主响应(XML或JSON),作为路由的输出。 请指教。

  • 我有两个有效载荷,希望将它们合并为一个JSON对象(流连接)。在一些地方,人们建议使用AttributesToJSON,但由于其中一个JSON没有固定的属性集,我想这是不可能的。 第一个有效载荷是 第二个是, 我想要输出为