当前位置: 首页 > 知识库问答 >
问题:

带有集中式Avro模式的Apache Kafka

谷翰飞
2023-03-14

我们使用Apache Kafka(不是confluent Kafka)0.10。我们想用Kafka设置AVRO模式。我有如下的avro模式。

{
  "namespace": "Rule",
  "type": "record",
  "name": "RuleMessage",
  "fields": [
    {
      "name": "station",
      "type": "string"
    },
    {
      "name": "model",
      "type": "string"
    }
}

序列化消息,

public byte[] serializeMessage(EventMessage eventMessage) throws IOException {

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        DatumWriter<EventMessage> writer = new SpecificDatumWriter<EventMessage>(EventMessage.getClassSchema());
        writer.write(eventMessage, encoder);
        encoder.flush();
        out.close();
        return out.toByteArray();
    }

这正像预期的那样起作用。

但是,希望在主题级别设置一个Avro模式,这样,如果消息不符合Avro模式,主题将拒绝消息

不管怎么说,我可以用阿帕奇Kafka0.10做到这一点。

共有1个答案

戚晨
2023-03-14

您可以在Apache Kafka0.10.0中使用Confluent的模式注册表(其开源和Apache许可)将模式与主题关联起来。它与Avro序列化器/反序列化器一起到达,这些序列化器可以按照您所请求的方式自动验证Avro模式。

请注意,这不是所谓的“合流Kafaka”--这将是一个商标侵犯拥有它。为了方便起见,Confluent只是将Apache Kafka打包在其发行版中,但由于模式注册表在github上,所以如果您愿意,您可以不使用Confluent打包而使用它。

 类似资料:
  • 因此,我们计划使用Avro在融合的Kafka生态系统上进行交流。我目前对Avro的理解是,每条消息都有自己的模式。如果是这样的话,我们需要模式注册表来解决版本更新吗? 我问,因为在每条消息中携带模式可以防止需要像模式注册表这样的东西来将消息ID映射到模式。还是我在这里错过了什么?

  • 大家好,我需要为下面的示例创建AVRO模式; 当我按照建议更改所有者对象时,avro-tool返回错误。 ]} 测试:

  • 我是Docker Swarm的新手。我试图用compose文件在Docker swarm上部署redis集群。我希望redis集群使用端口6380,所以我配置了端口,并让它在compose文件中挂载redis配置文件。 但是当我运行时,我得到了一个错误的声明,“对不起,集群配置文件redis-node.conf已经被不同的Redis集群节点使用了。请确保不同的节点使用不同的集群配置文件。” 这是我

  • 我正在从Firebase实时数据库切换到云Firestore。我的数据库包含拥有存储的用户,每个存储都包含盒子。每个用户都可以拥有多个包含盒子的存储器。每个存储可以包含几个盒子。每个箱子只能放在一个仓库里。 在我应用程序的主视图中,对于该特定用户,我需要列出所有存储以及每个存储中的框,如下所示: 然后,用户应该能够点击每个框以查看内容和更多信息。 在Firebase实时数据库中,每个用户只需一个请

  • 我有两个问题: > 我曾尝试使用模式V1编写记录,并使用模式V2读取记录,但出现以下错误: org.apache.avro。AvroTypeException:找到foo,应为foo 我使用avro-1.7.3和: 以下是这两种模式的示例(我也尝试过添加命名空间,但没有成功)。 架构V1: 架构V2: 提前谢谢。

  • 根据Avro模式规范(适用于接头):https://avro.apache.org/docs/current/spec.html 如上所述,Unions使用JSON数组表示。例如,["null","string"]声明一个模式,该模式可以是null或string。 ( 请注意,当为类型为联合的记录字段指定默认值时,默认值的类型必须与联合的第一个元素匹配。 因此,对于包含“null”的联合,通常首先