我有一个kafka
stream应用程序,等待有关topic的记录被发布user_activity
。它将接收json数据,并根据我想将该流推送到不同主题的键的值来确定。
这是我的流应用程序代码:
KStream<String, String> source_user_activity = builder.stream("user_activity");
source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
System.out.println("value: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString());
send.put("activity_time", received.get("CreationTime"));
send.put("user_id", received.get("UserId"));
send.put("operation_type", received.get("Operation"));
send.put("app_name", received.get("Workload"));
keywords.add(send.toString());
// apply regex to value and for each match add it to keywords
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("user_activity_by_date");
在此代码中,我要检查操作类型,然后根据需要将流推送到相关主题中。
我该如何实现?
编辑:
我已将代码更新为:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch(
(key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
(key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
(key, value) -> true
);
branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
您可以使用branch
方法来拆分流。此方法使用谓词将源流分成几个流。
以下代码取自kafka-streams-examples:
KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
(id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
(id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);
forks[0].mapValues(
orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
.to(ORDER_VALIDATIONS.name(), Produced
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
forks[1].mapValues(
orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
.to(ORDER_VALIDATIONS.name(), Produced
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
我有一个来自Kafka主题的大型json,我正在将其转换为Java对象,以提取find DB中所需的值。一些记录中会有一系列缺陷,我需要捕获这些缺陷并将其发送到不同的主题,这样它们就可以在数据库中自己的表中结束。使用接收器连接器将值插入数据库,这就是为什么我们使用多个主题。 我发现了分支和拆分,但这似乎更适合于确定一条已消费的记录应该转到哪个主题,而不是将记录的不同部分发送到不同的主题。有没有办法
我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来 t1将包含消息的前半部分,t2将包含消息的后半部分 例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么 t1将有“a1b1”和“a2b2” t2将有“c1d1”和“c2d2” 并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2” ,因为消息不会按顺序存储在KStream store中,
但是,有没有一种方法可以使用$http.post()来实现这一点?我是否必须始终包含头部才能使其工作?我相信上面的内容类型指定了发送数据的格式,但是我可以将它作为javascript对象发送吗?
问题内容: 谁能告诉我以下语句为什么不将帖子数据发送到指定的URL?调用了url,但是在打印$ _POST时在服务器上- 我得到一个空数组。如果我在将消息添加到数据之前在控制台中打印消息-它显示正确的内容。 我也尝试过将数据作为字符串(具有相同的结果): 当我以以下格式使用它时,它似乎可以正常工作: 但是有没有办法用$ http.post()做到这一点- 我是否总是必须包含标头才能使其正常工作?我
代码不应该在“参考”表中的“n”栏中的“电子邮件发送”栏中再次发送电子邮件。 如有任何帮助,我将不胜感激
问题内容: 这是我的 在我的控制器中 但是当我在Chrome中看到“网络”标签时, 如何将其作为有效载荷发送?所以那URL是 和数据一样 我的端点希望数据是请求的一部分,以便可以将其解析为 问题答案: 我正在共享我的代码库中的代码,该代码在按照@lucuma的建议进行更改后有效 The UserService looks like and ProfileController looks like