我正在尝试使用来自Kafka队列的序列化avro事件。kafka队列使用简单的java生产者填充。为了清楚起见,我共享三个组件:
Avro模式文件
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Java Producer代码片段(User.class是使用avro工具生成的)
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
user1.setFavoriteColor("blue");
String topic = "MemoryTest";
// Properties set in 'props'
KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(user1, encoder);
encoder.flush();
out.close();
byte[] serializedBytes = out.toByteArray();
producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes));
日志存储配置文件
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "MemoryTest"
type => "standard_event"
group_id => "butiline_dash_prod"
reset_beginning => true
auto_offset_reset => smallest
codec => {
avro => {
schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}
}
}
}
output {
stdout {
codec => rubydebug
}
}
问题
管道在logstash级别失败。当一个新事件被推送到Kafka时,我在logstash控制台上得到以下内容:
Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}
终于找出了错误。而不是这个(如Logstash网站所建议的-https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html)
codec => {
avro => {
schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}
}
正确的语法是(如插件的留档https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md所建议的):
codec => avro {
schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}
我猜语法变了。
问题内容: 当我在Kibana中看到结果时,我发现JSON中没有字段,而且,该字段仅包含。 是否可以解析json中的字段并将其显示在Kibana中?我有以下配置: 以及以下JSON文件: 问题答案: 是。您需要在配置中添加一个过滤器,如下所示。 在这里的文档中对此进行了很好的描述 编辑 json编解码器似乎不喜欢传入数组。单个元素与此配置一起工作: 输入: Logstash结果: } 现在有了一个
我是Logstash和Avro的初学者。我们正在建立一个系统,logstash作为Kafka队列的制作人。然而,我们遇到了这样一个问题:由Logstash生成的avro序列化事件无法由apache提供的avro工具jar(版本1.8.2)解码。此外,我们注意到Logstash和avro工具的序列化输出有所不同。 我们有以下设置: logstash 5.5版 logstash avro编解码器版本3
我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需
Logstash 和过去很多日志收集系统比,优势就在于其源码是用 Ruby 写的,所以插件开发相当容易。现在已经有两百多个插件可供选择。但是,随之而来的问题就是:大多数框架都用 Java 写,毕竟做大规模系统 Java 有天生优势。而另一个新生代 fluentd 则是标准的 Ruby 产品(即 Matz’s Ruby Interpreter)。logstash 为什么选用 JRuby 来实现,似乎
无法在linux上使用ElasticSearch映射logstash 我只是运行下面的命令,它显示了docker上所有正在运行的图像 sudo docker ps 输出: 我只想将logstash链接到弹性搜索并尝试运行以下命令 命令: 输出: 989e2a8f4d9fd972c4f2102d726a68877c989b546800899abbb8c382fb62f04c logstash。形态:
我尝试使用Logstash tcp套接字追加器将日志从java应用程序发送到Logstash。java应用程序。已经可以使用logback 1.1.9(slf4j)和其他追加器。 现在,我将以下行添加到logback-test.xml中: