当前位置: 首页 > 知识库问答 >
问题:

带avro序列化的C#汇合kafka问题

姚培
2023-03-14

我正在使用docker从https://github.com/confluentinc/cp-All-in-One运行kafka和其他服务,并在我的测试项目中使用用于kafka、avro和schemaRegistry的汇合nuget包。

如果要发送json消息,我现在还没有问题,但是我正在努力发送avro序列化消息。

我看到https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/avrospecific示例,我尝试用同样的方法执行,但最终我得到了如下所示的异常

Local:值序列化错误
在confluent.kafka.producer2. D__52.MoveNext()在system.runtime.compilerServices.taskawaiter.throwforNonSuccess(任务任务)在system.runtime.compilerServices.taskawaiter 1.GetResult()在c:\users\lu95eb\source\repos\kafka_playground\kafkaProducer\kafkaService.cs:行126

内部例外

对象引用未设置为对象的实例。
在Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl在Confluent.schemaRegistryClient schemaRegistryClient,Boolean autoRegisterSchema,Int32 initialBufferSize在System.Runtime.CompilerServices.Taskawaiter.ThrowForNonSuccess(Task任务)在

这是我的SpecificRecord类

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    public Schema Schema => Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            {
              ""name"": ""Name"",
              ""type"": ""string""
            },
            {
              ""name"": ""Numbers"",
              ""type"": {
                ""type"": ""array"",
                ""items"": ""int""
              }
            }
          ]
        }
        ");

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}

和用于发送消息的方法

private async Task SendSpecificRecord(UserInfo userInfo)
    {
        using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
        using (var producer =
            new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                .SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
                .Build())
        {

            var message = new Message<string, UserInfo>
            {
                Key = userInfo.Name,
                Value = userInfo
            };


            await producer.ProduceAsync(SpecificTopic, message);
        }
    }

kafkaservice.cs:第126行是Await Producer.ProduceAsync(SpecificTopic,message);

如果有人能指出我做错了什么,我会很感激的。提前谢谢你。

共有1个答案

翟沈义
2023-03-14

如果有人对解决方案感到好奇(我无法想象某人是怎样的;))然后我编写了“自定义”avro序列化器和反序列化器,并像一个魅力一样工作。

public class CustomAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.Run(() =>
        {
            using (var ms = new MemoryStream())
            {
                var enc = new BinaryEncoder(ms);
                var writer = new SpecificDefaultWriter(data.Schema);
                writer.Write(data, enc);
                return ms.ToArray();
            }
        });
    }
}

public class CustomAvroDeserializer<T> : IDeserializer<T>
    where T : class, ISpecificRecord
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        using (var ms = new MemoryStream(data.ToArray()))
        {
            var dec = new BinaryDecoder(ms);
            var regenObj = (T)Activator.CreateInstance(typeof(T));

            var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
            reader.Read(regenObj, dec);
            return regenObj;
        }
    }
}
 类似资料:
  • 我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我有一台装有Java 1.6的服务器。在这里,我需要使用Confluent的< code > KafkaAvroDeserializer 来反序列化avro消息。 问题是: 如果我使用Confluent-1.0(它与Java兼容 如果我使用Confluent-2.0或更高版本,它拥有一切,但它只与java兼容 在这种情况下我该怎么办? 为了比较: http://docs.confluent.io/

  • 问题内容: 我目前无法在KSTREAM APP 中 反序列化avro PRIMITIVE密钥 用avro模式编码的密钥(已在模式注册表中注册), 当我使用kafka-avro-console-consumer时,我可以看到密钥已正确反序列化 但是不可能使其在KSTREAM应用程序中工作 密钥的avro模式是主要的: 我已经关注了合流的文档 它对于该值工作得很好,但是该键将是一个字符串,该字符串包含

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。