本节书摘来自华章出版社《ELK Stack权威指南 》一书中的第1章,第3节,作者饶琛琳,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

场 景 示 例

前面虽然介绍了几十个Logstash插件的常见配置项,但是过多的选择下,如何组合使用这些插件,依然是一部分用户的难题。本章将列举一些最常见的日志场景,演示针对性的组件搭配,希望能给读者带来启发。

本章介绍的场景包括:Nginx访问日志、Nginx错误日志、Postfix日志、Ossec日志、Windows系统日志、Java日志、MySQL慢查询日志、Docker容器日志。

3.1 Nginx访问日志

访问日志处理分析绝对是使用ELK stack时最常见的需求。默认的处理方式下,性能和精确度都不够好。本节会列举对Nginx访问日志的几种不同处理方式,并阐明其优劣。

3.1.1 grok处理方式

Logstash默认自带了Apache标准日志的grok正则表达式:

COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{NOTSPACE:auth}\[%{HTTPDATE:

timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?

|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)

COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

对于Nginx标准日志格式,可以发现只是最后多了一个$http_x_forwarded_for变量。所以Nginx标准日志的grok正则定义是:

MAINNGINXLOG %{COMBINEDAPACHELOG} %{QS:x_forwarded_for}

自定义的日志格式,可以照此修改。

3.1.2 split处理方式

Nginx日志因为部分变量中内含空格,所以很多时候只能使用%{QS}正则来做分隔,性能和细度都不太好。如果能自定义一个比较少见的字符作为分隔符,那么处理起来就简单多了。假设定义的日志格式如下:

log_format main "$http_x_forwarded_for | $time_local | $request | $status |

$body_bytes_sent | "

"$request_body | $content_length | $http_referer | $http_user_agent | $nuid | "

"$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_

time | $request_time";

实际日志如下:

117.136.9.248 | 08/Apr/2015:16:00:01 +0800 | POST /notice/newmessage?sign=cba4f614e05db285850cadc696fcdad0&token=JAGQ92Mjs3--gik_b_DsPIQHcyMKYGpD&did=b749736ac70f12df700b18cd6d051d5&osn=android&osv=4.0.4&appv=3.0.1&net=460-02-2g&longitude=120.393006&latitude=36.178329&ch=360&lp=1&ver=1&ts=1428479998151&im=869736012353958&sw=0&sh=0&la=zh-CN&lm=weixin&dt=vivoS11tHTTP/1.1| 200 | 132 | abcd-sign-v1://dd03c57f8cb6f1cef919ab5df66f2903f:d51asq5yslwnyz5t/{\x22type\x22:4,\x22uid\x22:7567306} | 89 | - | abcd/3.0.1, Android/4.0.4, vivo S11t | nuid=0C0A0A0A01E02455EA7CF47E02FD072C1428480001.157| - | 10.10.10.13 | bnx02.abcdprivate.com | 10.10.10.22:9999 | 0.022 | 0.022 59.50.44.53 | 08/Apr/2015:16:00:01 +0800 | POST /feed/pubList?appv=3.0.3&did=89da72550de488328e2aba5d97850e9f&dt=iPhone6%2C2&im=B48C21F3-487E-4071-9742-DC6D61710888&la=cn&latitude=0.000000&lm=weixin&longitude=0.000000&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=568.000000&sw=320.000000&token=7NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J&ts=1428480001275 HTTP/1.1 | 200 | 983 | abcd-sign-v1://b398870a0b25b29aae65cd553addc43d:72214ee85d7cca22/{\x22nextkey\x22:\x22\x22,\x22uid\x22:\x2213062545\x22,\x22token\x22:\x227NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J\x22}| 139 | - | Shopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00) | nuid=0C0A0A0A81-DF2455017D548502E48E2E1428480001.154 | nuid=CgoKDFUk34GFVH0BLo7kAg== | 10.10.10.11 | bnx02.abcdprivate.com | 10.10.10.35:9999 | 0.025 | 0.026

然后还可以针对request做更细致的切分。比如URL参数部分。很明显,URL参数中的字段顺序是乱的。第一行问号之后的第一个字段是sign,第二行问号之后的第一个字段是appv。所以需要将字段进行切分,取出每个字段对应的值。官方自带grok满足不了要求,最终采用的Logstash配置如下:

filter {

ruby {

init =>"@kname =['http_x_forwarded_for','time_local','request','status',

'body_bytes_sent','request_body','content_length','http_referer','http_

user_agent','nuid','http_cookie','remote_addr','hostname','upstream_

addr','upstream_response_time','request_time']"

code => "

new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

new_event.remove('@timestamp')

event.append(new_event)

"

}

}

if [request] {

ruby {

init =>"@kname = ['method','uri','verb']"

code => "

new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])

new_event.remove('@timestamp')

event.append(new_event)

"

}

}

if [uri] {

ruby {

init =>"@kname = ['url_path','url_args']"

code => "

new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])

new_event.remove('@timestamp')

event.append(new_event)

"

}

}

kv {

prefix =>"url_"

source =>"url_args"

field_split =>"&"

remove_field => [ "url_args","uri","request" ]

}

}

}

mutate {

convert => [

"body_bytes_sent" , "integer",

"content_length", "integer",

"upstream_response_time", "float",

"request_time", "float"

]

}

date {

match => [ "time_local", "dd/MMM/yyyy:hh:mm:ss Z" ]

locale =>"en"

}

}

最终结果如下:

{

"message" =>"1.43.3.188 | 08/Apr/2015:16:00:01 +0800 | POST /search/sug

gest?appv=3.0.3&did=dfd5629d705d400795f698055806f01d&dt=iPhone7%2C2&im=

AC926907-27AA-4A10-9916-C5DC75F29399&la=cn&latitude=-33.903867&lm=

sina&longitude=151.208137&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=66

7.000000&sw=375.000000&token=_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO&ts=

1428480001567 HTTP/1.1 | 200 | 353 | abcd-sign-v1://a24b478486d3bb92ed89a-

901541b60a5:b23e9d2c14fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:

\\x220\\x22,\\x22token\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,

\\x22limit\\x22:\\x2220\\x22} | 148 | - | abcdShopping/3.0.3 (iPhone; iOS

8.1.3; Scale/2.00) | nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113

| nuid=CgoKC1SvZJpkNHb5TpQwAg== | 10.10.10.11 | bnx02.abcdprivate.com |

10.10.10.26:9999 | 0.070 | 0.071",

"@version" =>"1",

"@timestamp" =>"2015-04-08T08:00:01.000Z",

"type" =>"nginxapiaccess",

"host" =>"blog05.abcdprivate.com",

"path" =>"/home/nginx/logs/api.access.log",

"http_x_forwarded_for" =>"1.43.3.188",

"time_local" =>" 08/Apr/2015:16:00:01 +0800",

"status" =>"200",

"body_bytes_sent" => 353,

"request_body" =>"abcd-sign-v1://a24b478486d3bb92ed89a901541b60a5:b23e9d2c1

4fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:\\x220\\x22,\\x22token

\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,\\x22limit\\x22:\\x2220\\x22}",

"content_length" => 148,

"http_referer" =>"-",

"http_user_agent" =>"abcdShopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00)",

"nuid" =>"nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113",

"http_cookie" =>"nuid=CgoKC1SvZJpkNHb5TpQwAg==",

"remote_addr" =>"10.10.10.11",

"hostname" =>"bnx02.abcdprivate.com",

"upstream_addr" =>"10.10.10.26:9999",

"upstream_response_time" => 0.070,

"request_time" => 0.071,

"method" =>"POST",

"verb" =>"HTTP/1.1",

"url_path" =>"/search/suggest",

"url_appv" =>"3.0.3",

"url_did" =>"dfd5629d705d400795f698055806f01d",

"url_dt" =>"iPhone7%2C2",

"url_im" =>"AC926907-27AA-4A10-9916-C5DC75F29399",

"url_la" =>"cn",

"url_latitude" =>"-33.903867",

"url_lm" =>"sina",

"url_longitude" =>"151.208137",

"url_lp" =>"-1.000000",

"url_net" =>"0-0-wifi",

"url_osn" =>"iOS",

"url_osv" =>"8.1.3",

"url_sh" =>"667.000000",

"url_sw" =>"375.000000",

"url_token" =>"_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO",

"url_ts" =>"1428480001567"

}

如果URL参数过多,可以不使用kv切分,或者预先定义成nested object后改成数组形式:

if [uri] {

ruby {

init =>"@kname = ['url_path','url_args']"

code => "

new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])

new_event.remove('@timestamp')

event.append(new_event)

"

}

if [url_args] {

ruby {

init => "@kname = ['key','value']"

code => "event.set('nested_args', event.get('url_args').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"

remove_field => [ "url_args","uri","request" ]

}

}

}

采用nested object的优化原理和nested object的使用方式,请阅读后面第11章中介绍Elasticsearch调优的内容。

3.1.3 JSON格式

自定义分隔符虽好,但是配置写起来毕竟复杂很多。其实对Logstash来说,Nginx日志还有另一种更简便的处理方式,就是自定义日志格式时,通过手工拼写直接输出成JSON 格式:

log_format json '{"@timestamp":"$time_iso8601",'

'"host":"$server_addr",'

'"clientip":"$remote_addr",'

'"size":$body_bytes_sent,'

'"responsetime":$request_time,'

'"upstreamtime":"$upstream_response_time",'

'"upstreamhost":"$upstream_addr",'

'"http_host":"$host",'

'"url":"$uri",'

'"xff":"$http_x_forwarded_for",'

'"referer":"$http_referer",'

'"agent":"$http_user_agent",'

'"status":"$status"}';

然后采用下面的Logstash配置即可:

input {

file {

path =>"/var/log/nginx/access.log"

codec => json

}

}

filter {

mutate {

split => [ "upstreamtime", "," ]

}

mutate {

convert => [ "upstreamtime", "float" ]

}

}

这里采用多个mutate插件,是因为upstreamtime可能有多个数值,所以先切割成数组以后,再分别转换成浮点型数值。而在mutate中,convert函数的执行优先级高于split函数,所以只能分开两步写。mutate内各函数的优先级顺序,之前2.3.8节有详细说明,读者可以返回去阅读。

3.1.4 syslog方式发送

Nginx从1.7版开始,加入了syslog支持,Tengine则更早。这样,我们可以通过syslog直接发送日志。Nginx上的配置如下:

access_log syslog:server=unix:/data0/rsyslog/nginx.sock locallog;

或者直接发送给远程Logstash机器:

access_log syslog:server=192.168.0.2:5140,facility=local6,tag=nginx-access,

severity=info logstashlog;

默认情况下,Nginx将使用local7.info等级,以nginx为标签发送数据。注意,采用syslog发送日志的时候,无法配置buffer=16k选项。

3.2 Nginx错误日志

Nginx错误日志是运维人员最常见但又极其容易忽略的日志类型之一。本节介绍对Nginx错误日志的处理方式,并推荐读者在性能优化中对此多加关注。Nginx错误日志既没有统一明确的分隔符,也没有特别方便的正则模式,但通过Logstash不同插件的组合,还是可以轻松进行数据处理的。

值得注意的是,Nginx错误日志中有一类数据是接收过大请求体时的报错,默认信息会把请求体的具体字节数记录下来。每次请求的字节数基本都是在变化的,这意味着常用的topN等聚合函数对该字段没有明显效果。所以,对此需要做一下特殊处理。

最后形成的Logstash配置如下所示:

filter {

grok {

match => { "message" =>"(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d)

\[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+), (?<errinfo>.*)$" }

}

mutate {

rename => [ "host", "fromhost" ]

gsub => [ "errmsg", "too large body: \d+ bytes", "too large body" ]

}

if [errinfo]

{

ruby {

code => "

new_event = LogStash::Event.new(Hash[event.get('errinfo').split(', ').map{|l| l.split(': ')}])

new_event.remove('@timestamp')

event.append(new_event)""

"

}

}

grok {

match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_

URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }

patterns_dir =>["/etc/logstash/patterns"]

remove_field => [ "message", "errinfo", "request" ]

}

}

经过以上Logstash配置的Nginx错误日志生成的事件如下所示:

{

"@version": "1",

"@timestamp": "2015-07-02T01:26:40.000Z",

"type": "nginx-error",

"errtype": "error",

"errmsg": "client intended to send too large body",

"fromhost": "web033.mweibo.yf.sinanode.com",

"client": "36.16.7.17",

"server": "api.v5.weibo.cn",

"host": "\"api.weibo.cn\"",

"verb": "POST",

"urlpath": "/2/client/addlog_batch",

"urlparam": "gsid=_2A254UNaSDeTxGeRI7FMX9CrEyj2IHXVZRG1arDV6PUJbrdANLROskWp9b

XakjUZM5792FW9A5S9EU4jxqQ..&wm=3333_2001&i=0c6f156&b=1&from=1053093010&c=

iphone&v_p=21&skin=default&v_f=1&s=8f14e573&lang=zh_CN&ua=iPhone7,1__weibo__

5.3.0__iphone__os8.3",

"httpversion": "1.1"

}

3.3 Postfix日志

Postfix是Linux平台上最常用的邮件服务器软件。邮件服务的运维复杂度一向较高,在此提供一个针对Postfix日志的解析处理方案。方案出自:https://github.com/whyscream/postfix-grok-patterns。

因为Postfix默认通过syslog方式输出日志,所以可以选择通过rsyslog直接转发给Logstash,也可以由Logstash读取rsyslog记录的文件。

Postfix会根据实际日志的不同,主动设置好不同的syslogtag,有anvil、bounce、cleanup、dnsblog、local、master、pickup、pipe、postdrop、postscreen、qmgr、scache、sendmail、smtp、lmtp、smtpd、tlsmgr、tlsproxy、trivial-rewrite和discard等20个不同的后缀,而在Logstash中,syslogtag通常被解析为program字段。本节以第一种anvil日志的处理配置作为示例:

input {

syslog { }

}

filter {

if [program] =~ /^postfix.*\/anvil$/ {

grok {

patterns_dir   =>["/etc/logstash/patterns.d"]

match          => [ "message", "%{POSTFIX_ANVIL}" ]

tag_on_failure => [ "_grok_postfix_anvil_nomatch" ]

add_tag        => [ "_grok_postfix_success" ]

}

}

mutate {

convert => [

"postfix_anvil_cache_size", "integer",

"postfix_anvil_conn_count", "integer",

"postfix_anvil_conn_rate", "integer",

]

}

}

配置中使用了一个叫POSTFIX_ANVIL的自定义grok正则,该正则及其相关正则内容如下所示。将这段grok正则保存成文本文件,放入/etc/logstash/patterns.d/目录即可使用。

POSTFIX_TIME_UNIT %{NUMBER}[smhd]

POSTFIX_ANVIL_CONN_RATE statistics: max connection rate %{NUMBER:postfix_anvil_conn_

rate}/%{POSTFIX_TIME_UNIT:postfix_anvil_conn_period} for \(%{DATA:postfix_

service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_

timestamp}

POSTFIX_ANVIL_CONN_CACHE statistics: max cache size %{NUMBER:postfix_anvil_

cache_size} at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}

POSTFIX_ANVIL_CONN_COUNT statistics: max connection count %{NUMBER:postfix_

anvil_conn_count} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at

%{SYSLOGTIMESTAMP:postfix_anvil_timestamp}

POSTFIX_ANVIL %{POSTFIX_ANVIL_CONN_RATE}|%{POSTFIX_ANVIL_CONN_CACHE}|%{POSTFIX_

ANVIL_CONN_COUNT}

其余19种Postfix日志的完整grok正则和Logstash过滤配置,读者可以通过https://github.com/whyscream/postfix-grok-patterns获取。

3.4 Ossec日志

Ossec是一款开源的多平台入侵检测系统。将Ossec的监测报警信息转发到ELK中,无疑可以极大地帮助我们快速可视化安全事件。本节介绍Ossec与Logstash的结合方式。

3.4.1 配置所有Ossec agent采用syslog输出

配置步骤如下:

1)编辑ossec.conf文件(默认为/var/ossec/etc/ossec.conf)。

2)在ossec.conf中添加下列内容(10.0.0.1为接收syslog的服务器):

<syslog_output>

<server>10.0.0.1</server>

<port>9000</port>

<format>default</format>

</syslog_output>

3)开启Ossec允许syslog输出功能:

/var/ossec/bin/ossec-control enable client-syslog

4)重启Ossec服务:

/var/ossec/bin/ossec-control start

3.4.2 配置Logstash

在Logstash配置文件中增加(或新建)如下内容(假设10.0.0.1为Elasticsearch服务器):

input {

udp {

port => 9000

type =>"syslog"

}

}

filter {

if [type] == "syslog" {

grok {

match => { "message" =>"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:

syslog_host} %{DATA:syslog_program}: Alert Level: %{BASE10NUM:

Alert_Level}; Rule: %{BASE10NUM:Rule} - %{GREEDYDATA:Description};

Location: %{GREEDYDATA:Details}" }

add_field => [ "ossec_server", "%{host}" ]

}

mutate {

remove_field => [ "syslog_hostname", "syslog_message", "syslog_pid",

"message", "@version", "type", "host" ]

}

}

}

output {

elasticsearch {

}

}

3.4.3 推荐Kibana仪表盘

社区已经有人根据Ossec的常见需求制作了仪表盘,可以直接从Kibana 3页面加载使用,示例如图3-1所示。

仪表盘的JSON文件见:https://github.com/magenx/Logstash/raw/master/kibana/kibana_dash-board.json。

加载方式请阅读本书第三部分介绍的Kibana相关内容。

3.5 Windows系统日志

Logstash社区有众多的Windows用户,本节单独介绍一下对Windows平台系统日志的收集处理。之前介绍过Linux上的系统日志,即syslog的处理。事实上,对于Windows平台,也有类似syslog的设计,叫eventlog。本节介绍如何处理Windows eventlog。

3.5.1 采集端配置

由于Logstash作者出身Linux运维,早期版本中出了不少Windows平台上独有的bug。所以,目前对Windows上的日志,推荐大家在尝试Logstash的同时,也可以试用更稳定的nxlog软件。nxlog更详细的介绍,请阅读本书后面5.5节。

这里先介绍Logstash和nxlog在处理Windows的eventlog时的配置方法。

Logstash配置如下:

图3-1 Ossec仪表盘

input {

eventlog {

#logfile =>  ["Application", "Security", "System"]

logfile =>  ["Security"]

type =>"winevent"

tags => [ "caen" ]

}

}

nxlog配置中有如下几个要点:

1)ROOT位置必须是nxlog的实际安装路径。

2)输入模块,在Windows 2003及之前版本上,不叫im_msvistalog而叫im_mseventlog。

下面是一段完整的nxlog配置示例:

define ROOT C:\Program Files (x86)\nxlog

Moduledir %ROOT%\modules

CacheDir %ROOT%\data

Pidfile %ROOT%\data\nxlog.pid

SpoolDir %ROOT%\data

LogFile %ROOT%\data\nxlog.log

<Extension json>

Module  xm_json

</Extension>

<Input in>

Module  im_msvistalog

Exec    to_json();

</Input>

<Output out>

Module  om_tcp

Host    10.66.66.66

Port    5140

</Output>

<Route 1>

Path    in => out

</Route>

3.5.2 接收解析端配置

在中心的接收端,统一采用Logstash来完成解析入库操作。如果采集端也是Logstash,主要字段都已经生成,接收端配置也就没什么特别的了。如果采集端是nxlog,那么我们还需要把一些nxlog生成的字段转换成Logstash更通用的风格设计。

在之前插件介绍章节我们已经讲过,因为在Elasticsearch中默认按小写来检索,所以需要尽量把数据小写化。不巧的是,nxlog中,不单数据内容,字段名称也是大小写混用的,所以,我们只能通过logstash-filter-mutate的rename功能来完成对字段名称的小写化重命名。

配置示例如下:

input {

tcp {

codec =>"json"

port => 5140

tags => ["windows","nxlog"]

type =>"nxlog-json"

}

} # end input

filter {

if [type] == "nxlog-json" {

date {

match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]

timezone =>"Europe/London"

}

mutate {

rename => [ "AccountName", "user" ]

rename => [ "AccountType", "[eventlog][account_type]" ]

rename => [ "ActivityId", "[eventlog][activity_id]" ]

rename => [ "Address", "ip6" ]

rename => [ "ApplicationPath", "[eventlog][application_path]" ]

rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ]

rename => [ "Category", "[eventlog][category]" ]

rename => [ "Channel", "[eventlog][channel]" ]

rename => [ "Domain", "domain" ]

rename => [ "EventID", "[eventlog][event_id]" ]

rename => [ "EventType", "[eventlog][event_type]" ]

rename => [ "File", "[eventlog][file_path]" ]

rename => [ "Guid", "[eventlog][guid]" ]

rename => [ "Hostname", "hostname" ]

rename => [ "Interface", "[eventlog][interface]" ]

rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ]

rename => [ "InterfaceName", "[eventlog][interface_name]" ]

rename => [ "IpAddress", "ip" ]

rename => [ "IpPort", "port" ]

rename => [ "Key", "[eventlog][key]" ]

rename => [ "LogonGuid", "[eventlog][logon_guid]" ]

rename => [ "Message", "message" ]

rename => [ "ModifyingUser", "[eventlog][modifying_user]" ]

rename => [ "NewProfile", "[eventlog][new_profile]" ]

rename => [ "OldProfile", "[eventlog][old_profile]" ]

rename => [ "Port", "port" ]

rename => [ "PrivilegeList", "[eventlog][privilege_list]" ]

rename => [ "ProcessID", "pid" ]

rename => [ "ProcessName", "[eventlog][process_name]" ]

rename => [ "ProviderGuid", "[eventlog][provider_guid]" ]

rename => [ "ReasonCode", "[eventlog][reason_code]" ]

rename => [ "RecordNumber", "[eventlog][record_number]" ]

rename => [ "ScenarioId", "[eventlog][scenario_id]" ]

rename => [ "Severity", "level" ]

rename => [ "SeverityValue", "[eventlog][severity_code]" ]

rename => [ "SourceModuleName", "nxlog_input" ]

rename => [ "SourceName", "[eventlog][program]" ]

rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ]

rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ]

rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ]

rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ]

rename => [ "System", "[eventlog][system]" ]

rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ]

rename => [ "TargetLogonId", "[eventlog][target_logonid]" ]

rename => [ "TargetUserName", "[eventlog][target_user_name]" ]

rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ]

rename => [ "ThreadID", "thread" ]

}

mutate {

remove_field => [

"CurrentOrNextState","Description","EventReceivedTime","EventTime","EventTimeWritten","IPVersion","KeyLength","Keywords","LmPackageName","LogonProcessName","LogonType","Name","O-pcode","OpcodeValue","PolicyProcessingMode","Protocol","Prot-ocolType","SourceModuleType","State","Task","TransmittedSe-  rvices","Type","UserID","Version"

]

}

}

}

3.6 Java日志

之前在2.2节有关codec的介绍中曾经提到过,对Java日志,除了使用multiline做多行日志合并以外,还可以直接通过Log4J写入logstash里。本节就讲述如何在Java应用环境做到这点。

3.6.1 Log4J配置

首先,需要配置Java应用的Log4J设置,启动一个内置的SocketAppender。修改应用的log4j.xml配置文件,添加如下配置段:

<appender name="LOGSTASH" class="org.apache.log4j.net.SocketAppender">

<param name="RemoteHost" value="logstash_hostname" />

<param name="ReconnectionDelay" value="60000" />

<param name="LocationInfo" value="true" />

<param name="Threshold" value="DEBUG" />

</appender>

然后把这个新定义的appender对象加入root logger里,可以跟其他已有logger共存:

<root>

<level value="INFO"/>

<appender-ref ref="OTHERPLACE"/>

<appender-ref ref="LOGSTASH"/>

</root>

如果是log4j.properties配置文件,则对应配置如下:

log4j.rootLogger=DEBUG, logstash

###SocketAppender###

log4j.appender.logstash=org.apache.log4j.net.SocketAppender

log4j.appender.logstash.Port=4560

log4j.appender.logstash.RemoteHost=logstash_hostname

log4j.appender.logstash.ReconnectionDelay=60000

log4j.appender.logstash.LocationInfo=true

Log4J会持续尝试连接你配置的logstash_hostname这个地址,建立连接后,即开始发送日志数据。

3.6.2 Logstash配置

Java应用端的配置完成以后,开始设置Logstash的接收端。配置如下所示,其中4560端口是Log4J SocketAppender的默认对端端口:

input {

log4j {

type =>"log4j-json"

port => 4560

}

}

3.6.3 异常堆栈测试验证

运行Logstash后,编写一个简单的Log4J程序:

import org.apache.log4j.Logger;

public class HelloExample{

final static Logger logger = Logger.getLogger(HelloExample.class);

public static void main(String[] args) {

HelloExample obj = new HelloExample();

try{

obj.divide();

}catch(ArithmeticException ex){

logger.error("Sorry, something wrong!", ex);

}

}

private void divide(){

int i = 10 /0;

}

}

编译运行:

# javac -cp ./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-

log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample.java

# java -cp .:./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-

log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample

这样即可在Logstash的终端输出看到如下事件记录:

{

"message" =>"Sorry, something wrong!",

"@version" =>"1",

"@timestamp" =>"2015-07-02T13:24:45.727Z",

"type" =>"log4j-json",

"host" =>"127.0.0.1:52420",

"path" =>"HelloExample",

"priority" =>"ERROR",

"logger_name" =>"HelloExample",

"thread" =>"main",

"class" =>"HelloExample",

"file" =>"HelloExample.java:9",

"method" =>"main",

"stack_trace" =>"java.lang.ArithmeticException: / by zero\n\tat HelloExample.

divide(HelloExample.java:13)\n\tat HelloExample.main(HelloExample.java:7)"

}

可以看到,异常堆栈直接记录在单行内了。

3.6.4 JSON Event layout

如果无法采用SocketAppender,必须使用文件方式的,其实Log4J有一个layout特性,用来控制日志输出的格式。和Nginx日志自己拼接JSON输出类似,也可以通过layout功能记录成JSON格式。

Logstash官方提供了扩展包,可以通过mvnrepository.com搜索下载:

# wget http://central.maven.org/maven2/net/logstash/log4j/jsonevent-layout/1.7/

jsonevent-layout-1.7.jar

或者直接编辑自己项目的pom.xml添加依赖:

<dependency>

<groupId>net.logstash.log4j</groupId>

<artifactId>jsonevent-layout</artifactId>

<version>1.7</version>

</dependency>

然后修改项目的log4j.properties文件如下:

log4j.rootCategory=WARN, RollingLog

log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender

log4j.appender.RollingLog.Threshold=TRACE

log4j.appender.RollingLog.File=api.log

log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd

log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1

如果是log4j.xml,则修改如下:

<appender name="Console" class="org.apache.log4j.ConsoleAppender">

<param name="Threshold" value="TRACE" />

<layout class="net.logstash.log4j.JSONEventLayoutV1" />

</appender>

生成的文件就是符合Logstash标准的JSON格式了,Logstash使用下面配置读取:

input {

file {

codec => json

path => ["/path/to/log4j.log"]

}

}

生成的Logstash事件如下:

{

"mdc":{},

"line_number":"29",

"class":"org.eclipse.jetty.examples.logging.EchoFormServlet",

"@version":1,

"source_host":"jvstratusmbp.local",

"thread_name":"qtp513694835-14",

"message":"Got request from 0:0:0:0:0:0:0:1%0 using Mozilla\/5.0 (Macintosh;Intel Mac OS X 10_9_1) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/32.0.1700.77 Safari\/537.36",

"@timestamp":"2014-01-27T19:52:35.738Z",

"level":"INFO",

"file":"EchoFormServlet.java",

"method":"doPost",

"logger_name":"org.eclipse.jetty.examples.logging.EchoFormServlet"

}

可以看到,同样达到了效果。

如果你使用的不是Log4J而是logback项目来记录Java日志,Logstash官方也有类似的扩展包,在pom.xml中改成如下定义即可:

<dependency>

<groupId>net.logstash.logback</groupId>

<artifactId>logstash-logback-encoder</artifactId>

<version>4.4</version>

</dependency>

3.7 MySQL慢查询日志

MySQL有多种日志可以记录,常见的有error log、slow log、general log、bin log等。其中slow log作为性能监控和优化的入手点,最为首要。本节即讨论如何用Logstash处理slow log。至于general log,格式处理基本类似,不过由于general量级比slow大得多,推荐采用packetbeat协议解析的方式更高效地完成这项工作,相关内容阅读本书稍后8.3节。

MySQL slow log 的Logstash处理配置示例如下:

input {

file {

type =>"mysql-slow"

path =>"/var/log/mysql/mysql-slow.log"

codec => multiline {

pattern =>"^# User@Host:"

negate => true

what =>"previous"

}

}

}

filter {

# drop sleep events

grok {

match => { "message" =>"SELECT SLEEP" }

add_tag => [ "sleep_drop" ]

tag_on_failure => [] # prevent default _grokparsefailure tag on real records

}

if "sleep_drop" in [tags] {

drop {}

}

grok {

match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clien-thost>\S*) )?\[(?:%{IP:clientip})?\]\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n# Time:.*$" ]

}

date {

match => [ "timestamp", "UNIX" ]

remove_field => [ "timestamp" ]

}

}

配置中,利用了grok插件的add_tag选项仅在成功时添加,而tag_on_failure选项仅在失败时添加的互斥特性,巧妙地过滤出日志中无用的sleep语句删除掉。

如下一段多行的MySQL slow log:

# User@Host: logstash[logstash] @ localhost [127.0.0.1]

# Query_time: 5.310431  Lock_time: 0.029219 Rows_sent: 1  Rows_examined: 24575727

SET timestamp=1393963146;

select count(*) from node join variable order by rand();

# Time: 140304 19:59:14

通过运行上面的配置,Logstash即可处理成如下单个事件:

{

"@timestamp" =>"2014-03-04T19:59:06.000Z",

"message" =>"# User@Host: logstash[logstash] @ localhost [127.0.0.1]\n# Query_

time: 5.310431  Lock_time: 0.029219 Rows_sent: 1  Rows_examined: 24575727\nSET

timestamp=1393963146;\nselect count(*) from node join variable order by rand();

\n# Time: 140304 19:59:14",

"@version" =>"1",

"tags" => [

[0] "multiline"

],

"type" =>"mysql-slow",

"host" =>"raochenlindeMacBook-Air.local",

"path" =>"/var/log/mysql/mysql-slow.log",

"user" =>"logstash",

"clienthost" =>"localhost",

"clientip" =>"127.0.0.1",

"query_time" => 5.310431,

"lock_time" => 0.029219,

"rows_sent" => 1,

"rows_examined" => 24575727,

"query" =>"select count(*) from node join variable order by rand();",

"action" =>"select"

}

后续即可针对其中的action、query_time、lock_time和rows_examined字段做监控报警及Kibana可视化统计了。

3.8 Docker日志

Docker是目前大规模互联网基础架构解决方案中最热门的技术。它带给运维工程师一个截然不同的思考角度和工作方式。

就日志层面看,Docker最大的影响在于:其最佳实践要求一个容器内部只有一个生命周期随时可以消亡的服务进程。这也就意味着:传统的写入磁盘,固定采集方式的日志系统,无法正常发挥作用。所以,在容器服务中,记录日志需要采用另外的方式。本节将介绍其中最常见的两种:记录到主机磁盘,或通过logspout收集。

3.8.1 记录到主机磁盘

默认情况下,Docker会将容器的标准输出和错误输出,保存在主机的/var/lib/docker/containers/目录下。所以,在规模比较稳定的情况下,直接记录到主机磁盘,然后通过主机上的Logstash收集日志,也是不错的方案。

以Nginx为例,将Nginx访问日志和错误日志输出到标准输出的配置如下:

daemon off;

error_log /dev/stdout info;

http {

access_log /dev/stdout;

...

}

不过,容器的特殊性在这里又一次体现出来,容器中其实是没有/dev/stdout设备的。所以我们需要自己单独处理一下,在Dockerflie里加上一句:

RUN ln -sf /proc/self/fd /dev/

这样,既保证了nginx.conf是主机和容器通用的配置,又顺利达到目的。

然后通过如下Logstash配置收集即可:

input {

file {

path => ["/var/lib/docker/containers/*/*-json.log"]

codec => json

}

}

filter {

grok {

match => ["path"。"/(?<container_id>\w+)-json.log" ]

remove_field => ["path"]

}

date {

match => ["time", "ISO8601"]

}

}

3.8.2 通过logspout收集

logspout是Docker生态圈中最有名的日志收集方式,其设计思路是:每个主机上启动一个单独容器运行logspout服务,负责将同一个主机上其他容器的日志,根据route设定,转发给不同的接收端。

logspout的基本用法如下:

$ docker pull gliderlabs/logspout:latest

$ docker run --name="logspout" \

--volume=/var/run/docker.sock:/tmp/docker.sock \

gliderlabs/logspout \

--publish=127.0.0.1:8000:80

syslog://remoteaddr:514

此外,logspout提供动态变更route的方式,如下所示:

# curl $(docker port `docker ps -lq` 8000)/routes \

-X POST \

-d '{"source": {"filter_name": "*_db", "types": ["stderr"]}, "target":

{"type": "syslog", "addr": "remoteaddr2:5140"}}'

这个配置的意思是,将容器名带有db字样的走错误输出的采集的日志,以syslog协议发送到remoteaddr2主机的5140端口。

注意,logspout采用的是RFC5424版本的syslog协议,所以如果使用的接收方是RFC3164版本的syslog协议解析,需要自己调整一下。比如logstash-input-syslog采用的就是RFC3164协议,所以需要自己来另外完成:

input {

tcp {

port => 5140

}

}

filter {

grok {

match => [“message”, “<SYSLOG5424PRI:syslog_pri> %{SYSLOG5424LINE:message}“]

}

}

此外,logspout支持模块化扩展,所以,我们也可以直接在logspout上处理成对Logstash更友好的格式。扩展logspout支持Logstash格式的方法如下:

1)编辑Dockerfile,修改成如下内容:

FROM gliderlabs/logspout:master

ENV ROUTE_URIS=logstash://host:port

2)编辑modules.go,修改成如下内容:

package main

import (

_ "github.com/looplab/logspout-logstash"

_ "github.com/gliderlabs/logspout/transports/udp"

)

3)构建镜像:

docker build

这样,后续Logstash就直接进行JSON解析即可。

《ELK Stack权威指南 》第3章 场景示例相关推荐

  1. 《ELK Stack权威指南 》第1章 入门示例

    本节书摘来自华章出版社<ELK Stack权威指南 >一书中的第1章,第1节,作者饶琛琳,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 入 门 示 例 什么是 ...

  2. 《ELK Stack权威指南(第2版)》一3.8 Docker日志

    本节书摘来自华章出版社<ELK Stack权威指南(第2版)>一书中的第3章,第3.8节,作者 饶琛琳  更多章节内容可以访问云栖社区"华章计算机"公众号查看. 3.8 ...

  3. ELK学习5_ELK文档资料:《ELK stack 权威指南/饶琛琳》推荐

    <ELK stack 权威指南/饶琛琳>的内容在网上共享有电子版,网站地址是: http://kibana.logstash.es/ . 作者也提供了电子版的下载,下载描述如下: You ...

  4. 《ELK Stack权威指南(第2版)》一3.5 Windows系统日志

    本节书摘来自华章出版社<ELK Stack权威指南(第2版)>一书中的第3章,第3.5节,作者 饶琛琳  更多章节内容可以访问云栖社区"华章计算机"公众号查看. 3.5 ...

  5. ELK学习7_ELK文档资料:《ELK stack 权威指南/饶琛琳》勘误

    新作<ELK stack权威指南>已经上架销售,考虑到个人精力有限,笔误.差错肯定在所难免.特开这个页面作为勘误清 单,也包括一些书中技术经过重大更新不在适用或推荐的简略说明. 如果大家有 ...

  6. JavaScript权威指南 第11章JavaScript标准库

    JavaScript权威指南 第11章JavaScript标准库 第11章 JavaScript标准库 11.1 集合与映射 11.1.1 Set类 11.1.2 Map类 11.1.3 WeakMa ...

  7. 《Hadoop权威指南》第二章 关于MapReduce

    <Hadoop权威指南>第二章 关于MapReduce 目录 使用Hadoop来数据分析 横向扩展 注:<Hadoop权威指南>重点学习摘要笔记 1. 使用Hadoop来数据分 ...

  8. HTML5 权威指南第 10 章 文档分节 学习笔记

    HTML5 权威指南第 10 章 文档分节 学习笔记 第 8 章 标记文字 内容从从文字出发,专注如何将单体内容正确的呈现出来:第 9 章 组织内容 内容从段落出发,专注如何将单体内容合理的放在段落中 ...

  9. JavaScript权威指南 第15章 网络编程 第三部分

    JavaScript权威指南 第15章 网络编程 第三部分 可伸缩矢量图形 15.7.1 在HTML中使用SVG 15.7.2 编程操作SVG 15.7.3 通过JavaScript创建SVG图片 1 ...

最新文章

  1. C#基础概念二十五问
  2. 服务器json文件怎么创建对象,如何从json文件(或xml文件)创建vb.net对象类
  3. 一张图看懂小程序全生态
  4. 异常-throws的方式处理异常
  5. time zone issue in text processing
  6. 大数据与Hadoop
  7. ASP.NETLinkButton的Click事件中获取CommandArgument的值
  8. intel 酷睿core系列cpu的类型:U M H HQ MQ
  9. 扒一扒AI的那些事儿
  10. c语言的数组长度问题
  11. [译]用javascript实现一门编程语言-词法分析
  12. Coolite中的ComboBox控件示例
  13. 2022年华数杯数学建模
  14. 计算机上e盘拒绝访问,e盘拒绝访问怎么办,教你win7系统e盘拒绝访问的应对办法...
  15. java基础教程推荐_推荐Java入门视频教程
  16. Machine Learning in Action -- AdaBoost
  17. python 的 *args和 **kwargs 分别是什么意思
  18. PreScan里动力学模型的2D和3D的区别
  19. springboot整合redis,redisTemplate 空指针
  20. tx2使用teamviewer远程桌面访问

热门文章

  1. www服务器把信息组成,www服务器究竟是什么
  2. 淘宝迪士尼抢购脚本 无需修改代码 只需要修改抢购时间 附上安装教程
  3. 2023开学季哪款电容笔值得买?高品质电容笔品牌推荐
  4. 打字标准 / 打字键位 / 打字速度提升
  5. Internet技巧两则
  6. Python:字符 - ASCII 码相互转换
  7. 我们的节日| 包汤圆、猜灯谜,喜迎元宵佳节
  8. RGB-D深度图像介绍RGBD
  9. sha256为什么不可逆,sha256的安全性如何
  10. 小学老师工资多少一个月_小学老师一个月多少工资呢?揭开真实收入,难怪很多人都去当老师...