前言

DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub 可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到 DataHub 的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK需求量也日益上升,这里就告诉各位Pythoner们一个好消息,Datahub官方Python SDK Beta正式Release(Github地址),使用非常简单,这里做个入门介绍,大家如有任何疑问随时在Github上提问留言。

安装

快速安装

$ sudo pip install pydatahub

源码安装

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git

$ cd aliyun-datahub-sdk-python

$ sudo python setup.py install

安装验证

$ python -c "from datahub import DataHub"

如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

基本概念

准备工作

访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。

创建Project

初始化Datahub

import sys

import traceback

from datahub import DataHub

from datahub.utils import Configer

from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType

from datahub.errors import DatahubException, ObjectAlreadyExistException

access_id = ***your access id***

access_key = ***your access key***

endpoint = ***your datahub server endpoint***

dh = DataHub(access_id, access_key, endpoint)

Topic操作

Tuple Topic

Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:

类型

含义

值域

Bigint

8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。

-9223372036854775807 ~ 9223372036854775807

String

字符串,只支持UTF-8编码。

单个String列最长允许1MB。

Boolean

布尔型。

可以表示为True/False,true/false, 0/1

Double

8字节双精度浮点数。

-1.0 10308 ~ 1.0 10308

TimeStamp

时间戳类型

表示到微秒的时间戳类型

创建示例

topic = Topic(name=topic_name)

topic.project_name = project_name

topic.shard_count = 3

topic.life_cycle = 7

topic.record_type = RecordType.TUPLE

topic.record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], [Fie

ldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])

try:

dh.create_topic(topic)

print "create topic success!"

print "=======================================\n\n"

except ObjectAlreadyExistException, e:

print "topic already exist!"

print "=======================================\n\n"

except Exception, e:

print traceback.format_exc()

sys.exit(-1)

Blob Topic

Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。

topic = Topic(name=topic_name)

topic.project_name = project_name

topic.shard_count = 3

topic.life_cycle = 7

topic.record_type = RecordType.BLOB

try:

dh.create_topic(topic)

print "create topic success!"

print "=======================================\n\n"

except ObjectAlreadyExistException, e:

print "topic already exist!"

print "=======================================\n\n"

except Exception, e:

print traceback.format_exc()

sys.exit(-1)

数据发布/订阅

获取Shard列表

list_shards接口获取topic下的所有shard

shards = dh.list_shards(project_name, topic_name)

返回结果是一个List对象,每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据

put_records接口向一个topic发布数据

failed_indexs = dh.put_records(project_name, topic_name, records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为写入失败记录的数组下标

写入Tuple类型Record示例

try:

# block等待所有shard状态ready

dh.wait_shards_ready(project_name, topic_name)

print "shards all ready!!!"

print "=======================================\n\n"

topic = dh.get_topic(topic_name, project_name)

print "get topic suc! topic=%s" % str(topic)

if topic.record_type != RecordType.TUPLE:

print "topic type illegal!"

sys.exit(-1)

print "=======================================\n\n"

shards = dh.list_shards(project_name, topic_name)

for shard in shards:

print shard

print "=======================================\n\n"

records = []

record0 = TupleRecord(schema=topic.record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])

record0.shard_id = shards[0].shard_id

record0.put_attribute('AK', '47')

records.append(record0)

record1 = TupleRecord(schema=topic.record_schema)

record1['bigint_field'] = 2

record1['string_field'] = 'yc2'

record1['double_field'] = 10.02

record1['bool_field'] = False

record1['time_field'] = 1455869335000011

record1.shard_id = shards[1].shard_id

records.append(record1)

record2 = TupleRecord(schema=topic.record_schema)

record2['bigint_field'] = 3

record2['string_field'] = 'yc3'

record2['double_field'] = 10.03

record2['bool_field'] = False

record2['time_field'] = 1455869335000013

record2.shard_id = shards[2].shard_id

records.append(record2)

failed_indexs = dh.put_records(project_name, topic_name, records)

print "put tuple %d records, failed list: %s" %(len(records), failed_indexs)

# failed_indexs如果非空最好对failed record再进行重试

print "=======================================\n\n"

except DatahubException, e:

print traceback.format_exc()

sys.exit(-1)

else:

sys.exit(-1)

获取cursor

获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record

LATEST: 表示获取的cursor指向当前最新的record

SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record

cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据

从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。

dh.get_records(topic, shard_id, cursor, 10)

消费Tuple类型Record示例

try:

# block等待所有shard状态ready

dh.wait_shards_ready(project_name, topic_name)

print "shards all ready!!!"

print "=======================================\n\n"

topic = dh.get_topic(topic_name, project_name)

print "get topic suc! topic=%s" % str(topic)

if topic.record_type != RecordType.TUPLE:

print "topic type illegal!"

sys.exit(-1)

print "=======================================\n\n"

cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, '0')

while True:

(record_list, record_num, next_cursor) = dh.get_records(topic, '0', cursor, 10)

for record in record_list:

print record

if 0 == record_num:

time.sleep(1)

cursor = next_cursor

except DatahubException, e:

print traceback.format_exc()

sys.exit(-1)

else:

sys.exit(-1)

结尾

python消费datahub_Datahub Python SDK入门手册相关推荐

  1. Datahub Python SDK入门手册

    前言 DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于 ...

  2. python消费datahub_DataHub使用指南-阿里云开发者社区

    快速入门教程 1.开通DataHub ? 使用DataHub的第一步,首先点击开通DataHub 2.创建Project和 Topic 创建Topic方式解读,Tuple还是Blob? Tuple支持 ...

  3. python消费datahub_datahub消费数据

    12月17日更新 请问下同时消费多个topic的情况下,在richmap里面可以获取到当前消息所属的topic吗? 各位大佬,你们实时都是怎样重跑数据的? 有木有大神知道Flink能否消费多个kafk ...

  4. python调用阿里云sdk入门(hello world)

    关于Python及pycharm的安装参考:1. python+pycharm 安装及测试_Hehuyi_In的博客-CSDN博客_pycharm安装成功测试 本文直接介绍如何调用阿里云sdk访问云上 ...

  5. Python官方入门手册等你领取!

    Python入门指南 一 Python世界 Python 是一门简单易学且功能强大的编程语言.它拥有高效的高级数据结构,并且能够用简单而又高效的方式进行面向对象编程.Python 优雅的语法和动态类型 ...

  6. aws python库_适用于Alexa的新AWS Python SDK入门指南

    aws python库 by Ralu Bolovan 由Ralu Bolovan 适用于Alexa的新AWS Python SDK入门指南 (A Beginner's guide to the ne ...

  7. Python语言入门手册|这些你要知道

    随着计算机的快速发展与普及,如今,几乎每家每户都有一台甚至两台计算机.也因此,计算机编程这一学科正逐渐被重视.相信只要会使用计算机的人,多少都听说过这几个常见计算机语言:比如C++,Java,HTML ...

  8. python能调用身份证读卡器吗_最近的项目中用到读卡器,用的华视身份证阅读器,附上SDK使用手册...

    最近的项目中用到读卡器,用的华视身份证阅读器,附上SDK使用手册 1.定义 应用函数开发包含下列文件: termb.dll      API函数的动态联接库 sdtapi.dll     内部动态库 ...

  9. 日志服务Python消费组实战(三):实时跨域监测多日志库数据

    解决问题 使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决: 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具? 需要检索数据里面的关键字,但数据没有建 ...

最新文章

  1. 在CentOS 6.8 x86_64上安装nginx 1.10.3
  2. 学python需要什么文化基础-和尧名大叔一起从0开始学Python编程-循环
  3. python ‘%r‘或者‘{!r}‘的意思
  4. 喜报丨神策数据荣获“2021 年度金屏奖大屏应用创新奖”
  5. 基于图像切换器(imageSwitcher)的支持动画的图片浏览器
  6. 项目管理软件伙伴https://www.huobanyun.cn/
  7. OTSU_图像二值化分割阈值的算法
  8. Creating a Pager Control for ASP.NET以及Dino Esposito 分页组件的一个 Bug
  9. 让大家信任自己,做个行为和语言上都没黑盒子的技术人员(转)
  10. Android之EditText练习
  11. IE浏览器“SEC7113: CSS 因 Mime 类型不匹配而被忽略”问题的解决方法
  12. ruby设计模式之观察者模式2————更加一般化的观察者模式
  13. 程序员必备的一些数学基础知识
  14. html个人中心源代码,HTML用户注册页面设置源码
  15. 测试低频噪音软件,低频噪音检测
  16. Selenium和Appium教程合集
  17. 延时等待的gcode
  18. 语音信号处理 —— 笔记(一)音频信号处理
  19. 云e办学习笔记(二十七)邮件自动发送功能实现
  20. 资深行业专家王煜全的演讲:“移动互联网中的产品创新机会”

热门文章

  1. 网站/页面即时聊天客服功能
  2. JavaSE-基本程序设计结构(上)
  3. Activity 的 isFinishing()、isDestroy()
  4. 银行计算机上岸经验,在职银行汪,公务员考试零基础上岸经验,供参考噢
  5. 机器学习基础 之 特征选择和稀疏学习
  6. HTML解析库Gumbo的使用(二)
  7. 蓝桥杯基础题 字符串处理(持续更新中)
  8. java 复杂json的操作
  9. java的Scanner类、Random类、ArrayList类
  10. 计算机应用基础书在线看,计算机应用基础