当前位置: 首页 > 面试题库 >

Kafka Streams-根据Streams数据发送不同的主题

有凯泽
2023-03-14
问题内容

我有一个kafka
stream应用程序,等待有关topic的记录被发布user_activity。它将接收json数据,并根据我想将该流推送到不同主题的键的值来确定。

这是我的流应用程序代码:

KStream<String, String> source_user_activity = builder.stream("user_activity");
        source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                System.out.println("value: " +  value);
                ArrayList<String> keywords = new ArrayList<String>();
                try {
                    JSONObject send = new JSONObject();
                    JSONObject received = new JSONObject(value);

                    send.put("current_date", getCurrentDate().toString());
                    send.put("activity_time", received.get("CreationTime"));
                    send.put("user_id", received.get("UserId"));
                    send.put("operation_type", received.get("Operation"));
                    send.put("app_name", received.get("Workload"));
                    keywords.add(send.toString());
                    // apply regex to value and for each match add it to keywords

                } catch (Exception e) {
                    // TODO: handle exception
                    System.err.println("Unable to convert to json");
                    e.printStackTrace();
                }

                return keywords;
            }
        }).to("user_activity_by_date");

在此代码中,我要检查操作类型,然后根据需要将流推送到相关主题中。

我该如何实现?

编辑:

我已将代码更新为:

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch( 
      (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
      (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
      (key, value) -> true
     );

branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");

问题答案:

您可以使用branch方法来拆分流。此方法使用谓词将源流分成几个流。

以下代码取自kafka-streams-examples:

KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

forks[0].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

forks[1].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
    .to(ORDER_VALIDATIONS.name(), Produced
  .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));


 类似资料:
  • 我有一个来自Kafka主题的大型json,我正在将其转换为Java对象,以提取find DB中所需的值。一些记录中会有一系列缺陷,我需要捕获这些缺陷并将其发送到不同的主题,这样它们就可以在数据库中自己的表中结束。使用接收器连接器将值插入数据库,这就是为什么我们使用多个主题。 我发现了分支和拆分,但这似乎更适合于确定一条已消费的记录应该转到哪个主题,而不是将记录的不同部分发送到不同的主题。有没有办法

  • 我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来 t1将包含消息的前半部分,t2将包含消息的后半部分 例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么 t1将有“a1b1”和“a2b2” t2将有“c1d1”和“c2d2” 并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2” ,因为消息不会按顺序存储在KStream store中,

  • 但是,有没有一种方法可以使用$http.post()来实现这一点?我是否必须始终包含头部才能使其工作?我相信上面的内容类型指定了发送数据的格式,但是我可以将它作为javascript对象发送吗?

  • 问题内容: 谁能告诉我以下语句为什么不将帖子数据发送到指定的URL?调用了url,但是在打印$ _POST时在服务器上- 我得到一个空数组。如果我在将消息添加到数据之前在控制台中打印消息-它显示正确的内容。 我也尝试过将数据作为字符串(具有相同的结果): 当我以以下格式使用它时,它似乎可以正常工作: 但是有没有办法用$ http.post()做到这一点- 我是否总是必须包含标头才能使其正常工作?我

  • 代码不应该在“参考”表中的“n”栏中的“电子邮件发送”栏中再次发送电子邮件。 如有任何帮助,我将不胜感激

  • 问题内容: 这是我的 在我的控制器中 但是当我在Chrome中看到“网络”标签时, 如何将其作为有效载荷发送?所以那URL是 和数据一样 我的端点希望数据是请求的一部分,以便可以将其解析为 问题答案: 我正在共享我的代码库中的代码,该代码在按照@lucuma的建议进行更改后有效 The UserService looks like and ProfileController looks like