当前位置: 首页 > 知识库问答 >
问题:

创建使用架构但不使用架构注册表url的Kafka Producer

钮轩昂
2023-03-14

下午好,我最近才开始与Kafka合作,我有一个关于制作人与模式的问题。

最初,我尝试在C#中构建一个没有模式的简单生产者。到目前为止,这是可行的,代码也在一个简短的版本中给出。

无模式生产者代码

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    BrokerAddressFamily = BrokerAddressFamily.V4,
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    producer.Flush();
    for(int i = 0; i < 3; i++)
    {
        producer.Produce("topic", new Message<Null, string> { Value = "Value: " + i + "..." });
    }
    producer.Flush();
};

但是模式会给我带来问题(请参阅下一节)。

假设我给了一个消费者,比如说Python中的消费者,他使用以下方案来接收整数:

{"type": "record",
 "name": "Sample",
 "fields": [{"name": "Value", "type": ["int"]}]
}

我现在想创建一个使用此方案并向Python消费者发送消息的C#生产者。根据该方案,消息应该只包含数字。

我试图构建一个使用模式的生产者,但不幸的是,在许多教程中,“模式注册表url”是运行生产者所必需的。这就是我所拥有的,但不幸的是,我无法避免使用“模式注册表url”。。。

使用模式的生产者代码

using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.IO;
namespace producer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            //This is not further specified, nothing is set up here
            string schemaRegistryUrl = "localhost:8081";
            
            var schema = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""type"": ""record"",
                    ""name"": ""Sample"",
                    ""fields"": [
                        {""name"": ""Value"", ""type"": [""int""]}
                    ]
                  }"
            );
            
            var config = new Confluent.Kafka.ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                BrokerAddressFamily = BrokerAddressFamily.V4,
            };
            
            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer = new Confluent.Kafka.ProducerBuilder<Confluent.Kafka.Null, GenericRecord>(config)
                    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                    .Build())
            {
                for(int i = 0; i < 3; i++)
                {
                    var record = new GenericRecord(schema);
                    record.Add("Value", i);
                    producer.ProduceAsync("topic", new Confluent.Kafka.Message<Confluent.Kafka.Null, GenericRecord> { Value = record });
                }
                producer.Flush();
            }
        }
    }
}

这里有两个问题,没有“模式注册url”如何构建生产者?我已经发现了类似的东西(但不幸的是在Java中)。在C#中会是什么样子?我感兴趣的是一个来自生产者的解决方案,它使用scheme(从上面)将数字发送给Python消费者,最好不使用“schema registry url”。然而,我也对如何让“模式注册url”工作感兴趣。

作为提示:如果生产者试图在没有模式的情况下向Python使用者发送消息,使用者将注册此消息。但是消费者无法显示发送的号码,因为简单生产者不使用方案。我指的是第一个制作人的代码

我希望我的问题尽可能可以理解,我期待得到答案!

共有1个答案

邵祺
2023-03-14

confluent_kafkaPython库要求数据遵循confluent Schema注册表线格式。

这意味着,您需要让生产者也匹配该契约,因此您不能绕过注册表,而不编写您在代码中引用的new AvroSerializer的自己实现

如果生产者试图在没有架构的情况下向Python消费者发送消息,消费者会注册

这不会发生在合流Kafka图书馆。使用者根据消息中添加的ID从注册表中获取架构。

因此,除非您使用其他反序列化程序,否则您还需要在那里编写自己的实现

 类似资料:
  • 我正在使用微软。蔚蓝色的数据计划主义。ApacheAvro 1.0.0-beta1和我正在尝试使用avro架构将事件发送到eventhub,但由于某种原因azure响应为BadRequest:Code“:400,“Detail:”架构验证失败:解析未定义的值路径“u0022typeu0022”时出错,第1行,位置17。我使用的是微软提供的例子,但我也没有运气。 记录={Schema:{“type”

  • 嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?

  • 我正在尝试从模式注册表中检索给定kafka主题的模式主题版本。我可以使用成功发布新版本,但我不确定如何检索版本。我在下面尝试使用curl请求,但结果立即命中-1(空)。 我如何修复这个GET请求,或者更好的是,我应该如何使用模式注册中心来检索一个模式?

  • 我们在当前的基础架构中安装了普通的apache Kafka,并开始记录一些我们想要使用Kafka Connect处理的数据。目前,我们使用Avro作为消息格式,但我们的基础架构中没有模式注册表。将来,我们计划用Confluent替换当前堆栈,并使用Schema Registry和Connect,但在一段时间内,我们只需要为此部署Connect。 是否可以以某种方式配置连接接收器,以便它们使用显式a

  • 当我尝试在没有模式注册表的情况下运行Kafka时, 我遇到一个错误,如需要CONNECT\u VALUE\u CONVERTER\u SCHEMA\u REGISTRY\u URL。命令[/usr/local/bin/dub sure CONNECT\u VALUE\u CONVERTER\u SCHEMA\u REGISTRY\u URL]失败! 设置kafka connect是否必须使用架构注

  • 我有一个docker容器运行AWS弹性容器服务(Fargate)中的confluentinc/cp模式注册表:5.5.0。只有一个容器正在运行。通过该模式注册表获取当前注册模式的API调用正在工作(例如,