当前位置: 首页 > 知识库问答 >
问题:

使用kafka-streams对json输入流进行条件排序

濮佑运
2023-03-14

我是开发kafka-streams应用程序的新手。我的流处理器用于根据输入json消息中的用户键值对json消息进行排序。

Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"}
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"}
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"}

我读到这里动态连接一个Kafka输入流到多个输出流,没有动态解决方案。

在我的用例中,我知道对输入流排序所需的用户键和输出主题。因此,我编写了针对每个用户的单独的处理器应用程序,其中每个处理器应用程序匹配不同的用户ID。

所有不同的流处理器应用程序都从kafka中的相同json输入主题读取,但每个应用程序都只在满足预设用户条件的情况下将消息写入特定用户的输出主题。

public class SwitchStream extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            HashMap<String, String> message = new HashMap<>();
            ObjectMapper mapper = new ObjectMapper();
            try {
                message = mapper.readValue(value, HashMap.class);
            } catch (IOException e){}

            // User condition UserID = 1
            if(message.get("UserID").equals("1")) {
                context().forward(key, value);
                context().commit();
            }
        }

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", "INPUT_TOPIC");
            builder.addProcessor("Process", SwitchStream::new, "Source");
            builder.addSink("Sink", "OUTPUT_TOPIC", "Process");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start(); 
       }
}

问题1:如果使用低级处理器API,那么使用高级流DSL是否可以轻松实现相同的功能?(我承认我发现更难理解和跟随其他在线的高级流DSL示例)

问题2:input json主题以20k-25k EPS的高速率获取输入。我的处理器应用程序似乎无法跟上这个输入流的步伐。我曾尝试部署每个流程的多个实例,但结果与我希望的结果相去甚远。理想情况下,每个处理器实例应该能够处理3-5K EP。

有没有一种方法来改进我的处理器逻辑或编写相同的处理器逻辑使用高级流DSL?那会有什么不同吗?

共有1个答案

萧浩漫
2023-03-14

您可以在高级DSL中通过filter()(您有效地实现了一个筛选器,因为您只返回一个消息,如果它是userid==1)。您可以通过使用kstream#branch()(有关更多详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#Stateless-transformations)来概括这种筛选模式。也请阅读JavaDocs:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams

KStreamBuilder builder = new KStreamBuilder();
builder.stream("INPUT_TOPIC")
       .filter(new Predicate() {
           @Overwrite
           boolean test(String key, String value) {
               // put you processor logic here
               return message.get("UserID").equals("1")
           }
        })
       .to("OUTPUT_TOPIC");

关于表演。单个实例应该能够处理10k+记录。如果没有任何进一步的信息,很难判断问题可能是什么。我建议在Kafka用户列表中询问(见http://Kafka.apache.org/contact)

 类似资料:
  • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

  • 问题内容: 我有一个包含Quote对象的数组列表。我希望能够按名称,更改和更改百分比的字母顺序进行排序。如何排序我的数组列表? 问题答案: 创建一个合适的对象,它将根据你所需的条件比较两个项目。然后在你的上使用 。 如果以后要按其他条件排序,请使用不同的再次调用。

  • 问题内容: 我有一个对象数组: 联系人类别: 而且我想通过该数组进行排序,然后在某些情况下接触得到了相同的。 我可以按其中一个条件进行排序,但不能同时按两个条件进行排序。 我如何添加更多条件来对该数组进行排序? 问题答案: 想一想“按多个标准排序”是什么意思。这意味着首先通过一个条件比较两个对象。然后,如果这些条件相同,则领带将被下一个条件破坏,依此类推,直到获得所需的排序。 您在这里看到的是me

  • 我有一个这样的方法: 此方法需要以如下字符串形式返回3个最昂贵项目的产品ID:“item1,item2,item3”。我应该只能使用溪流,我被困在这里了。我应该能够按值对项目进行排序,然后获得产品ID,但我似乎无法使其正常工作。 编辑: 产品ID位于入口类中

  • 我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们

  • 问题内容: 我有一个对象列表。每个对象都包含a 和a (以及其他)。 我想先按排序,然后按排序。 如何以最干净的方式做到这一点? 谢谢! 马耳他 问题答案: 给定一个对象类,如下所示: 编写一个自定义比较器类,如下所示: 然后排序如下: