当前位置: 首页 > 知识库问答 >
问题:

使用Kafka的Logstash:无法解码avro

吕高寒
2023-03-14

我正在尝试使用来自Kafka队列的序列化avro事件。kafka队列使用简单的java生产者填充。为了清楚起见,我共享三个组件:

Avro模式文件

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Java Producer代码片段(User.class是使用avro工具生成的)

    User user1 = new User();
    user1.setName("Alyssa");
    user1.setFavoriteNumber(256);
    user1.setFavoriteColor("blue");
    String topic = "MemoryTest";

    // Properties set in 'props'
    KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class);
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(user1, encoder);
    encoder.flush();
    out.close();
    byte[] serializedBytes = out.toByteArray();
    producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes));

日志存储配置文件

input {
        kafka {
                zk_connect => "localhost:2181"
                topic_id => "MemoryTest"
                type => "standard_event"
                group_id => "butiline_dash_prod"
        reset_beginning => true
        auto_offset_reset => smallest
        codec => {
                avro => {
                    schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
                }
            }
        } 
}

output {
    stdout { 
     codec => rubydebug 
     }
}

问题

管道在logstash级别失败。当一个新事件被推送到Kafka时,我在logstash控制台上得到以下内容:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}

共有1个答案

夏兴平
2023-03-14

终于找出了错误。而不是这个(如Logstash网站所建议的-https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html)

codec => {
    avro => {
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
    }
}

正确的语法是(如插件的留档https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md所建议的):

codec =>   avro {
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}

我猜语法变了。

 类似资料:
  • 问题内容: 当我在Kibana中看到结果时,我发现JSON中没有字段,而且,该字段仅包含。 是否可以解析json中的字段并将其显示在Kibana中?我有以下配置: 以及以下JSON文件: 问题答案: 是。您需要在配置中添加一个过滤器,如下所示。 在这里的文档中对此进行了很好的描述 编辑 json编解码器似乎不喜欢传入数组。单个元素与此配置一起工作: 输入: Logstash结果: } 现在有了一个

  • 我是Logstash和Avro的初学者。我们正在建立一个系统,logstash作为Kafka队列的制作人。然而,我们遇到了这样一个问题:由Logstash生成的avro序列化事件无法由apache提供的avro工具jar(版本1.8.2)解码。此外,我们注意到Logstash和avro工具的序列化输出有所不同。 我们有以下设置: logstash 5.5版 logstash avro编解码器版本3

  • 我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需

  • Logstash 和过去很多日志收集系统比,优势就在于其源码是用 Ruby 写的,所以插件开发相当容易。现在已经有两百多个插件可供选择。但是,随之而来的问题就是:大多数框架都用 Java 写,毕竟做大规模系统 Java 有天生优势。而另一个新生代 fluentd 则是标准的 Ruby 产品(即 Matz’s Ruby Interpreter)。logstash 为什么选用 JRuby 来实现,似乎

  • 无法在linux上使用ElasticSearch映射logstash 我只是运行下面的命令,它显示了docker上所有正在运行的图像 sudo docker ps 输出: 我只想将logstash链接到弹性搜索并尝试运行以下命令 命令: 输出: 989e2a8f4d9fd972c4f2102d726a68877c989b546800899abbb8c382fb62f04c logstash。形态:

  • 我尝试使用Logstash tcp套接字追加器将日志从java应用程序发送到Logstash。java应用程序。已经可以使用logback 1.1.9(slf4j)和其他追加器。 现在,我将以下行添加到logback-test.xml中: