当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者模式注册表动态主题名称

颜嘉誉
2023-03-14

我有多个主题A、B和C,我正在使用Kafka Streams将它们扇形成主题X。主题A、B和C使用默认主题名称策略在模式注册表中注册为主题。流式传输非常愚蠢,因为它只是将消息扇形,而不确保它们符合注册表中的模式,但它会在消息中添加一个ORIGINAL_TOPIC_NAME标头以指示它来自主题A、B或C。

然后,我让Kafka消费者从主题X中消费。该主题未在模式注册表中注册。Kafka消费者是我使用Kafkaavroderialiser和schema.registry的地方。使用注册表消费的url。我计划让消费者在这里对注册表进行检查,但使用原始的_TOPIC\u NAME头作为主题。但是,我不确定是否可以控制消费者,使其使用Kafka头解析主题名称,因为主题名称策略是您在初始化时输入消费者的内容。

有什么想法吗?

共有1个答案

石苏燕
2023-03-14

策略传递给反序列化器

反序列化函数只能访问键或值的传入字节数组以及主题名,而不能访问头。

可以设置头反序列化器,但它不在键/值反序列化器的调用路径中使用。

解决方法是对键/值使用ByteArrayDeserializer,可以选择手动提取头,使用如下逻辑获取模式ID

ByteBuffer bb = ByteBuffer.wrap(value); // or key
if (bb.get() != 0x0) {
    // not a valid encoded Schema Registry record 
}
int schemaId = bb.getInt();
// Completely ignores the subject 
Schema schema = schemaRegistryClient.getId(schemaId);
// TODO: continue to deserialize the key/value byte array 
 类似资料:
  • 我有一个关于使用Kafka和主题(Kafka代理)和主题(Schema注册表)的不同名称设置流处理器的问题。 首先,任何操作似乎都可以与 Kafka 代理和模式注册表一起工作,但是如果处理器收到该事件,则模式注册表将魔术开始。 而不是将abc作为主题发送到模式注册表abc。bla将被发送。架构注册表的回答为“未找到”。 预期:localhost:8081/subjects/ABC/versions

  • 我试图从REST代理发布json模式,但遇到异常 curl-k-x post-h“content-type:application/vnd.schemaregistry.v1+json”--数据“{”schema“:”{“type”:“object”,“properties”:{“firstname”:{“type”:“string”},“lastname”:{“type”:“string”},“

  • 融合模式注册表底层存储在Kafka中_schema主题下的所有模式。是否可以将此后端存储主题拆分为_schema1和_schema2等倍数? 为什么因为在我的生产用例中我想存储1000多个模式,所以分离后端存储会使主题的负载更少。 此外,融合模式注册表是否支持Kafka主题以外的后端存储?

  • 我有一个Kafka主题,我想用AVRO数据(目前是JSON)来填充它。我知道“正确”的方法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。 因此,我将AVRO数据作为数组[字节]发送,而不是常规的Json对象: 模式是在每个数据中启动的;我如何使它与kafka-connect一起工作?kafka-connect配置目前表现出以下属性(数据作为json.gz文件写入s3),我想编写P

  • 我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数: 我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机: 谢谢你能给我带来的任何帮助。 视CG