当前位置: 首页 > 知识库问答 >
问题:

Ktable to KGroupTable-架构不可用(未注册状态存储更改日志架构)

常英资
2023-03-14

我有一个Kafka主题-让我们活动-每日-聚合,我想使用KGroupTable进行聚合(添加/子)。所以我使用

final KTable<String, GenericRecord> inputKTable =
      builder.table("activity-daily-aggregate",Consumed.with(new StringSerde(), getConsumerSerde());

Note: getConsumerSerde - returns >> new GenericAvroSerde(mockSchemaRegistryClient)
2.Next Step,
inputKTable.groupBy(
        (key,value)->KeyValue.pair(KeyMapper.generateGroupKey(value), new JsonValueMapper().apply(value)),
        Grouped.with(AppSerdes.String(), AppSerdes.jsonNode())
        );

在第1步和第2步之前,我已经将MockSchemaRegistryClient配置为

 mockSchemaRegistryClient.register("activity-daily-aggregate-key",
                    Schema.parse(AppUtils.class.getResourceAsStream("/avro/key.avsc")));
       mockSchemaRegistryClient.register("activity-daily-aggregate-value",
                    Schema.parse(AppUtils.class.getResourceAsStream("/avro/daily-activity-aggregate.avsc")))

当我使用测试用例运行拓扑时,我在第2步得到一个错误。

组织。阿帕奇。Kafka。溪流。错误。StreamsException:进程中捕获异常。taskId=0_0,processor=KSTREAM-SOURCE-00000000 11,topic=activity daily aggregate,partition=0,offset=0,stacktrace=org。阿帕奇。Kafka。常见的错误。SerializationException:检索Avro架构时出错:{“type”:“record”,“name”:“FactActivity”,“namespace”:“com.ascendlearning.Avro”,“fields”:……}

原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExctive:架构未找到;错误代码:404001

当我向MockSchemaRegistrCyclient注册模式时,错误就会发生,

stream-app-id-activity-daily-aggregate-STATE-STORE-00000000 10变更日志密钥

stream-app-id-activity-daily-aggregate-STATE-STORE-00000000 10变更记录值=

我们需要做这一步吗?我想它可能会被拓扑自动处理

共有1个答案

孔建柏
2023-03-14

在博客上,https://blog.jdriven.com/2019/12/kafka-streams-topologytestdriver-with-avro/

当您在传递到TopologyTestDriver的两个属性中配置相同的mock://URL时,对于传递到createInputTopic和createOutputTopic的(反)序列化程序实例,所有(反)序列化程序都将使用相同的MockSchemaRegistryClient,并具有单个内存模式存储。

 // Configure Serdes to use the same mock schema registry URL
    Map<String, String> config = Map.of(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
    avroUserSerde.configure(config, false);
    avroColorSerde.configure(config, false);

    // Define input and output topics to use in tests
    usersTopic = testDriver.createInputTopic(
        "users-topic",
        stringSerde.serializer(),
        avroUserSerde.serializer());
    colorsTopic = testDriver.createOutputTopic(
        "colors-topic",
        stringSerde.deserializer(),
        avroColorSerde.deserializer());

在传递给输入/输出主题的serdes中,我没有传递模拟注册表客户机模式URL。

 类似资料:
  • 我有一个docker容器运行AWS弹性容器服务(Fargate)中的confluentinc/cp模式注册表:5.5.0。只有一个容器正在运行。通过该模式注册表获取当前注册模式的API调用正在工作(例如,

  • 问题内容: 我已经看到了有关此主题的几个问题。但是所有人都只是说,您只需要从其他方法中恢复过来即可。但是没有人解释其他的意思!我找不到关于SO的答案。这也是该问题评论的后续内容。 假设我正在开发Uber应用。驾驶员需要知道乘客的位置。 一名乘客为设置接送地点。 2分钟后,她决定取消整个接送。所以现在我需要通知司机。 这是重要的状态更改更新。 首先想到的是: 发送一个包含通知的通知,`content

  • 嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?

  • 我试图使用Avro模式向我的经纪人发送消息,但“我总是收到错误: 2020-02-01 11:24:37.189[nioEventLoopGroup-4-1]错误应用程序-未经处理:POST-/api/orchestration/org。阿帕奇。Kafka。常见的错误。SerializationException:注册Avro架构时出错:io导致“字符串”。汇合的。Kafka。阴谋论。客户Rest

  • 这里是我的docker容器: 我的制作人(用Kolin写成) 我的Avro架构: 状态:打开 消息:发送的邮件。 所以我把它发送给KAFKA,在connect(jdbc sink postgres)中,我只将消息(客户端)的属性作为Fields.whitelist而不获得命令id或状态。 4-https://github.com/rodrigodevelms/kafka-registry/blob

  • 我正在尝试从模式注册表中检索给定kafka主题的模式主题版本。我可以使用成功发布新版本,但我不确定如何检索版本。我在下面尝试使用curl请求,但结果立即命中-1(空)。 我如何修复这个GET请求,或者更好的是,我应该如何使用模式注册中心来检索一个模式?