我创建自定义密钥并将数据存储在全局状态存储中,但在重新启动后,它将消失,因为全局存储在恢复时将直接从源主题中获取数据,并绕过处理器。
我的输入主题有上面的数据。
{
"id": "user-12345",
"user_client": [
"clientid-1",
"clientid-2"
]
}
我维护以下两个状态存储:
Record1: {
"id": "user-1",
"user_client": [
"clientid-1",
"clientid-2"
]
}
Record2:{
"id": "user-2",
"user_client": [
"clientid-1",
"clientid-3"
]
}
全局状态存储应该具有:
id -> json Record'
clientid-1: ["user-1", "user-2"]
clientid-2: ["user-2"]
clientid-3: ["user-2"]
如何在全局状态存储中维护上述场景的还原案例
一种方法是为GlobalKTable维护一个changelog主题(haves retention.policy=compact),我们将其称为user_client_global_ktable_changelog
,为了简单起见,假设我们将消息序列化到java类(您可以使用HashMap或JsonNode或其他):
//initial message format
public class UserClients {
String id;
Set<String> userClient;
}
//message when key is client
public class ClientUsers {
String clientId;
Set<String> userIds;
}
//your initial topic
KStream<String, UserClients> userClientKStream = streamsBuilder.stream("un_keyed_topic");
//re-map initial message to user_id:{inital_message_payload}
userClientKStream
.map((defaultNullKey, userClients) -> KeyValue.pair(userClients.getId(), userClients))
.to("user_client_global_ktable_changelog");//please provide appropriate serdes
userClientKStream
//will cause data re-partition before running groupByKey (will create an internal -repartition topic)
.flatMap((defaultNullKey, userClients)
-> userClients.getUserClient().stream().map(clientId -> KeyValue.pair(clientId, userClients.getId())).collect(Collectors.toList()))
//we have to maintain a current aggregated store for user_ids for a particular client_id
.groupByKey()
.aggregate(ClientUsers::new, (clientId, userId, clientUsers) -> {
clientUsers.getUserIds().add(userId);
return clientUsers;
}, Materialized.as("client_with_aggregated_user_ids"))
.toStream()
.to("user_client_global_ktable_changelog");//please provide appropriate serdes
例如,用于聚合本地状态的user_ids:
//re-key message for client-based message
clientid-1:user-1
//your current aggregated for `clientid-1`
"clientid-1"
{
"user_id": ["user-1"]
}
//re-key message for client-based message
clientid-1:user-2
//your current aggregated for `clientid-1`
"clientid-1"
{
"user_id": ["user-1", "user-2"]
}
在kafka中的全局状态存储是否总是一个分区,或者我们可以为全局状态存储更改日志主题设置多个分区? 我找不到任何关于这方面的明确文件。
null 谢谢你的澄清。
全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。
我有一个使用处理器api更新状态存储的拓扑,配置为复制因子3,ACKS=ALL 我试图找出这个changelog主题滞后的根本原因,因为我没有在这个处理器中发出任何外部请求。有对rocksdb状态存储的调用,但这些数据存储都是本地的,检索速度应该很快。 我的问题是这个变更日志主题的使用者到底是什么?
我们正在运行一个Kafka0.11.0的6节点集群。我们设置了全局保留和每个主题保留(以字节为单位),但这两个保留都没有应用。我在日志中没有看到任何错误,只是没有被删除(按大小;时间保留似乎起作用了) 我可以看到在分区目录中有大量的段日志文件(每个512MB)...怎么回事?! 谢谢你,雷鸣
问题内容: 目前,我正在使用一个大量的JavaScript,jQuery,Microsoft客户端JavaScript和其他库的旧网页。底线- 我无法从头开始重写整个页面,因为企业无法证明它的合理性。所以…就是这样。无论如何,我需要使用变量来污染(我确实没有尝试过)全局名称空间。我在考虑三个选项- 只需使用普通的JavaScript声明存储/检索它- 使用jQuery在DOM标签中存储/获取值-