当前位置: 首页 > 知识库问答 >
问题:

如何在汇流kafka C#中读取GenericRecord特定数据

糜宜民
2023-03-14

这是我的一个简单代码片段,它试图从使用者读取Avro泛型记录:

using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
  using (var consumer = new 
        ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
                        .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                        .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build())
                {
      consumer.Subscribe(topicName);

      try
       {
           while (true)
             {
                try
                 {
                    var consumeResult = consumer.Consume();
                    Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
                    Console.WriteLine(consumeResult.Value.Schema);
                            Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);

                  }
                 catch (ConsumeException e)
                   {
                       Console.WriteLine($"Consume error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // commit final offsets and leave the group.
                        consumer.Close();
                    }
                }

正如您所看到的,我可以记录模式,但不知道如何获取其数据值。

 Console.WriteLine(consumeResult.Value.Schema);

 Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);

而在此对象consumeresult.value中的架构和数据如下所示:

{schema:{“type”:“record”,“name”:“user”,“namespace”:“confluent.kafka.example.avrospecific”,“fields”:[{“name”:“name”,“type”:“string”},{“name”:“favorite_number”,“type”:[“int”,“null”]},{“name”:“favorite_color”,“type”:[“string”,“null”]}]},内容:{name:sfs,favorite_number:41,

我要读取内容数据。

共有1个答案

曹季同
2023-03-14

内容不可访问

我认为value.contents不是您想要的。特别是因为该属性是private,正如您提到的

getter的定义类似于字典-源代码

尝试consumeresult.value[“favorite_number”]

当您执行consumeresult.value.schema[“favorite_number”]时,您得到的是架构中的field对象,而不是外部记录中字段的值。-源代码

 类似资料:
  • 如何在AngularJS中读取此流?我尝试使用以下代码在新窗口中将其作为PDF文件打开: 但我无法看到打开的窗口中的内容。

  • 有谁能帮助我阅读作为Web服务调用响应的XML吗。 我得到的回应是这样的: 当我得到-1作为响应时,我想预先执行动作。我如何阅读它。我在GAE中使用struts,我使用

  • 我需要在某个时间读取firebase数据库中的一个值。另外,即使数据库voteValue在以后的某个时候发生了变化,我也不希望读取值被更新。不过,如果它更容易,我想我会去它。 我的firebase数据库中有许多时隙--它们碰巧是以秒为单位的时间。我已经调用了这些时隙,我在每个时隙都有一个投票值。 所以在下面的例子中,假设时间是@11seconds。我想读到那个时段的投票值是8

  • 我正试图从我已经创建的firebase数据库中获得一个特定的值。我关注了这个youtube教程和另一个类似的问题,这与我试图实现的目标相当相关。 这就是我的Firebase数据库看起来像数据库树的样子 以下是我到目前为止所尝试的。现在我只尝试获得名称项 在Oncreate方法中: PlumberList类 应用程序几秒钟后崩溃。以下是错误logcat的外观 我是android编码的新手,所以我可能

  • 我有一堆Excel文件中的数据要读取 每个Excel文件里,有多个 sheet,想要从特定名称 sheet 里读取数据,有没有什么简单的办法?

  • 我有一个包含5个字段(列)的csv文件。在5列中,我只想读第二列和第四列,这是进一步处理所需的。现在我正在使用opencsv api的readAll()方法进行读取。通过使用这种方法,我必须处理所有列,以获得第二列和第四列的值。 有没有办法读取所需列的值,即从csv文件中读取第二个和第四个值? 这是正确的方法还是我应该使用其他方法?