网络大数据分析 -- 使用 ElasticSearch + LogStash + Kibana 来可视化网络流量
简介
ELK 套装包括 ElasticSearch、LogStash 和 Kibana。
其中,
ElasticSearch 是一个数据搜索引擎(基于 Apache Lucene)+分布式 NoSQL 数据库;
LogStash 是一个消息采集转换器,类似 Syslog,可以接收包括日志消息在内的多种数据格式,然后进行格式转换,发送给后端继续处理;
Kibana 是一个 Web 前段,带有强大的数据分析、整理和可视化功能。
一般情况下,ELK 的工作流程为 原始数据 -> LogStash -> ElasticSearch -> Kibana。
网络流量信息采集协议常见包括 sFlow 和 NetFlow,两种协议都支持采集需要的网络信息,发送到指定的采集器(例如 netflow-analyzer、sFlowTrend 和 softflowd)。
其中,
sFlow 一般基于采样,
NetFlow 则可以基于所有网包信息获取更为精确的信息。
这里展示如何使用 ELK + NetFlow 来实时可视化网络流量,基于 sFlow 的操作是类似的,也给出了关键步骤。
安装 ELK
快速体验
觉得下面的本地安装过程太复杂的,可以直接从
https://github.com/yeasy/docker-compose-files/tree/master/elk_netflow 下载 docker-compose 模板,一键部署并启动。
$ sudo mkdir -p /opt/data/elasticsearch/
$ git clone https://github.com/yeasy/docker-compose-files.git && cd elk_netflow
$ docker-compose up
启动后,logstash 将监听本地 2055 端口过来的 netflow 消息;kibana 监听在 5601 端口;elasticsearch 监听在 9200 端口。
安装 ElasticSearch
步骤参考 http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup-repositories.html。可以选择源码编译或安装包安装。
安装和启动服务
以 CentOS 下使用安装包为例,首先添加证书。
$ rpm --import https://packages.elasticsearch.org/GPG-KEY-elasticsearch
添加源文件 /etc/yum.repos.d/elasticsearch.repo,内容为
[elasticsearch-1.4]
name=Elasticsearch repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
执行安装命令
$ yum update; yum install -y elasticsearch
如果开启了 iptables,需要打开相关的端口。
-A INPUT -p tcp -m state --state NEW -m tcp -m multiport --dports 9200:9400 -j ACCEPT
-A INPUT -m pkttype --pkt-type multicast -j ACCEPT
启动 elasticsearch 服务,默认监听在 9200 端口。
$ /etc/init.d/elasticsearch restart
配置 elasticsearch 服务随系统自动启动
$ chkconfig --add elasticsearch
此时,查询本地 9200 端口,会获取类似下面的反馈信息,表示服务启动成功。
$ curl -XGet localhost:9200
{"status" : 200,"name" : "Thunderball","cluster_name" : "elasticsearch","version" : {"number" : "1.4.4","build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512","build_timestamp" : "2015-02-19T13:05:36Z","build_snapshot" : false,"lucene_version" : "4.10.3"},"tagline" : "You Know, for Search"
}
配置文件在 /etc/elasticsearch/elasticsearch.yml,选项不是特别多,有详细的注释说明。
添加索引模板
为了让采集到的数据的类型能被 elasticsearch 正确处理,添加如下索引模板,自动对所有 logstash_netflow- 开头的索引采取指定的类型解析:
curl -XPUT http://localhost:9200/_template/logstash_netflow -d '{"template" : "logstash_netflow-*","settings": {"index.cache.field.type": "soft","index.store.compress.stored": true},"mappings" : {"_default_" : {"_all" : {"enabled" : false},"properties" : {"@message": { "index": "analyzed", "type": "string" },"@source": { "index": "not_analyzed", "type": "string" },"@source_host": { "index": "not_analyzed", "type": "string" },"@source_path": { "index": "not_analyzed", "type": "string" },"@tags": { "index": "not_analyzed", "type": "string" },"@timestamp": { "index": "not_analyzed", "type": "date" },"@type": { "index": "not_analyzed", "type": "string" },"netflow": {"dynamic": true,"path": "full","properties": {"version": { "index": "analyzed", "type": "integer" },"first_switched": { "index": "not_analyzed", "type": "date" },"last_switched": { "index": "not_analyzed", "type": "date" },"direction": { "index": "not_analyzed", "type": "integer" },"flowset_id": { "index": "not_analyzed", "type": "integer" },"flow_sampler_id": { "index": "not_analyzed", "type": "integer" },"flow_seq_num": { "index": "not_analyzed", "type": "long" },"src_tos": { "index": "not_analyzed", "type": "integer" },"tcp_flags": { "index": "not_analyzed", "type": "integer" },"protocol": { "index": "not_analyzed", "type": "integer" },"ipv4_next_hop": { "index": "analyzed", "type": "ip" },"in_bytes": { "index": "not_analyzed", "type": "long" },"in_pkts": { "index": "not_analyzed", "type": "long" },"out_bytes": { "index": "not_analyzed", "type": "long" },"out_pkts": { "index": "not_analyzed", "type": "long" },"input_snmp": { "index": "not_analyzed", "type": "long" },"output_snmp": { "index": "not_analyzed", "type": "long" },"ipv4_dst_addr": { "index": "analyzed", "type": "ip" },"ipv4_src_addr": { "index": "analyzed", "type": "ip" },"dst_mask": { "index": "analyzed", "type": "integer" },"src_mask": { "index": "analyzed", "type": "integer" },"dst_as": { "index": "analyzed", "type": "integer" },"src_as": { "index": "analyzed", "type": "integer" },"l4_dst_port": { "index": "not_analyzed", "type": "long" },"l4_src_port": { "index": "not_analyzed", "type": "long" }},"type": "object"}}}}
}'
安装插件
另外,还可以为 elasticsearch 安装插件,来查看集群状态、查看数据信息等。
head
sudo /usr/share/elasticsearch/bin/plugin -install mobz/elasticsearch-head
之后通过 http://localhost:9200/_plugin/head/ 查看。
bigdesk
sudo /usr/share/elasticsearch/bin/plugin -install lukas-vlcek/bigdesk
之后通过 http://localhost:9200/_plugin/bigdesk/ 查看。
kopf
sudo /usr/share/elasticsearch/bin/plugin -install lmenezes/elasticsearch-kopf
之后通过 http://localhost:9200/_plugin/kopf/ 查看。
安装 LogStash
从 http://www.elasticsearch.org/overview/logstash/download/ 下载源码包或安装包。
以 CentOS 下使用安装包为例:
$ yum install https://download.elasticsearch.org/logstash/logstash/packages/centos/logstash-1.4.2-1_2c0f5a1.noarch.rpm
默认安装到 /opt/logstash/ 目录下,配置文件在 /etc/logstash/ 目录下。
也可以通过添加 yum 源的方式。
$cat /etc/yum.repos.d/logstash.repo
[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
测试安装是否成功
$ /opt/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
此时,随意从控制台输入内容后回车,会作为一条 log,显示出来。
LogStash 支持配置文件,下面通过配置文件配置 LogStash 将收到的 NetFlow 消息进行格式转换后发送到本机的 ElasticSearch。
假定 NetFlow 消息会被采集器发送到本地的 2055 端口,将消息转换后发送给本地的 elastic search 主机,索引名称为 "logstash_netflow-%{+YYYY.MM.dd}"。
$ cat /etc/logstash/conf.d/netflow.conf
input {
udp {
port => 2055
type => netflow
codec => netflow {
definitions => "/opt/logstash/lib/logstash/codecs/netflow/netflow.yaml"
}
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
host => localhost
index => "logstash_netflow-%{+YYYY.MM.dd}"
}
}
之后重启 logstash 服务。
安装 Kibana
下载压缩包,并解压。
$ axel https://download.elasticsearch.org/kibana/kibana/kibana-4.0.0-beta3.tar.gz
$ tar xzvf kibana-4.0.0-beta3.tar.gz
$ cd kibana-4.0.0-beta3
修改 config/kibana.yml,指定 elasticsearch 所在地址,默认认为在本地的 9200 端口。
如果开启了 iptables,需要打开相关的端口。
-A INPUT -p tcp -m state --state NEW -m tcp -m multiport --dports 5601 -j ACCEPT
运行 kibana。
$ ./bin/kibana
启动成功后,会打印如下的信息
The Kibana Backend is starting up... be patient
{"@timestamp":"2015-01-08T13:54:43+08:00","level":"INFO","name":"Kibana","message":"Kibana server started on tcp://0.0.0.0:5601 in production mode."}
通过浏览器访问本地的 5601 端口,因为没有数据,所以这个时候看不到任何数据信息。如果有了采集到的数据后,选择logstash_netflow-* 格式的索引,会查询到所有的 NetFlow 数据信息。
配置数据采集
配置 OpenvSwitch
利用下面的命令,创建一个新的网桥 ovs-br,作为要进行抓包的网桥,也可以使用已有网桥。
$ ovs-vsctl add-br ovs-br
并配置一个 IP 地址给网桥内部接口,否则后面启动 Docker 服务会报错(检查避免与 Docker 默认网桥冲突)。
$ ifconfig ovs-br 172.17.42.1/24
下面可以选择使用 NetFlow 采集还是 sFlow 采集,需要对应调整 logstash 中的配置。
NetFlow
COLLECTOR_IP=127.0.0.1
COLLECTOR_PORT=2055
TIMEOUT=10
sudo ovs-vsctl -- set Bridge ovs-br netflow=@nf -- --id=@nf create NetFlow targets=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" active-timeout=${TIMEOUT}
或者,一条命令搞定,例如:
sudo ovs-vsctl -- set Bridge br-int netflow=@nf -- --id=@nf create NetFlow targets=\"9.186.100.101:2055\" active-timeout=10
清除 netflow 规则。
$ sudo ovs-vsctl clear Bridge ovsbr netflow
sFlow
$ COLLECTOR_IP=127.0.0.1
$ COLLECTOR_PORT=6343
$ AGENT=eth0
$ HEADER=128
$ SAMPLING=512
$ POLLING=10
其中,AGENT 是从本地的哪个网卡发出流量到采集器。
$ sudo ovs-vsctl -- \
--id=@sflow create sflow agent=${AGENT} target=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" header=${HEADER} \
sampling=${SAMPLING} polling=${POLLING} \
-- set bridge ovs-br sflow=@sflow
清除 sflow 规则。
$ sudo ovs-vsctl clear Bridge ovsbr sflow
ipfix
ovs−vsctl −− set Bridge br-int ipfix=@ipf −− −−id=@ipf create IPFIX targets=\"192.168.0.34:4739\" obs_domain_id=123 obs_point_id=456
使用 Docker 容器生成流量
这里可以使用 Docker 容器来模拟主机发送流量。
在启动 Docker 服务的时候,使用 -b BRIDGE 或 --bridge=BRIDGE 来指定使用的网桥。或者修改 Docker 配置文件来添加参数并重启服务。
Ubuntu 下配置文件为 /etc/default/docker,重启服务命令为 service docker restart。
CentOS 下配置文件为 /etc/sysconfig/docker,重启服务命令为 /etc/init.d/docker restart。
搜索带有 iperf 的镜像。
$ docker search iperf
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
moutten/iperf Dockerfile to setup an iperf server for ne... 0 [OK]
xeor/iperf-server 0 [OK]
gdanii/iperf Ubuntu 14.04 with iperf 0
chengping/iperf docker test with iperf 0
m2i3/iperf 0
kiratomato/iperf 0
brendanburns/iperf 0
mbennett/iperf_client for internal testing 0
mbennett/iperf_server 0
mbennett/iperfbase 0
mbennett/iperfserver 0
这里可以任意选择一个,例如下载 gdanii/iperf 镜像。
$ docker pull gdanii/iperf
启动两个容器,一个当作服务端(地址为 172.17.0.2)
$ iperf -s
一个当作客户端(地址为 172.17.0.3),对服务端发起请求。
$ iperf -c 172.17.0.2
这个时候刷新 kibana 页面,就可以看到收集到的每条 flow 的信息了。一条 NetFlow 的信息可能长成下面的样子。
{
"_index": "logstash_netflow-2015.04.28",
"_type": "logs",
"_id": "tnUt34s8QCyN-E1mc46wqQ",
"_score": 1,
"_source": {
"@version": "1",
"@timestamp": "2015-04-28T03:02:27.760Z",
"netflow": {
"version": "5",
"flow_seq_num": "190",
"engine_type": "9",
"engine_id": "9",
"sampling_algorithm": "0",
"sampling_interval": "0",
"flow_records": "2",
"ipv4_src_addr": "10.0.0.2",
"ipv4_dst_addr": "10.0.0.4",
"ipv4_next_hop": "0.0.0.0",
"input_snmp": "2",
"output_snmp": "3",
"in_pkts": "10",
"in_bytes": "980",
"first_switched": "2015-04-28T02:46:57.760Z",
"last_switched": "2015-04-28T03:02:27.759Z",
"l4_src_port": "0",
"l4_dst_port": "2048",
"tcp_flags": "0",
"protocol": "1",
"src_tos": "0",
"src_as": "0",
"dst_as": "0",
"src_mask": "0",
"dst_mask": "0"
},
"host": "9.186.100.88"
},
"fields": {
"netflow.last_switched": [
1430190147759
],
"@timestamp": [
1430190147760
],
"netflow.first_switched": [
1430189217760
]
}
}
这些信息作为记录被存放到了 elastic search 中。可以通过自定义可视化集合来实时监控各个主机的进出流量情况、流量中各种应用的占比情况等等。
也可以使用 kvm 启动两个 vm 挂载到 ovs-br 上进行测试或者利用本地主机作为客户端或服务端之一。这里不再赘述。
当然,ELK 的功能要远大于此,关键在于如何灵活运用,可以通过阅读官方文档进行学习。
常见可视化模板
下载 kibana 可视化模板的 json 文件,通过 kibana 界面直接导入即可看到效果。
如下图所示。
包括如下常见的面板:
查看整体带宽使用情况-时序柱状图
查看整体带宽中不同源端口的数量-时序区域图
查看整体带宽中不同目的端口的数量-时序区域图
查看整体带宽中不同源地址的数量-饼图
查看整体带宽中不同目的地址的数量-饼图
查看整体带宽中不同源端口的数量-饼图
查看整体带宽中不同目的端口的数量-饼图
参考
ElasticSearch:http://elasticsearch.org
LogStash:http://www.logstash.net/docs
http://www.areteix.net/blog/2013/08/network-flow-monitoring-with-open-vswitch/
http://blogs.cisco.com/security/step-by-step-setup-of-elk-for-netflow-analytics
netflow-analyzer: http://www.solarwinds.com/products/freetools/netflow-analyzer.aspx
sFlowTrend: http://www.inmon.com/products/sFlowTrend.php
softflowd: https://code.google.com/p/softflowd
————————————————
版权声明:本文为CSDN博主「yeasy」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/yeasy/article/details/45332493
网络大数据分析 -- 使用 ElasticSearch + LogStash + Kibana 来可视化网络流量相关推荐
- ELK(ElasticSearch+Logstash+ Kibana)搭建实时日志分析平台
来源:http://www.cnblogs.com/zclzhao/p/5749736.html 一.简介 ELK 由三部分组成elasticsearch.logstash.kibana,elasti ...
- 5W字穿透 ELK(史上最全):elasticsearch +logstash+kibana
本文 5w 字,帮忙大家 绞杀式.穿透式 掌握 elk 的原理和实操 文章很长,建议收藏起来慢慢读! 总目录 博客园版 为您奉上更多の珍贵的学习资源 有关本文的 脚本 和 代码,可以来 尼恩 发起的J ...
- Elasticsearch+logstash+kibana
ELK搜索高级课程 1. 课程简介 1.1 课程内容 ELK是包含但不限于Elasticsearch(简称es).Logstash.Kibana 三个开源软件的组成的一个整体.这三个软件合成ELK.是 ...
- 企业级日志分析系统ELK(Elasticsearch , Logstash, Kibana)
企业级日志分析系统ELK(Elasticsearch , Logstash, Kibana) 前言 一.ELK概述 1.ELK日志分析系统 2.ELK日志处理特点 3.Elasticsearch概述 ...
- 使用ELK(Elasticsearch + Logstash + Kibana) 搭建日志集中分析平台实践--转载
原文地址:https://wsgzao.github.io/post/elk/ 另外可以参考:https://www.digitalocean.com/community/tutorials/how- ...
- Elasticsearch + Logstash + Kibana 搭建日志集中分析平台实践
为什么80%的码农都做不了架构师?>>> 比较详细的搭建教程:https://segmentfault.com/a/1190000003689999 Elasticsearch ...
- ElasticSearch + Logstash + kibana
ElasticSearch + Logstash + kibana 1.介绍: Logstash:搬运工 ElasticSearch:搜索引擎 Kilbana:可视化系统 ElasticSearch是 ...
- Centos7下使用ELK(Elasticsearch + Logstash + Kibana)搭建日志集中分析平台
Centos7下使用ELK(Elasticsearch + Logstash + Kibana)搭建日志集中分析平台 日志监控和分析在保障业务稳定运行时,起到了很重要的作用,不过一般情况下日志都分散在 ...
- ELK6.0部署:Elasticsearch+Logstash+Kibana搭建分布式日志平台
一.前言 1.ELK简介 ELK是Elasticsearch+Logstash+Kibana的简称 ElasticSearch是一个基于Lucene的分布式全文搜索引擎,提供 RESTful API进 ...
最新文章
- 2016-2022年AutoCAD起重机吊装计划和索具图纸
- R | 对亚马逊新总部可能位置进行可视化
- ubuntu 上code blocks 建glut工程时问题
- visual studio 2017发布dotnet core到docker
- SQL datediff()函数 时间差
- mongodb的管理员和安全认证
- Intellij IDEA 代码格式化配置和快捷键
- hcie到底是个啥 ?
- Java编程学习指南(带学习经验)
- java程序员生日祝福语_给朋友过生日的祝福语
- #455 科技乱炖:看完《流浪地球2》,我们为行星发动机设计了网络架构
- gh-ost封装脚本
- SSM实现邮箱验证功能
- go语言使用thrift协议实现客户端和服务端报not enough arguments in call to oprot.WriteMessageBegin错误解决方案
- 一个公众号绑定多个商户支付
- Python学习-爬虫入门知识点整理
- php里怎么输入,PHP是怎么进行输入输出的
- ESP8266-NodeMCU (1) 开发板介绍
- CVPR2022新作:P图不会,深度学习来帮忙,基于GAN逆映射的图像编辑(中)
- 程序猿之--医保卡的正确使用
热门文章
- 画解算法:盛最多水的容器 | 腾讯面试编程50题(二)
- LPC11U3x系列IAP升级 BootLoader分析
- 2018百度校招、腾讯校招 面试经验
- MSI(微星)GP62mvr WIN10+ubuntu16.04双系统安装教程
- 计算机考研b区国家线,2020考研国家线:A区和B区分数线,到底有什么区别?
- android 检测手机是否安装了应用宝 app跳转应用宝
- 小米大BOSS雷军写Java代码水平如何?一起来扒一扒
- Hotspot 垃圾回收之oop_iterate(二) 源码解析
- 《CCNP ROUTE 300-101学习指南》——1.4节路由和TCP/IP操作
- 免费试用Windows Azure云平台(无须提供信用卡)