当前位置: 首页 > 面试题库 >

如何在Logstash中编写grok模式

周翼
2023-03-14
问题内容

我正在尝试从logstash开始,并且我的应用程序具有以下类型的日志。这里的5表示接下来将有5行是针对不同相关事物收集的统计信息。

这些基本上是应用程序统计信息,每行指示大约一个资源。

有没有一种方法可以使用logstash正确解析它,以便可以将其用于elasticsearch

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]

编辑

这是我正在使用的配置,第一组统计信息已正确解析,但在该管道阻塞之后。请注意有150个此类日志,但是如果我仅保留2-3个日志,则可以正常工作。您能帮我在这里找到问题吗?

# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"
### For testing and continual process of the same file, remove these before produciton
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if ([message] =~ /^\s*$/ ){
        drop{}
    }
    if ([message] =~ /^[^\[]/) {
            drop{}
    }

    if ([message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/) {
            drop{}
    }
    if [message] =~ "logPerDestinationStats" {
        grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                }
        }
        split { 
            field => "message"
        }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
}
output {
  stdout {codec => rubydebug}
}

问题答案:

当然有。

您需要做的是利用输入过滤器上的多行编解码器。

根据示例:

input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => previous
    }
  }
}

这基本上表明任何不以YYYYMMDD HH:mi:ss.000开头的行都将与上一行合并

现在,您可以从那里将Grok模式应用于第一行(以获取高级数据)。

一旦感到满意,您就可以从第一行获得所有所需的数据,然后可以在\ r或\ n上拆分,并使用单个grok模式(基于上面给出的示例)获取单个统计html" target="_blank">数据。

希望这可以帮助

d

更新2017-05-08 11:54:

完整的logstash conf可能看起来像这样,您将需要考虑更改grok模式以更好地满足您的要求(仅您知道数据)。

请注意,这尚未经过测试,由您自己决定。

input {
  file {
    path => "/var/log/someapp.log"
### For testing and continual process of the same file, remove these before produciton
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => previous
    }
  }
}
filter {
### Let's get some high level data before we split the line (note: anything you grab before the split gets copied)
    grok {
        match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]"
        }
    }
### Split the lines back out to being a single line now. (this may be a \r or \n, test which one)
    split { 
        "field" => "message"
        "terminator" => "\r" 
    }
### Ok, the lines should now be independent, lets add another grok here to get the patterns as dictated by your example [fieldA: str | field2: 0...] etc.
### Note: you should look to change the grok pattern to better suit your requirements, I used DATA here to quickly capture your content
    grok {
        break_on_match => false
        match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$" }
    }
    mutate {
    convert => { "message" => "string" }
        add_field => {
            "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
            "load_timestamp" => "%{@timestamp}"
        }
        remove_field => ["yr","mnt", "daynum", "time", "timezone"]
    }
}
output {
  stdout { codec => rubydebug }
}

编辑2017-05-15

Logstash是一个复杂的解析器,它希望作为一个进程保持运行状态并持续监视日志文件(因此您必须将其崩溃)

匹配中断意味着您可能对同一行有多个匹配要求,如果找不到匹配项,则尝试列表中的下一个(总是变得很简单)

输入过滤器,将路径更改为以.log *结尾,也按照您的原始示例,该模式不必与所需的日期格式匹配(以便将所有关联放在一行上)

您的过滤器应指定我相信的分割字符(否则,我相信默认字符是逗号)。

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log*"
### For testing and continual process of the same file, remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if "logPerDestinationStats" in [message] {
        grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                }
        }
        split { 
            field => "message"
            terminator => "\r”
            }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
   else {
      drop{}
    }
}

请原谅我当前正在通过手机更新的格式,我很高兴有人代替我来更新格式。



 类似资料:
  • 问题内容: 我有一个格式的JSON: 我正在尝试使用logstash解析此JSON。基本上,我希望Logstash输出是可以使用kibana进行分析的key:value对的列表。我认为可以开箱即用。从大量的阅读中,我了解到我必须使用grok插件(我仍然不确定json插件的用途)。但是我无法获得所有领域的事件。我收到多个事件(甚至对于JSON的每个属性都一个)。像这样: 我应该使用多行编解码器还是j

  • 问题内容: 我有一个看起来像这样的日志文件(简化) Logline样本 我想提取 数据中 包含的json 并创建两个字段,一个用于名字,一个用于姓氏。但是,我得到的输出是这样的: 如你看到的 那不是我所需要的,我需要在kibana中为firstname和lastname创建字段,但是logstash不会使用json过滤器提取字段。 LogStash配置 非常感谢任何帮助,我敢肯定我错过了一些简单的

  • 我一直在寻找grok过滤器,用于处理log4j日志中的转换模式 下面是一个示例日志条目: 谁能帮我找到Logstash grok过滤器。

  • 我正在使用Logstash转发器从远程服务器接收Log4j生成的日志文件。log事件的字段包括一个名为“file”的字段,格式为/tomcat/logs/app.log、/tomcat/logs/app.log.1等。当然,文件路径/tomcat/logs位于远程计算机上,我希望Logstash仅使用文件名而不使用远程文件路径在本地文件系统上创建文件。 Logstash配置-我用什么来编写过滤器部

  • 问题内容: 是一个浮动字段。所提到的索引在elasticsearch中不存在。当使用运行配置文件时,我没有例外。但是,elasticsearch中反映和输入的数据显示了as 的映射。我该如何纠正?以及如何针对多个字段执行此操作? 问题答案: 你有两个问题。首先,您的grok过滤器会在csv过滤器之前列出,并且由于应用了过滤器是为了在应用grok过滤器时不会出现要转换的“基本”字段。 其次,除非您明

  • 我正在为一个在Laravel中完成的站点编写代码,我知道不建议在Blade模板中编写PHP代码,但在这个例子中,我的可用时间有限。 中间的PHP代码不起作用?