在这篇文章中,我们将看到如何从 Elasticsearch 索引和 Kibana 的 CSV 报告中导出数据 - post-url 到 pandas 数据帧。 数据的可视化可以在 Kibana 中完成,但如果你想对数据进行更精细的分析并创建更动态的可视化,将数据导出到 pandas dataframe 将是一个不错的选择。

在如下的演示中,我将使用 Elastic Stack 8.5.3 来进行展示。

安装

为了说明问题的方便,我们可以选择只有基本安全的 Elastic Stack 安装。我们可以参考之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 中的 “如何配置 Elasticsearch 只带有基本安全” 章节。针对我们的安装,我们配置 Elasticsearch 的超级用户 elastic 的密码为 password。你也可以参考另外一篇文章 “Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发” 进行安装。

准备数据

我们选用 Kibana 中自带的数据来进行展示。我们打开 Kibana:

这样就有一个叫做 kibana_sample_data_logs 的索引在 Elasticsearch 中被创造。我们在 Discover 中打开:

如上所示,我们可以看到数据的时序直方图。我们选择合适的时间区域,然后添加一个 filter:

如上所示,我们仅做了一个很简单的表格。第一行是 timestamp,而第二行是 geo.dest。我们在搜索中创建了一个 geo.src 为 US 的过滤器。我们保存当前的搜索:

如上所示,我们有两种方法可以得到一个 CSV 格式的输出。一中是使用 Generate CSV 按钮。点击它后,我们可以在如下的地址下载相应的 CSV 文件。

我们可以看到如上所示 CSV 输出。另外一种方式是使用 POST URL 来通过软件的方式来获得这个数据。我们在上面的图中选择 Copy POST URL。我们可以得到如下所示的一个链接:

http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29

方法一:从 Kibana 中获取数据

我们把上面的链接复制并粘贴到如下的代码中:

kibana-to-pandas.py

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from io import StringIO
import json
import timekibana_ip = "0.0.0.0"headers = {"kbn-xsrf": "reporting"}#  post_url = 'http://' + kibana_ip + \# '/api/reporting/generate/csv?jobParams=(conflictedTypesFields:!(),fields:!(xxxxx))'post_url = "http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29"# print(post_url)post_url_data = requests.post(post_url, \auth = HTTPBasicAuth('elastic', 'password'), \headers = headers)get_api_json = json.loads(post_url_data.text)print(get_api_json)time.sleep(10)print(get_api_json['path'])api_url = "http://" + "localhost:5601" + get_api_json['path']
print(api_url)csv_url = requests.get(api_url, \auth = HTTPBasicAuth('elastic', 'password'), \headers = headers)print(csv_url)traffic_data = pd.read_csv(StringIO(csv_url.text))print(traffic_data.head())
print(traffic_data)

在上面的代码中,特别需要注意的是:

time.sleep(10)

我们在发送完请求后,需要等待一定的时间让 Kibana 做相应的处理,并得到相应的数据。在上面,我使用了超级用户 elastic 的账号信息。运行上面的代码:

$ pwd
/Users/liuxg/python/pandas
$ ls
kibana-to-pandas.py
$ python kibana-to-pandas.py
{'path': '/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0', 'job': {'id': 'ldfcx8kq1k9b9c2bfe5reij0', 'index': '.reporting-2023-01-22', 'jobtype': 'csv_searchsource', 'created_at': '2023-01-28T02:51:55.178Z', 'created_by': 'elastic', 'meta': {'objectType': 'search'}, 'status': 'pending', 'attempts': 0, 'migration_version': '7.14.0', 'payload': {'browserTimezone': 'Asia/Shanghai', 'columns': ['timestamp', 'geo.dest'], 'objectType': 'search', 'searchSource': {'fields': [{'field': 'timestamp', 'include_unmapped': 'true'}, {'field': 'geo.dest', 'include_unmapped': 'true'}], 'filter': [{'meta': {'field': 'timestamp', 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'params': {}}, 'query': {'range': {'timestamp': {'format': 'strict_date_optional_time', 'gte': 'now-7d/d', 'lte': 'now'}}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'parent': {'filter': [{'$state': {'store': 'appState'}, 'meta': {'alias': None, 'disabled': False, 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'key': 'geo.src', 'negate': False, 'params': {'query': 'US'}, 'type': 'phrase'}, 'query': {'match_phrase': {'geo.src': 'US'}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'query': {'language': 'kuery', 'query': ''}}, 'sort': [{'timestamp': 'desc'}], 'trackTotalHits': True}, 'title': 'src-US', 'version': '8.5.3'}, 'output': {}}}
/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
http://localhost:5601/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
<Response [200]>timestamp geo.dest
0  Jan 28, 2023 @ 09:15:02.127       CN
1  Jan 28, 2023 @ 09:00:52.596       CN
2  Jan 28, 2023 @ 08:17:33.769       IN
3  Jan 28, 2023 @ 05:15:19.548       RU
4  Jan 28, 2023 @ 04:18:45.660       KEtimestamp geo.dest
0     Jan 28, 2023 @ 09:15:02.127       CN
1     Jan 28, 2023 @ 09:00:52.596       CN
2     Jan 28, 2023 @ 08:17:33.769       IN
3     Jan 28, 2023 @ 05:15:19.548       RU
4     Jan 28, 2023 @ 04:18:45.660       KE
...                           ...      ...
1608  Jan 21, 2023 @ 11:30:30.110       TJ
1609  Jan 21, 2023 @ 11:14:28.231       IN
1610  Jan 21, 2023 @ 11:05:31.057       FR
1611  Jan 21, 2023 @ 10:40:26.055       TR
1612  Jan 21, 2023 @ 10:24:53.405       IN[1613 rows x 2 columns]

从上面的输出中,我们可以看到 pandas 的 dataframe 输出。

请注意,它只能根据您在 kibana.yml 文件中指定的字节大小检索部分数据。 请增加 xpack.reporting.csv.maxSizeBytes 的值以获得完整数据。

方法二:从 Elasticsearch 中获取数据

下面的代码片段将有助于直接从 Elasticsearch 索引中检索数据,但它不足以检索大量数据,因此你可能需要根据你的情况决定使用 kibana 的 csv 报告还是 Elasticsearch 索引要求。

关于如何连接到 Elasticsearch,请参阅我之前的文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”。我们创建如下的一个 python 文件:

elasticsearch-to-pandas.py

from elasticsearch import Elasticsearch
import pandas as pd
import numpy as np
import json# create a client instance of the libraryes = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "password"))
resp = es.info()
# print(resp)total_docs = 50search_query = {"match_all": {}
}response = es.search(_source="false",index='kibana_sample_data_logs',query=search_query,size=total_docs,fields=["@timestamp", "clientip","host"]
)# print(response)elastic_docs = response["hits"]["hits"]
# print(elastic_docs)fields = {}
for num, doc in enumerate(elastic_docs):fields_data = doc["fields"]for key, val in fields_data.items():try:fields[key] = np.append(fields[key], val)except KeyError:fields[key] = np.array([val])# print(fields)
traffic_data = pd.DataFrame(fields)
print(traffic_data.info())

在上面我们做了如下的一个搜索:

GET kibana_sample_data_logs/_search
{"_source": false, "query": {"match_all": {}},"fields": ["@timestamp", "clientip","host"]
}

上面的查询返回如下的结果:

{"took": 0,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 10000,"relation": "gte"},"max_score": 1,"hits": [{"_index": "kibana_sample_data_logs","_id": "Gf8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T00:39:02.912Z"],"clientip": ["223.87.60.27"],"host": ["artifacts.elastic.co"]}},{"_index": "kibana_sample_data_logs","_id": "Gv8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T03:26:21.326Z"],"clientip": ["130.246.123.197"],"host": ["www.elastic.co"]}},{"_index": "kibana_sample_data_logs","_id": "G_8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T03:30:25.131Z"],"clientip": ["120.49.143.213"],"host": ["cdn.elastic-elastic-elastic.org"]}},{"_index": "kibana_sample_data_logs","_id": "HP8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T03:34:43.399Z"],"clientip": ["99.74.118.237"],"host": ["artifacts.elastic.co"]}},{"_index": "kibana_sample_data_logs","_id": "Hf8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T03:37:04.863Z"],"clientip": ["177.111.217.54"],"host": ["www.elastic.co"]}},{"_index": "kibana_sample_data_logs","_id": "Hv8y9oUBLUWnAhRe7p8u","_score": 1,"fields": {"@timestamp": ["2023-01-15T03:49:40.669Z"],"clientip": ["106.225.58.146"],"host": ["www.elastic.co"]}}...

运行上面的代码,我们可以得到如下的数据:

$ pwd
/Users/liuxg/python/pandas
$ ls
elasticsearch-to-pandas.py kibana-to-pandas.py
$ python elasticsearch-to-pandas.py
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 3 columns):#   Column      Non-Null Count  Dtype
---  ------      --------------  ----- 0   @timestamp  50 non-null     object1   clientip    50 non-null     object2   host        50 non-null     object
dtypes: object(3)
memory usage: 1.3+ KB
None

Elasticsearch:将数据从 Elasticsearch 和 Kibana 导出到 Pandas Dataframe相关推荐

  1. python按照日期筛选数据_python – 按时间戳列筛选/选择pandas dataframe行

    我是熊猫新手.我的数据框有两列dt(日期时间戳)和值. 给出两个开始和结束数据时间戳:是否有一种简单的方法可以从包含两个日期时间戳之间的行的原始数据帧创建新数据帧? dt value 84 7/23/ ...

  2. ES(elasticsearch) - kibana导出csv

    文章目录 es系列导航 前言 1.条件过滤 2.下载 3.常见问题 1.进行条件过滤后,没有对应的数据进行展示: 2.kibana 导出失败,completed max size reached 3. ...

  3. ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch

    概述 把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 cana ...

  4. ElasticSearch实战:Linux日志对接Kibana

    本文由云+社区发表 ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTFul web接口.ElasticSearch是用Java开发 ...

  5. 【技术实验】表格存储Tablestore准实时同步数据到Elasticsearch

    实验背景 图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息. 需求如下 ...

  6. ElasticSearch大数据分布式弹性搜索引擎使用

    阅读目录: 背景 安装 查找.下载rpm包 .执行rpm包安装 配置elasticsearch专属账户和组 设置elasticsearch文件所有者 切换到elasticsearch专属账户测试能否成 ...

  7. elasticsearch 分页_[Springboot实战系列]整合ElasticSearch实现数据模糊搜索

    前言 本文介绍了如何整合搜索引擎elasticsearch与springboot,对外提供数据查询接口. 业务介绍 我的个人网站需要对mysql数据库内存储的京东商品进行模糊查询(模仿淘宝商品搜索), ...

  8. windows下用elasticdump导入json数据到Elasticsearch中

    一.前言 ES的备份,导入和导出相对而言比较麻烦.用logstash的话,经常会出现很多错误,而且不是很方便.用bulk也是一样的,对于咱们的json文件的结构,一些字段名都有要求,也不方便.后来和大 ...

  9. Day 4 - PB级规模数据的Elasticsearch分库分表实践

    Day 4 - PB级规模数据的Elasticsearch分库分表实践 从2018年7月在开始在某阿里云数据中心部署Elasticsearch软件,到2018年12月共创建了15个集群,服务于客户的文 ...

最新文章

  1. 小白的第一本python书_读书笔记:编程小白的第一本python入门书
  2. linux源码编译安装lamp环境搭建,linux下源码包编译安装LAMP环境
  3. Windows上mount NFS V4
  4. 微软打脸,Windows 7 再次成为微软的头号桌面操作系统
  5. C语言带参数的main()函数
  6. 状压DP UVA 10817 Headmaster's Headache
  7. Spring AOP之通知类别
  8. html中图片阴影怎么写,css如何给图片加阴影?
  9. ubuntu16.04下ROS操作系统学习笔记(三 / 二)ROS基础-ROS通信编程
  10. MATLAB自适应平滑滤波
  11. 根据经纬度使用百度和高德地图 进行导航
  12. 【esp32-s3】6.2 文件系统——文件夹列表
  13. 东东在用计算机计算一道题时 把被除数的前,北师大二年级数学下册期末试卷...
  14. 思维导图 XMind 闯关之路(第01关)新建文件 建立分支
  15. 编程王 kingofcoders.com
  16. 使用华为云跑自己的深度学习模型教程
  17. 改变虚拟导航栏(navigation bar)背景色及图标颜色
  18. Paper intensive reading (三):Interactions Between Food and Gut Microbiota: Impact on Human Health
  19. 模糊控制初学入门之概念认知
  20. poj2387- Til the Cows Come Home

热门文章

  1. 解密短信木马为何屡杀不尽
  2. python连接mysql,自动生成数据库表对应的word文档(毕设福利)
  3. C# 7. ShowDialog与Form.Show区别
  4. Java.io.tmpdir介绍
  5. MCU选8位还是32位?这可不是扔钢镚的事!
  6. 如何将一个128G U盘格式化为fat32格式?
  7. 商品分类 mysql表结构_商品多级分类,数据库设计
  8. IOS 保存图片、视频到自定义相簿
  9. 新的CAD看图软件,查看图纸更方便
  10. vsftp匿名访问目录_vsftpd配置禁用匿名用户并设置登陆用户