python消费datahub_Datahub Python SDK入门手册
前言
DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub 可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到 DataHub 的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK需求量也日益上升,这里就告诉各位Pythoner们一个好消息,Datahub官方Python SDK Beta正式Release(Github地址),使用非常简单,这里做个入门介绍,大家如有任何疑问随时在Github上提问留言。
安装
快速安装
$ sudo pip install pydatahub
源码安装
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install
安装验证
$ python -c "from datahub import DataHub"
如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!
基本概念
准备工作
访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。
创建Project
初始化Datahub
import sys
import traceback
from datahub import DataHub
from datahub.utils import Configer
from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType
from datahub.errors import DatahubException, ObjectAlreadyExistException
access_id = ***your access id***
access_key = ***your access key***
endpoint = ***your datahub server endpoint***
dh = DataHub(access_id, access_key, endpoint)
Topic操作
Tuple Topic
Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型
含义
值域
Bigint
8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。
-9223372036854775807 ~ 9223372036854775807
String
字符串,只支持UTF-8编码。
单个String列最长允许1MB。
Boolean
布尔型。
可以表示为True/False,true/false, 0/1
Double
8字节双精度浮点数。
-1.0 10308 ~ 1.0 10308
TimeStamp
时间戳类型
表示到微秒的时间戳类型
创建示例
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.TUPLE
topic.record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], [Fie
ldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
dh.create_topic(topic)
print "create topic success!"
print "=======================================\n\n"
except ObjectAlreadyExistException, e:
print "topic already exist!"
print "=======================================\n\n"
except Exception, e:
print traceback.format_exc()
sys.exit(-1)
Blob Topic
Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.BLOB
try:
dh.create_topic(topic)
print "create topic success!"
print "=======================================\n\n"
except ObjectAlreadyExistException, e:
print "topic already exist!"
print "=======================================\n\n"
except Exception, e:
print traceback.format_exc()
sys.exit(-1)
数据发布/订阅
获取Shard列表
list_shards接口获取topic下的所有shard
shards = dh.list_shards(project_name, topic_name)
返回结果是一个List对象,每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息
发布数据
put_records接口向一个topic发布数据
failed_indexs = dh.put_records(project_name, topic_name, records)
其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为写入失败记录的数组下标
写入Tuple类型Record示例
try:
# block等待所有shard状态ready
dh.wait_shards_ready(project_name, topic_name)
print "shards all ready!!!"
print "=======================================\n\n"
topic = dh.get_topic(topic_name, project_name)
print "get topic suc! topic=%s" % str(topic)
if topic.record_type != RecordType.TUPLE:
print "topic type illegal!"
sys.exit(-1)
print "=======================================\n\n"
shards = dh.list_shards(project_name, topic_name)
for shard in shards:
print shard
print "=======================================\n\n"
records = []
record0 = TupleRecord(schema=topic.record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
record0.shard_id = shards[0].shard_id
record0.put_attribute('AK', '47')
records.append(record0)
record1 = TupleRecord(schema=topic.record_schema)
record1['bigint_field'] = 2
record1['string_field'] = 'yc2'
record1['double_field'] = 10.02
record1['bool_field'] = False
record1['time_field'] = 1455869335000011
record1.shard_id = shards[1].shard_id
records.append(record1)
record2 = TupleRecord(schema=topic.record_schema)
record2['bigint_field'] = 3
record2['string_field'] = 'yc3'
record2['double_field'] = 10.03
record2['bool_field'] = False
record2['time_field'] = 1455869335000013
record2.shard_id = shards[2].shard_id
records.append(record2)
failed_indexs = dh.put_records(project_name, topic_name, records)
print "put tuple %d records, failed list: %s" %(len(records), failed_indexs)
# failed_indexs如果非空最好对failed record再进行重试
print "=======================================\n\n"
except DatahubException, e:
print traceback.format_exc()
sys.exit(-1)
else:
sys.exit(-1)
获取cursor
获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME
OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record
LATEST: 表示获取的cursor指向当前最新的record
SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record
cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)
通过get_cursor接口获取用于读取指定位置之后数据的cursor
订阅数据
从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。
dh.get_records(topic, shard_id, cursor, 10)
消费Tuple类型Record示例
try:
# block等待所有shard状态ready
dh.wait_shards_ready(project_name, topic_name)
print "shards all ready!!!"
print "=======================================\n\n"
topic = dh.get_topic(topic_name, project_name)
print "get topic suc! topic=%s" % str(topic)
if topic.record_type != RecordType.TUPLE:
print "topic type illegal!"
sys.exit(-1)
print "=======================================\n\n"
cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, '0')
while True:
(record_list, record_num, next_cursor) = dh.get_records(topic, '0', cursor, 10)
for record in record_list:
print record
if 0 == record_num:
time.sleep(1)
cursor = next_cursor
except DatahubException, e:
print traceback.format_exc()
sys.exit(-1)
else:
sys.exit(-1)
结尾
python消费datahub_Datahub Python SDK入门手册相关推荐
- Datahub Python SDK入门手册
前言 DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于 ...
- python消费datahub_DataHub使用指南-阿里云开发者社区
快速入门教程 1.开通DataHub ? 使用DataHub的第一步,首先点击开通DataHub 2.创建Project和 Topic 创建Topic方式解读,Tuple还是Blob? Tuple支持 ...
- python消费datahub_datahub消费数据
12月17日更新 请问下同时消费多个topic的情况下,在richmap里面可以获取到当前消息所属的topic吗? 各位大佬,你们实时都是怎样重跑数据的? 有木有大神知道Flink能否消费多个kafk ...
- python调用阿里云sdk入门(hello world)
关于Python及pycharm的安装参考:1. python+pycharm 安装及测试_Hehuyi_In的博客-CSDN博客_pycharm安装成功测试 本文直接介绍如何调用阿里云sdk访问云上 ...
- Python官方入门手册等你领取!
Python入门指南 一 Python世界 Python 是一门简单易学且功能强大的编程语言.它拥有高效的高级数据结构,并且能够用简单而又高效的方式进行面向对象编程.Python 优雅的语法和动态类型 ...
- aws python库_适用于Alexa的新AWS Python SDK入门指南
aws python库 by Ralu Bolovan 由Ralu Bolovan 适用于Alexa的新AWS Python SDK入门指南 (A Beginner's guide to the ne ...
- Python语言入门手册|这些你要知道
随着计算机的快速发展与普及,如今,几乎每家每户都有一台甚至两台计算机.也因此,计算机编程这一学科正逐渐被重视.相信只要会使用计算机的人,多少都听说过这几个常见计算机语言:比如C++,Java,HTML ...
- python能调用身份证读卡器吗_最近的项目中用到读卡器,用的华视身份证阅读器,附上SDK使用手册...
最近的项目中用到读卡器,用的华视身份证阅读器,附上SDK使用手册 1.定义 应用函数开发包含下列文件: termb.dll API函数的动态联接库 sdtapi.dll 内部动态库 ...
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
解决问题 使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决: 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具? 需要检索数据里面的关键字,但数据没有建 ...
最新文章
- 在CentOS 6.8 x86_64上安装nginx 1.10.3
- 学python需要什么文化基础-和尧名大叔一起从0开始学Python编程-循环
- python ‘%r‘或者‘{!r}‘的意思
- 喜报丨神策数据荣获“2021 年度金屏奖大屏应用创新奖”
- 基于图像切换器(imageSwitcher)的支持动画的图片浏览器
- 项目管理软件伙伴https://www.huobanyun.cn/
- OTSU_图像二值化分割阈值的算法
- Creating a Pager Control for ASP.NET以及Dino Esposito 分页组件的一个 Bug
- 让大家信任自己,做个行为和语言上都没黑盒子的技术人员(转)
- Android之EditText练习
- IE浏览器“SEC7113: CSS 因 Mime 类型不匹配而被忽略”问题的解决方法
- ruby设计模式之观察者模式2————更加一般化的观察者模式
- 程序员必备的一些数学基础知识
- html个人中心源代码,HTML用户注册页面设置源码
- 测试低频噪音软件,低频噪音检测
- Selenium和Appium教程合集
- 延时等待的gcode
- 语音信号处理 —— 笔记(一)音频信号处理
- 云e办学习笔记(二十七)邮件自动发送功能实现
- 资深行业专家王煜全的演讲:“移动互联网中的产品创新机会”