Hive is a data warehouse tool built on top of Hadoop for working with structured data through an SQL-like language. Hive parses SQL statements and converts them into MapReduce jobs submitted to the Hadoop cluster; Hadoop monitors job execution and returns the results to the user.
  Note that Hive does not support row-level updates; it is mainly used for batch processing over large data sets.
  The following are commonly used Hive SQL statements; the parts in '[ ]' are optional and can be included as needed.
-- Create a database
create database name;

-- Common "show" commands
show databases;                -- list the databases
show tables;                   -- list the tables in the current database
show tables like 'cc';         -- list tables matching a pattern
show partitions table_name;    -- list a table's partitions
show functions;
describe extended table_name;  -- show the table's structure, columns, partitions, etc.

-- Create-table statement
create [external] table [if not exists] table_name  -- create a table; managed (internal) by default
[(col_name data_type [comment col_comment], ...)]   -- columns, with types and optional comments
[comment table_comment]                             -- table comment
[partitioned by (col_name data_type [comment col_comment], col_name_2 data_type_2, ...)]  -- partition columns; note they must not appear in the column list above
[clustered by (col_name, col_name_2, ...) [sorted by (col_name [ASC|DESC], ...)] into num_buckets buckets]  -- bucketing
[row format row_format]
[stored as file_format]                             -- storage file format
[location hdfs_path]                                -- storage path
·external: whether the table is an external table; the default is a managed (internal) table
·if not exists: create the table only if it does not already exist, otherwise ignore the error
·comment: add a comment to the table or to a column
·row_format
row format delimited [fields terminated by char]
[collection items terminated by char]
[map keys terminated by char]
[lines terminated by char]
·file_format
stored as textfile      -- plain text data
stored as sequencefile  -- compressed data, to save storage space

-- Copy a table's structure with the like keyword
create table table_name like old_table_name;

-- Rename a table
alter table table_name rename to new_table_name;

-- Add a column with a comment
alter table table_name add columns (col_name data_type comment 'col_comment');

-- Drop columns (replace columns keeps only the columns you list)
alter table table_name replace columns (col_name data_type, col_name_2 data_type_2);

-- Add and drop partitions
alter table table_name add [if not exists] partition (partition_col='value');  -- add
alter table table_name drop partition (partition_col='value');                 -- drop

-- Insert data
insert into table table_1 select * from table_2;       -- append data to table_1
insert overwrite table table_1 select * from table_2;  -- clear table_1 first, then load the data

-- Common query template
select [distinct] select_expr_1, select_expr_2
from table_name
[where condition]                       -- filter condition
[group by col_list [having condition]]  -- grouping, and a condition on the grouped rows
[order by col_list]                     -- sorting
[limit num_1, num_2]                    -- offset of the first row returned (num_1) and number of rows returned (num_2)

I. Preface
In big data analysis and applications we often face massive data storage and computation; traditional servers can no longer handle some of these workloads, so Hadoop/Spark-based big data platforms are widely used. This article covers ways to read a Hive database from Python. There are a few pitfalls along the way, and I share the ones I ran into.

Background
The group has 20 servers (1 collection master node, 1 big data monitoring platform, 1 cluster master node, and 17 cluster worker nodes), 65 TB of HDFS disk, 3.5 TB of YARN memory, and so on. The current project analyzes the group's household-profile data: regional analysis by housing estate, viewing preferences, household income, and so on, plus analysis of program profiles and detailed estate data. I usually work in R and Python, which led to the data-access approach shared here.

II. Connecting to Hive from Python
1. Connecting to Hive with pyhive
First, set up the environment and the required libraries: sasl, thrift, thrift_sasl, and pyhive.

   I used sasl 0.2.1; pick the build that matches your environment. Download: https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl

pip install sasl-0.2.1-cp36-cp36m-win_amd64.whl
pip install thrift -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install thrift_sasl==0.3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pyhive -i https://pypi.tuna.tsinghua.edu.cn/simple
Once the libraries are installed, here is the code.

from pyhive import hive
import pandas as pd

Read the data:

def select_pyhive(sql):
    # create the Hive connection
    conn = hive.Connection(host='10.16.15.2', port=10000, username='hive', database='user')
    try:
        df = pd.read_sql(sql, conn)
        return df
    finally:
        if conn:
            conn.close()

sql = "select * from user_huaxiang_wide_table"
df = select_pyhive(sql)
This pulls roughly 1.93 million household-profile records with 37 columns from the Hive database.

   The code is not complicated, but two common problems may come up while testing.

Problem 1:
'TSaslClientTransport' object has no attribute 'readAll'
Solution 1:

   Run pip install thrift_sasl==0.3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple to upgrade the thrift_sasl dependency to 0.3.0.

Problem 2: Could not start SASL: Error in sasl_client_start (-4) SASL(-4)
Solution 2:
1. Find the sasl installation directory, usually:
C:\Users\<your username>\AppData\Local\Programs\Python\Python37-32\Lib\site-packages\sasl\sasl2
2. Create a new folder C:\CMU\bin\sasl2 on the C drive.
3. Copy saslPLAIN.dll from the directory in step 1 into the folder created in step 2.

2. Connecting to Hive with impala
Hive can also be reached through the impala client, but when the result set is too large it makes Python hang, and I have not found a proper fix yet (a batched-fetch workaround is sketched after the code below).

   First, set up the environment and the required libraries: sasl, thrift, thrift_sasl, and impala. Again, sasl is version 0.2.1; pick the build that matches your environment. Download: https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl

pip install sasl-0.2.1-cp36-cp36m-win_amd64.whl
pip install thrift -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install thrift_sasl==0.2.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install impala -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install thriftpy -i https://pypi.tuna.tsinghua.edu.cn/simple
Once the libraries are installed, here is the code.

from impala.dbapi import connect
from impala.util import as_pandas
import pandas as pd

Fetch the data:

def select_hive(sql):
    # create the Hive connection
    conn = connect(host='10.16.15.2', port=10000, auth_mechanism='PLAIN',
                   user='hive', password='user@123', database='user')
    cur = conn.cursor()
    try:
        cur.execute(sql)
        df = as_pandas(cur)  # fetches the whole result set into a DataFrame
        return df
    finally:
        if conn:
            conn.close()

data = select_hive(sql='select * from user_huaxiang_wide_table limit 100')
The impala approach is convenient as well, but once the data volume reaches a certain size it just sits in fetchall() and does not respond even after several hours. One way to mitigate this is to fetch in batches, as in the sketch below.
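A possible workaround (an untested sketch, reusing the connection parameters from the example above) is to stream the result in fixed-size batches with the DB-API fetchmany() call instead of a single fetchall(), so memory only grows one chunk at a time:

import pandas as pd
from impala.dbapi import connect

def select_hive_batched(sql, batch_size=10000):
    """Fetch a large Hive result set in batches instead of one fetchall() call."""
    conn = connect(host='10.16.15.2', port=10000, auth_mechanism='PLAIN',
                   user='hive', password='user@123', database='user')
    cur = conn.cursor()
    try:
        cur.execute(sql)
        columns = [d[0] for d in cur.description]  # column names from the cursor metadata
        chunks = []
        while True:
            rows = cur.fetchmany(batch_size)       # at most batch_size rows per round trip
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
        if chunks:
            return pd.concat(chunks, ignore_index=True)
        return pd.DataFrame(columns=columns)
    finally:
        conn.close()
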
Operating Hadoop from Python seems to expose only limited functionality. For Hive there is also an older hiveserver package, but pyhive looks like the easier one to use.

Install the dependencies

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
Usage

from pyhive import hive
conn = hive.Connection(host='xxxx', port=10000, username='xxx', database='default')
cursor = conn.cursor()
cursor.execute('select * from url_log limit 10')
for result in cursor.fetchall():
    print(result)

## real test on the internal network
from pyhive import hive
conn = hive.Connection(host='172.16.16.32', port=10000, username='zhuzheng', auth='LDAP', password="abc123.", database='fkdb')
cursor = conn.cursor()
cursor.execute('select * from fkdb.tab_client_label limit 10')
for result in cursor.fetchall():
    print(result)
### If Hive has a username and password you need to supply them; if Hive is not on the same machine, also give the correct ip and port.
### Pick the right auth mode (I used LDAP here) and connect to the correct database for your setup.
### While fiddling with this I hit all kinds of errors: wrong username/password, wrong auth mode, nonexistent database, thrift errors, and so on. Quite maddening.

from impala.dbapi import connect
# note: auth_mechanism is required here, but database is optional
conn = connect(host='127.0.0.1', port=10000, database='default', auth_mechanism='PLAIN')
cur = conn.cursor()

cur.execute('SHOW DATABASES')
print(cur.fetchall())

cur.execute('SHOW TABLES')
print(cur.fetchall())
I tried connecting with impala's Python client myself and have not got it to work so far.
References
https://blog.csdn.net/Gamer_gyt/article/details/52564335
The thrift version that impala's Python client depends on is problematic:
thrift-sasl==0.2.1
pip uninstall thrift

pip uninstall impyla

pip install thrift==0.9.3

pip install impyla==0.13.8

https://blog.csdn.net/kkevinyang/article/details/79273106
https://github.com/cloudera/impyla/issues/268
https://github.com/cloudera/impyla/issues/234
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Python-Error-TSaslClientTransport-object-has-no-attribute-trans/td-p/58033

from pyhive import hive
conn = hive.Connection(host="YOUR_HIVE_HOST", port=PORT, username="YOU")

cursor = conn.cursor()
cursor.execute("SELECT cool_stuff FROM hive_table")
for result in cursor.fetchall():
    use_result(result)

import pandas as pd

df = pd.read_sql("SELECT cool_stuff FROM hive_table", conn)
from pyhive import hive
import pandas as pd

def getData():
    conn = hive.Connection(host="1.0.1.38", auth="CUSTOM", username='hive', password="pvXxHTsdqrt8", port=10000, database='tapro_atg')
    df = pd.read_sql("select * from sales_data_leisure_view", conn)
    records = df.head(n=100000)
    print(records.to_json(orient='records'))

getData()
import pandas as pd
from pyhive import hive

conn = hive.connect('192.168.72.135')
cursor = conn.cursor()
sql = "select * from t2 where city='Shanghai'"
cursor.execute(sql)
res = cursor.fetchall()
df = pd.DataFrame(res, columns=['id', 'name', 'year', 'city'])

df1 = pd.read_sql(sql, conn, chunksize=3)
for chunk in df1:
    print(chunk)

# -*- coding:utf-8 -*-

import pandas as pd
from pyhive import hive
import time
import datetime
import os

def rfail(s, file_path):
    with open(file_path, "a+") as f:
        f.write(s + "\n")

def read_query(sql):
    hive_line = '''hive -e "set hive.cli.print.header=true;set mapreduce.job.queuename=hl_report;%s";''' % (sql)
    data_buffer = os.popen(hive_line)
    data = pd.read_table(data_buffer, sep="\t", chunksize=10000)
    return data

def get_from_hive(query, mode, engine_hive):
    # engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")
    if mode == "pyhive":
        data = pd.read_sql(query, engine_hive)
        return data
    elif mode == "raw":
        data = read_query(query)
        return data
    else:
        print("mode: pyhive or raw")
        return None

def gen_date(bdate, days):
    day = datetime.timedelta(days=1)
    for i in range(days):
        s = bdate + day * i
        # print(type(s))
        yield s.strftime("%Y-%m-%d")

def get_date_list(start=None, end=None):
    if (start is None) | (end is None):
        return []
    else:
        data = []
        for d in gen_date(start, (end - start).days):
            data.append(d)
        return data

import pandas as pd
from pyhive import presto

cursor = presto.connect(host='presto-master-lb.prod.hulu.com', port=8080,
                        catalog='hive', username='xiaomeng.yang@hulu.com').cursor()
cursor.execute('select * from zzz_emma_genre_16H1_2 limit 10')
result = cursor.fetchall()
result

df = pd.DataFrame(result, columns=['userid', 'genre', 'rnk', 'dependency_ratio', 'cumu_ratio'])
import numpy
from pyhive import hive
import pandas as pd

conn = hive.Connection(host="localhost", port=10000, database='project')
cursor = conn.cursor()
murderdf = pd.read_sql_query("select povertyrate from poverty", conn)
murdertar = pd.read_sql_query("select murder from crime where district ='TOTAL' and year = 2011", conn)

a = murderdf.as_matrix()
b = murdertar.as_matrix()

print(a)
print(b)

# murder = [871, 970, 1095, 1171, 1238, 1244, 1437, 1438, 1631, 1721]

# a = [1523, 112]

# b = [10, 20]

print(numpy.corrcoef(a, b))
You can also use pyspark's HiveContext:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().set("spark.executor.memory", "2g") \
    .set("spark.dynamicAllocation.initialExecutors", "2") \
    .set("spark.driver.memory", "2g") \
    .set("spark.kryoserializer.buffer.max", "1g") \
    .set("spark.driver.cores", "4") \
    .set("spark.yarn.queue", "ace") \
    .set("spark.dynamicAllocation.maxExecutors", "32")

sparkContext = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sparkContext)
hiveContext = HiveContext(sparkContext)

import os
import pandas as pd

def hdfs_to_csv(hdfs_table, csv_name):
    query = "SELECT * FROM prod_rwi_batch.%s" % hdfs_table
    query_df = hiveContext.sql(query)
    query_df.cache()
    df = query_df.toPandas()

    # save it locally
    csv_file = "%s.csv" % csv_name
    df.to_csv(csv_file)

    # copy it over to BDF
    os.system("hdfs dfs -copyFromLocal %s /user/dhomola" % csv_file)
    # this didn't work due to access right issues:
    # hdfs dfs -copyFromLocal initial_pos.csv /production/ace/data/dpt/dhomola

    # delete locally
    os.system("rm %s" % csv_file)

hdfs_to_csv("initialcohort_cp01_1508281365499_1142", "initial_pos")
hdfs_to_csv("restrictivecohort_cp02_1508281365499_1142", "restricted_pos")
hdfs_to_csv("randomsamplecohort_cs03_1508281365499_1142", "random_sample_scoring")
hdfs_to_csv("negativecohort_cn01_1508281365499_1142", "initial_negative")
hdfs_to_csv("cohort_v01_1508281365499_1142", "v01")
hdfs_to_csv("cohort_v02_1508281365499_1142", "v02")
References

https://github.com/dropbox/PyHive
https://stackoverflow.com/questions/21370431/how-to-access-hive-via-python
When processing data with pandas we usually import it from CSV or Excel files, but sometimes the data lives in a database (besides MySQL there are also various NoSQL stores) and there is no ready-made data file. In that case you can use the Pymongo library to read data from MongoDB and load it into pandas in three simple steps.

Step 1: install the dependency and import the packages

pip3 install pymongo
import pymongo
import pandas as pd
Step 2: set up the MongoDB connection

# set the MongoDB connection info

client = pymongo.MongoClient('localhost', 27017)
cn_78 = client['cn_78']
project_info = cn_78['project_info']
Step 3: load the data into pandas

data = pd.DataFrame(list(project_info.find()))

# drop MongoDB's _id field

del data['_id']

# select the columns to display

data = data[['aear', 'cate', 'subcate', 'name', 'maxmoney', 'minmoney', 'time']]
print(data)

Another example:

import pandas as pd
from pymongo import MongoClient

# connect to MongoDB
client = MongoClient('192.168.248.52', 12000)
# select the database; locateInfo is the database name
db = client.locateInfo
db.authenticate("me", "me")
# select the collection
collection = db.dataCollect
# fetch the data from MongoDB
gpsData = pd.DataFrame(list(collection.find({"deviceId": "05792"})))
# drop the _id field
del gpsData['_id']

# select the columns to display

gpsData = gpsData[['deviceId', 'lating', 'lnging', 'gnssTime', 'locateType']]
# sort the data by time, ascending (DataFrame.sort was removed; use sort_values)
gpsData = gpsData.sort_values('gnssTime')
print(gpsData)
This makes it easy to read data from MongoDB into pandas for analysis.

Loading Redis data into pandas
First, there is an impressive aggregation package, pandas-redistrict:
https://github.com/correctiv/pandas-redistrict

There is also Redis's own Python client:
https://github.com/andymccurdy/redis-py

pandas has msgpack serialization, pd.read_msgpack()/to_msgpack(). It is still flagged as experimental, but it should be the most concise approach, and it supports iterating over the serialized data when reading.

redis_db = redis.StrictRedis(host="localhost", port=6379, db=0)
data = data.to_msgpack(compress='zlib')

redis_db.setex(key, data, expire_time)

cached_data = redis_db.get(key)
df = pd.read_msgpack(cached_data)
Another idea: see timeseries2redis, which can read tick or bar data from Redis; the implementation is interesting.

Looking at its performance, though, it is not faster than pd.read_csv: pandas' CSV reader is implemented in C and can finish in tens of milliseconds. If you want another severalfold speedup, consider HDF5 (see the pandas I/O performance comparison, performance-considerations). A small HDF5 sketch follows.
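As a reference point, a minimal HDF5 round trip with pandas looks like the sketch below; it needs the PyTables ("tables") package, and the file name and key here are arbitrary:

import pandas as pd

# cache a DataFrame as HDF5 (requires the "tables" / PyTables package)
df = pd.DataFrame({'user_id': range(5), 'score': [0.1, 0.2, 0.3, 0.4, 0.5]})
df.to_hdf('cache.h5', key='df', mode='w')

# read it back; for large numeric tables this is typically much faster than CSV
df2 = pd.read_hdf('cache.h5', 'df')
print(df2.equals(df))
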

This approach is fairly reliable.
set:

redisConn.set("key", df.to_msgpack(compress='zlib'))
get:

pd.read_msgpack(redisConn.get("key"))

The redis + pickle approach from Stack Overflow:

from redis import StrictRedis
import cPickle as pickle

# a subclass of StrictRedis that can pickle and unpickle complex objects;
# its "pset" and "pget" methods stand in for StrictRedis's "set" and "get"

class PickledRedis(StrictRedis):

    def pset(self, key, value, ex=None, px=None, nx=False, xx=False):
        value_pickled = pickle.dumps(value, 2)
        return self.set(key, value_pickled, ex=ex, px=px, nx=nx, xx=xx)

    def pget(self, key):
        value_pickled = self.get(key)
        return pickle.loads(value_pickled)

See also: operating Redis from Python
https://www.jianshu.com/p/2639549bedc8

Operating memcache from Python is also worth a look:
https://docs.lvrui.io/2016/07/24/Python%E6%93%8D%E4%BD%9Cmemcache%E8%AF%A6%E8%A7%A3/

Using pandas with HBase
There are two approaches:

1. hbase -> pyspark -> pandas dataframe
2. hbase -> msgpack -> pandas dataframe
A reliable option is to install happybase and pdhbase.

Writing DataFrame to HBase
Establish hbase connection using happybase and write the dataframe.

import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh

connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pd.DataFrame(np.random.randn(10, 5), columns=['a', 'b', 'c', 'd', 'e'])
    df['f'] = 'hello world'
    pdh.to_hbase(df, connection, 'sample_table', 'df_key', cf='cf')
finally:
    if connection:
        connection.close()

Reading DataFrame from HBase
Establish hbase connection using happybase and read the dataframe.

import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh

connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pdh.read_hbase(connection, 'sample_table', 'df_key', cf='cf')
    print(df)
finally:
    if connection:
        connection.close()
Using pandas with Spark
There is an existing but rather old package, pyspark_pandas.

Beyond that, PySpark DataFrames can be converted to and from pandas DataFrames directly.

import pyspark
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark.sql.functions as F

# import seaborn as sns
# import matplotlib.pyplot as plt

import sys
import numpy as np
from surprise import AlgoBase, Dataset, evaluate
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# import alert_program as ap

spark = SparkSession.builder.getOrCreate()

ratings_df = pd.read_table("data/ratings.dat", delimiter='::',
                           names=["user", "movie", "rating", "timestamp"],
                           engine='python')
spark_df = spark.createDataFrame(ratings_df)

spark_df = spark_df.drop("timestamp")
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)

als = ALS(
    userCol="user",
    itemCol="movie",
    ratingCol="rating",
    nonnegative=False,
    regParam=0.1,
    rank=10
)
model = als.fit(train)

predictions = model.transform(test)

pandas_df = predictions.toPandas()
pandas_df_clean = pandas_df.fillna(pandas_df.mean())
pandas_df_clean['RMSE'] = np.power(pandas_df_clean['rating'] - pandas_df_clean['prediction'], 2)
RMSE = np.sqrt(sum(pandas_df_clean['RMSE']) / len(pandas_df_clean))

print(RMSE)
from pyspark.sql import SQLContext
from pandas import DataFrame, Series
import pandas

sqlContext = SQLContext(sc)

df = sqlContext.load(source="org.apache.phoenix.spark", zkUrl="localhost:2181:/hbase-unsecure", table="doctors")
pandas_df = df.toPandas()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.master("local").appName("my app").config(conf=SparkConf()).getOrCreate()
sc = SparkSession.builder.config(conf=SparkConf())
df = spark.read.format('json').load(['/home/xs/Documents/Weblog.1457006400155.gz',
                                     '/home/xs/Documents/Weblog.1457006400158.gz',
                                     '/home/xs/Documents/Weblog.1457006401774.gz'])
g1 = df.groupBy("captcha_id", F.substring("request_time", 1, 19).alias("time")).count().filter(df.captcha_id != '')
pandas_df = g1.toPandas()  # convert to a pandas DataFrame
data_pivot = pandas_df.pivot_table(index=["captcha_id"], columns=["time"], values=["count"])
data_pivot.to_csv("/home/xs/Documents/data.csv", header=True)

from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark
from pyspark.sql.functions import *
import pandas as pd

import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

sc = pyspark.SparkContext()
sqlcontext = SQLContext(sc)

pandas_df = pd.read_csv('rank.csv')

# split into train/test sets on the pandas side
# (randomSplit only exists on Spark DataFrames, not on pandas DataFrames)
pandas_df['split'] = np.random.randn(pandas_df.shape[0], 1)

msk = np.random.rand(len(pandas_df)) <= 0.7

train = pandas_df[msk]
test = pandas_df[~msk]

s_df = sqlcontext.createDataFrame(train)

trainingData = s_df.rdd.map(lambda x: (Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainingData)

dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[featureIndexer, dt])

model = pipeline.fit(trainingData)

test_df = sqlcontext.createDataFrame(test)
testData = test_df.rdd.map(lambda x: (Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])
predictions = model.transform(testData)

predictions.select("prediction", "label", "features").show(5)

Using pandas with HDFS
This is mostly done through WebHDFS; a minimal sketch with the hdfs (hdfscli) package follows, and several other options are listed after it.
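A minimal sketch of reading a CSV stored on HDFS into pandas over WebHDFS, using the hdfs (hdfscli) package; the NameNode address, user and path are assumptions:

import pandas as pd
from hdfs import InsecureClient

# WebHDFS endpoint of the NameNode (port 50070 on Hadoop 2.x) and the HDFS user are assumptions
client = InsecureClient('http://namenode.example.com:50070', user='hadoop')

# client.read() is a context manager; pandas can read straight from it
with client.read('/user/hadoop/data/sample.csv', encoding='utf-8') as reader:
    df = pd.read_csv(reader)

print(df.head())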

The most impressive one is a real hidden gem:
https://github.com/ibis-project/ibis

Ibis: Python data analysis framework for Hadoop and SQL engines
A one-stop shop. Supported backends:

Apache Impala (incubating)
Apache Kudu
Hadoop Distributed File System (HDFS)
PostgreSQL
MySQL (Experimental)
SQLite
Pandas DataFrames (Experimental)
Clickhouse
BigQuery
pip install ibis-framework
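
A minimal sketch of how ibis is typically used against an Impala/Hive-style warehouse; the host, port and table name are assumptions, and the exact API differs between ibis versions:

import ibis

# connect to an impalad; 21050 is Impala's HiveServer2-compatible port (an assumption for your cluster)
con = ibis.impala.connect(host='impalad.example.com', port=21050)

# build an expression lazily, then execute it on the cluster and get a pandas DataFrame back
t = con.table('user_huaxiang_wide_table', database='user')
df = t.limit(100).execute()
print(df.shape)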
Another great one:
https://github.com/RaRe-Technologies/smart_open

>>> # stream lines from an S3 object
>>> for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
...     print line

>>> # using a completely custom s3 server, like s3proxy:
>>> for line in smart_open.smart_open('s3u://user:secret@host:port@mybucket/mykey.txt'):
...     print line

>>> # you can also use a boto.s3.key.Key instance directly:
>>> key = boto.connect_s3().get_bucket("my_bucket").get_key("my_key")
>>> with smart_open.smart_open(key) as fin:
...     for line in fin:
...         print line

>>> # can use context managers too:
>>> with smart_open.smart_open('s3://mybucket/mykey.txt') as fin:
...     for line in fin:
...         print line
...     fin.seek(0)  # seek to the beginning
...     print fin.read(1000)  # read 1000 bytes

>>> # stream from HDFS
>>> for line in smart_open.smart_open('hdfs://user/hadoop/my_file.txt'):
...     print line

>>> # stream from HTTP
>>> for line in smart_open.smart_open('http://example.com/index.html'):
...     print line

>>> # stream from WebHDFS
>>> for line in smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt'):
...     print line

>>> # stream content *into* S3 (write mode):
>>> with smart_open.smart_open('s3://mybucket/mykey.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...         fout.write(line + '\n')

>>> # stream content *into* HDFS (write mode):
>>> with smart_open.smart_open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...         fout.write(line + '\n')

>>> # stream content *into* WebHDFS (write mode):
>>> with smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...         fout.write(line + '\n')

>>> # stream from/to local compressed files:
>>> for line in smart_open.smart_open('./foo.txt.gz'):
...     print line

>>> with smart_open.smart_open('/home/radim/foo.txt.bz2', 'wb') as fout:
...     fout.write("some content\n")
Another very good one:
https://github.com/spotify/snakebite

Snakebite is a Python library that provides a pure Python HDFS client and a wrapper around Hadoop's minicluster. The client uses protobuf for communicating with the NameNode and comes in the form of a library and a command line interface. Currently, the snakebite client supports most actions that involve the NameNode and reading data from DataNodes.
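
A minimal sketch of the snakebite client (from the Python 2 era; the NameNode host, port and paths are assumptions):

from snakebite.client import Client

# connect to the NameNode's RPC port (not WebHDFS); host and port are assumptions
client = Client('namenode.example.com', 8020)

# ls() takes a list of paths and yields a metadata dict per entry
for entry in client.ls(['/user/hadoop']):
    print(entry['path'])

# stream the contents of a text file directly from the DataNodes
for chunk in client.text(['/user/hadoop/my_file.txt']):
    print(chunk)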

This one is also good:
https://github.com/crs4/pydoop
Official site: http://crs4.github.io/pydoop/

The following also works:

https://github.com/mtth/hdfs

Official docs:
https://hdfscli.readthedocs.io/en/latest/

This one is excellent too:

https://github.com/HariSekhon/pytools
The one below is newer:
https://github.com/dask/hdfs3
https://hdfs3.readthedocs.io/en/latest/

from hdfs.client import Client
client = Client("http://host6.cloud.sinocbd.com:50070/")  # 50070: Hadoop's default namenode web port
dir(client)

The methods used here are:

walk(): similar to os.walk; it returns tuples of (path, directory names, file names), iterating level by level.

read(): similar to file.read; according to the official docs, client.read must be used inside a with block:

path = []

for i in client.walk('/tempfiles/temp', depth=1):
    for item in i:
        path.append(item)
        print(item)

print(path)

with client.read('/tempfiles/1.csv', encoding='gbk') as fs:
    content = fs.read()
print(content)
import webhdfspy
import pandas as pd
webHDFS = webhdfspy.WebHDFSClient("host6.cloud.sinocbd.com", 50070, username='root')
data = pd.DataFrame(webHDFS.listdir('/'))
print(data)
pathlist = data['pathSuffix']
for i in pathlist:
    path = "/" + i
    # print(path)
    # print(webHDFS.listdir(path))
import os
import pickle

from pathlib import PurePath

import hdfs3
import pandas as pd

from ufuncs.storage.utils import check_abs_path

def hdfs_chmod_dirs(path, *, permission_code, raise_errors=False, hdfs3_conn=None):
    """Try to change permissions of each part of the path.
    Args:
        path (str): Path in HDFS
        permission_code (int/str): Permission to set on each part of the
            path (eg. 777, 775).
        hdfs3_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        IOError: If not possible to change permission of a path part.
    """
    check_abs_path(path)
    hdfs = hdfs3_conn if hdfs3_conn else hdfs3.HDFileSystem()
    # change permissions starting with top dir
    _path = '/'
    for d in PurePath(path).parts[1:]:
        _path = os.path.join(_path, d)
        try:
            hdfs.chmod(_path, int(str(permission_code), 8))
        except IOError as e:
            if raise_errors:
                raise e

def hdfs_put_object(obj, storage_path,
                    *, permission_code=755, overwrite=True, hdfs_conn=None):
    """Store a python object to HDFS in a pickle file.
    Args:
        obj: Python object.
        storage_path (str): HDFS full path of the file to write to.
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        FileExistsError: If file exists and overwrite is set to False.
    """
    # create connector if not exists
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    hdfs.mkdir(os.path.dirname(storage_path))
    with hdfs.open(storage_path, 'wb') as f:
        pickle.dump(obj, f)
    hdfs.chmod(storage_path, int(str(permission_code), 8))

def hdfs_get_object(storage_path, *, hdfs_conn=None):
    """Retrieve a python object from a pickle file in HDFS.
    Args:
        storage_path (str): HDFS full path of the file to read from.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        The python object that was loaded from the pickle file.
    Raises:
        NameError: If the object's class is not defined in the namespace.
        FileNotFoundError: If file is not found.
    """
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    try:
        with hdfs.open(storage_path, 'rb') as f:
            obj = pickle.load(f)
    except IOError:
        raise FileNotFoundError("No such file or directory in HDFS: '{}'."
                                .format(storage_path))
    except AttributeError:
        raise NameError("Pickle file object class not found in the namespace.")
    return obj

def hdfs_df2csv(df, storage_path, *,
                permission_code=755, overwrite=True, hdfs_conn=None, **kwargs):
    """Save pandas dataframe to csv in HDFS.
    The kwargs are used to represent any argument from the known
    pandas.DataFrame.to_csv function.
    Args:
        df (pandas.DataFrame): Dataframe to write as csv
        permission_code (int/str): Permission to set on the csv file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        TypeError: If df is not a pandas DataFrame
        FileExistsError: If file exists and overwrite is set to False.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Expected pandas Dataframe, got {}"
                        .format(type(df).__name__))
    check_abs_path(storage_path)
    # make hdfs connection
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    # check if file to overwrite
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    # make directories
    hdfs.mkdir(os.path.dirname(storage_path))
    # write csv bytes to HDFS file
    with hdfs.open(storage_path, 'wb') as f:
        f.write(df.to_csv(**kwargs))
    # change permission
    hdfs.chmod(storage_path, int(str(permission_code), 8))

def hdfs_csv2df(storage_path, *, hdfs_conn=None, **kwargs):
    """Read .csv from HDFS into a pandas dataframe.
    The kwargs are used to represent any argument from the known
    pandas.read_csv function.
    Args:
        storage_path (str): Location of .csv file in HDFS
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        pd.DataFrame: Dataframe with .csv data
    """
    check_abs_path(storage_path)
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    with hdfs.open(storage_path, 'rb') as f:
        df = pd.read_csv(f, **kwargs)
    return df
import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import pydoop.hdfs as hdfs

def read_csv_from_hdfs(path, cols, col_types=None):
    files = hdfs.ls(path)
    pieces = []
    for f in files:
        fhandle = hdfs.open(f)
        pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
        fhandle.close()
    return pd.concat(pieces, ignore_index=True)
import os
import pytest

import fastparquet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pandas.util.testing as tm

def hdfs_test_client(driver='libhdfs'):
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
    user = os.environ['ARROW_HDFS_TEST_USER']
    try:
        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
    except ValueError:
        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                         'an integer')

    return pa.HdfsClient(host, port, user, driver=driver)

def test_fastparquet_read_with_hdfs():
    fs = hdfs_test_client()

    df = tm.makeDataFrame()
    table = pa.Table.from_pandas(df)
    path = '/tmp/testing.parquet'
    with fs.open(path, 'wb') as f:
        pq.write_table(table, f)

    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)
    result = parquet_file.to_pandas()
    tm.assert_frame_equal(result, df)

Operating Flink from Python
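A minimal PyFlink Table API sketch that ends in a pandas DataFrame might look like the following; the connector options, path and column names are assumptions, and the API varies across Flink versions:

from pyflink.table import EnvironmentSettings, TableEnvironment

# batch-mode Table API environment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# register a CSV-backed source table; the connector options and path are assumptions
t_env.execute_sql("""
    CREATE TABLE ratings (
        user_id INT,
        movie_id INT,
        rating DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/ratings.csv',
        'format' = 'csv'
    )
""")

# run a query and pull the result into pandas
result = t_env.sql_query(
    "SELECT user_id, AVG(rating) AS avg_rating FROM ratings GROUP BY user_id"
)
df = result.to_pandas()
print(df.head())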
