当前位置: 首页 > 知识库问答 >
问题:

全局状态存储区不创建更改日志主题如果全局存储区的输入主题有空键,解决办法是什么?

唐麒
2023-03-14

我创建自定义密钥并将数据存储在全局状态存储中,但在重新启动后,它将消失,因为全局存储在恢复时将直接从源主题中获取数据,并绕过处理器。

我的输入主题有上面的数据。

{
      "id": "user-12345",
      "user_client": [
        "clientid-1",
        "clientid-2"
      ]
} 

我维护以下两个状态存储:

    null
Record1: {
          "id": "user-1",
          "user_client": [
            "clientid-1",
            "clientid-2"
          ]
    } 



 Record2:{
          "id": "user-2",
          "user_client": [
            "clientid-1",
            "clientid-3"
          ]
    } 

全局状态存储应该具有:

id -> json Record'

clientid-1: ["user-1", "user-2"]
clientid-2: ["user-2"]
clientid-3: ["user-2"]

如何在全局状态存储中维护上述场景的还原案例

共有1个答案

盖斌
2023-03-14

一种方法是为GlobalKTable维护一个changelog主题(haves retention.policy=compact),我们将其称为user_client_global_ktable_changelog,为了简单起见,假设我们将消息序列化到java类(您可以使用HashMap或JsonNode或其他):

//initial message format
public class UserClients {
    String id;
    Set<String> userClient;
}
//message when key is client
public class ClientUsers {
    String clientId;
    Set<String> userIds;
}
//your initial topic
KStream<String, UserClients> userClientKStream = streamsBuilder.stream("un_keyed_topic");
  1. 很容易将记录重新键为user_id,只需重新键KStream然后将其发送到输出主题
//re-map initial message to user_id:{inital_message_payload}
userClientKStream
        .map((defaultNullKey, userClients) -> KeyValue.pair(userClients.getId(), userClients))
        .to("user_client_global_ktable_changelog");//please provide appropriate serdes
userClientKStream
        //will cause data re-partition before running groupByKey (will create an internal -repartition topic)
        .flatMap((defaultNullKey, userClients)
                -> userClients.getUserClient().stream().map(clientId -> KeyValue.pair(clientId, userClients.getId())).collect(Collectors.toList()))
        //we have to maintain a current aggregated store for user_ids for a particular client_id
        .groupByKey()
        .aggregate(ClientUsers::new, (clientId, userId, clientUsers) -> {
            clientUsers.getUserIds().add(userId);
            return clientUsers;
        }, Materialized.as("client_with_aggregated_user_ids"))
        .toStream()
        .to("user_client_global_ktable_changelog");//please provide appropriate serdes

例如,用于聚合本地状态的user_ids:

//re-key message for client-based message
clientid-1:user-1
//your current aggregated for `clientid-1`
"clientid-1"
{
    "user_id": ["user-1"]
}

//re-key message for client-based message
clientid-1:user-2
//your current aggregated for `clientid-1`
"clientid-1"
{
    "user_id": ["user-1", "user-2"]
}
 类似资料:
  • 在kafka中的全局状态存储是否总是一个分区,或者我们可以为全局状态存储更改日志主题设置多个分区? 我找不到任何关于这方面的明确文件。

  • 全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。

  • 我有一个使用处理器api更新状态存储的拓扑,配置为复制因子3,ACKS=ALL 我试图找出这个changelog主题滞后的根本原因,因为我没有在这个处理器中发出任何外部请求。有对rocksdb状态存储的调用,但这些数据存储都是本地的,检索速度应该很快。 我的问题是这个变更日志主题的使用者到底是什么?

  • 我们正在运行一个Kafka0.11.0的6节点集群。我们设置了全局保留和每个主题保留(以字节为单位),但这两个保留都没有应用。我在日志中没有看到任何错误,只是没有被删除(按大小;时间保留似乎起作用了) 我可以看到在分区目录中有大量的段日志文件(每个512MB)...怎么回事?! 谢谢你,雷鸣

  • 问题内容: 目前,我正在使用一个大量的JavaScript,jQuery,Microsoft客户端JavaScript和其他库的旧网页。底线- 我无法从头开始重写整个页面,因为企业无法证明它的合理性。所以…就是这样。无论如何,我需要使用变量来污染(我确实没有尝试过)全局名称空间。我在考虑三个选项- 只需使用普通的JavaScript声明存储/检索它- 使用jQuery在DOM标签中存储/获取值-