扩展方案 - rsyslog

优质
小牛编辑
129浏览
2023-12-01

Rsyslog 是 RHEL6 开始的默认系统 syslog 应用软件(当然,RHEL 自带的版本较低,实际官方稳定版本已经到 v8 了)。官网地址:http://www.rsyslog.com

目前 Rsyslog 本身也支持多种输入输出方式,内部逻辑判断和模板处理。

rsyslog - 图1

常用模块介绍

不同模块插件在 rsyslog 流程中发挥作用的原理,可以阅读:http://www.rsyslog.com/doc/master/configuration/modules/workflow.html

流程中可以使用 mmnormalize 组件来完成数据的切分(相当于 logstash 的 filters/grok 功能)。

rsyslog 从 v7 版本开始带有 omelasticsearch 插件可以直接写入数据到 elasticsearch 集群,配合 mmnormalize 的使用示例见: http://puppetlabs.com/blog/use-rsyslog-and-elasticsearch-powerful-log-aggregation

而 normalize 语法说明见: http://www.liblognorm.com/files/manual/index.html?sampledatabase.htm

类似的还有 mmfields 和 mmjsonparse 组件。注意,mmjsonparse 要求被解析的 MSG 必须以 @CEE:开头,解析之后的字符串为 JSON。使用示例见:http://blog.sematext.com/2013/05/28/structured-logging-with-rsyslog-and-elasticsearch/

此外,rsyslog 从 v6 版本开始,设计了一套 rainerscript 作为配置中的 DSL。利用 rainerscript 中的函数,也可以做到一些数据解析和逻辑判断:

  • tolower
  • cstr
  • cnum
  • wrap
  • replace
  • field
  • re_extract
  • re_match
  • contains
  • if-else
  • foreach
  • lookup
  • set/reset/unset

详细说明见:http://www.rsyslog.com/doc/v8-stable/rainerscript/functions.html

rsyslog 与 logstash 合作

虽然 Rsyslog 很早就支持直接输出数据给 elasticsearch,但如果你使用的是 v8.4 以下的版本,我们这里并不推荐这种方式。因为normalize 语法还是比较简单,只支持时间,字符串,数字,ip 地址等几种。在复杂条件下远比不上完整的正则引擎。

那么,怎么使用 rsyslog 作为日志收集和传输组件,来配合 logstash 工作呢?

如果只是简单的 syslog 数据,直接单个 logstash 运行即可,配置方式见本书 2.4 章节。

如果你运行着一个高负荷运行的 rsyslog 系统,每秒传输的数据远大过单个 logstash 能处理的能力,你可以运行多个 logstash 在多个端口,然后让 rsyslog 做轮训转发(事实上,单个 omfwd 本身的转发能力也有限,所以推荐这种做法):

  1. Ruleset( name="forwardRuleSet" ) {
  2. Action ( type="mmsequence" mode="instance" from="0" to="4" var="$.seq" )
  3. if $.seq == "0" then {
  4. action (type="omfwd" Target="127.0.0.1" Port="5140" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
  5. }
  6. if $.seq == "1" then {
  7. action (type="omfwd" Target="127.0.0.1" Port="5141" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
  8. }
  9. if $.seq == "2" then {
  10. action (type="omfwd" Target="127.0.0.1" Port="5142" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
  11. }
  12. if $.seq == "3" then {
  13. action (type="omfwd" Target="127.0.0.1" Port="5143" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
  14. }
  15. }

如果 rsyslog 仅是作为 shipper 角色运行,环境中有单独的消息队列可用,rsyslog 也有对应的 omkafka, omredis, omzmq 插件可用。

rsyslog v8 版的 mmexternal 模块

如果你使用的是 v8.4 及以上版本的 rsyslog,其中有一个新加入的 mmexternal 模块。该模块是在 v7 的 omprog 模块基础上发展出来的,可以让你使用任意脚本,接收标准输入,自行处理以后再输出回来,而 rsyslog 接收到这个输出再进行下一步处理,这就解决了前面提到的 “normalize 语法太简单”的问题!

下面是使用 rsyslog 的 mmexternal 和 omelasticsearch 完成 Nginx 访问日志直接解析存储的配置。

rsyslog 配置如下:

  1. module(load="imuxsock" SysSock.RateLimit.Interval="0")
  2. module(load="mmexternal")
  3. module(load="omelasticsearch")
  4. template(name="logstash-index" type="list") {
  5. constant(value="logstash-")
  6. property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4")
  7. constant(value=".")
  8. property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7")
  9. constant(value=".")
  10. property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")
  11. }
  12. template( name="nginx-log" type="string" string="%msg%n" )
  13. if ( $syslogfacility-text == 'local6' and $programname startswith 'wb-www-access-' and not ($msg contains '/2/remind/unread_count' or $msg contains '/2/remind/group_unread') ) then
  14. {
  15. action( type="mmexternal" binary="/usr/local/bin/rsyslog-nginx-elasticsearch.py" interface.input="fulljson" forcesingleinstance="on" )
  16. action( type="omelasticsearch"
  17. template="nginx-log"
  18. server="eshost.example.com"
  19. bulkmode="on"
  20. dynSearchIndex="on"
  21. searchIndex="logstash-index"
  22. searchType="nginxaccess"
  23. queue.type="linkedlist"
  24. queue.size="50000"
  25. queue.dequeuebatchsize="5000"
  26. queue.dequeueslowdown="100000"
  27. )
  28. stop
  29. }

其中调用的 python 脚本示例如下(注意只是做示例,脚本中的 split 功能其实可以用 rsyslog 的 mmfields 插件完成):

  1. #! /usr/bin/python
  2. import sys
  3. import json
  4. import datetime
  5. def nginxLog(data):
  6. hostname = data['hostname']
  7. logline = data['msg']
  8. time_local, http_x_up_calling_line_id, request, http_user_agent, staTus, remote_addr, http_x_log_uid, http_referer, request_time, body_bytes_sent, http_x_forwarded_proto, http_x_forwarded_for, request_uid, http_host, http_cookie, upstream_response_time = logline.split('`')
  9. try:
  10. upstream_response_time = float(upstream_response_time)
  11. except:
  12. upstream_response_time = None
  13. method, uri, verb = request.split(' ')
  14. arg = {}
  15. try:
  16. url_path, url_args = uri.split('?')
  17. for args in url_args.split('&'):
  18. k, v = args.split('=')
  19. arg[k] = v
  20. except:
  21. url_path = uri
  22. # Why %z do not implement?
  23. ret = {
  24. "@timestamp": datetime.datetime.strptime(time_local, ' [%d/%b/%Y:%H:%M:%S +0800]').strftime('%FT%T+0800'),
  25. "host": hostname,
  26. "method": method.lstrip('"'),
  27. "url_path": url_path,
  28. "url_args": arg,
  29. "verb": verb.rstrip('"'),
  30. "http_x_up_calling_line_id": http_x_up_calling_line_id,
  31. "http_user_agent": http_user_agent,
  32. "status": int(staTus),
  33. "remote_addr": remote_addr.strip('[]'),
  34. "http_x_log_uid": http_x_log_uid,
  35. "http_referer": http_referer,
  36. "request_time": float(request_time),
  37. "body_bytes_sent": int(body_bytes_sent),
  38. "http_x_forwarded_proto": http_x_forwarded_proto,
  39. "http_x_forwarded_for": http_x_forwarded_for,
  40. "request_uid": request_uid,
  41. "http_host": http_host,
  42. "http_cookie": http_cookie,
  43. "upstream_response_time": upstream_response_time
  44. }
  45. return ret
  46. def onInit():
  47. """ Do everything that is needed to initialize processing
  48. """
  49. def onReceive(msg):
  50. data = json.loads(msg)
  51. ret = nginxLog(data)
  52. print json.dumps({'msg': ret})
  53. def onExit():
  54. """ Do everything that is needed to finish processing. This is being called immediately before exiting.
  55. """
  56. # most often, nothing to do here
  57. onInit()
  58. keepRunning = 1
  59. while keepRunning == 1:
  60. msg = sys.stdin.readline()
  61. if msg:
  62. msg = msg[:len(msg)-1]
  63. onReceive(msg)
  64. sys.stdout.flush()
  65. else:
  66. keepRunning = 0
  67. onExit()
  68. sys.stdout.flush()

注意输出的时候,顶层的 key 是不能变的,msg 还得叫 msg,如果是 hostname 还得叫 hostname ,等等。否则,rsyslog 会当做处理无效,直接传递原有数据内容给下一步。

慎用提示

mmexternal 是基于 direct mode 的,所以如果你发送的数据量较大时,rsyslog 并不会像 linkedlist mode 那样缓冲在磁盘队列上,而是持续 fork 出新的 mmexternal 程序,几千个进程后,你的服务器就挂了!!所以,务必开启 forcesingleinstance 选项。

rsyslog 的 mmgrok 模块

Rsyslog 8.15.0 开始,附带了 mmgrok 模块,系笔者贡献。利用该模块,可以将 Logstash 的 Grok 规则,运用在 Rsyslog 中。欢迎有兴趣的读者试用。