本文章为原创,转载请注明出处!

登录平台:IOTOS®爱投斯物联中台

账号:iotos_test    密码:iotos123

代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 (gitee.com)

目录

前言

驱动目的

适用范围

使用示例

驱动代码

驱动解析


  • 前言

窄带物联网(Narrow Band Internet of Things, NB-IoT)成为万物互联网络的一个重要分支。NB-IoT构建于蜂窝网络,只消耗大约180kHz的带宽,可直接部署于GSM网络、UMTS网络或LTE网络,以降低部署成本、实现平滑升级。NB-IoT的设备可以连接至三大运营商的IoT平台,方便了设备的对接

  • 驱动目的

将连接至电信平台的NB光电感烟火灾探测报警器的数据拿到并上云展示

  • 适用范围

品牌为JTY-GD-TCN,型号为2010或者1010的光电感烟火灾探测报警器 ,能将设备绑定在电信平台

  • 使用示例

  • 进入爱投斯中台,账号为iotos_test,密码为iotos123,在【创建模板】->【我的模板】,创建模板,填写相关信息,配置需要的参数,参数application为电信平台分配的唯一应用标识(在应用管理栏) ,参数key为电信平台分配的唯一应用标识(在应用管理栏),productId为电信平台的产品id ,deviceId为电信平台的设备id ,MasterKey在该设备所属产品的概况中可以查看

  • 进入爱投斯中台,账号为iotos_test,密码为iotos123,创建网关

  • 填好网关名称后点击确认

  • 创建设备示例点击【我的设备】 -> 【通信网关】 -> 【设备实例】->【创建设备】

  • 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。

  • 创建数据点,点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例

  • 点击右边的创建数据点,填写名称

  • 并在高级配置中配置数据点的相关标志,“烟雾报警时小区位置信息”,“防拆报警时小区位置信息”,“电池电量”这三个为接收数据的三种数据类型的第一个点,配置"private"属性用于驱动识别,分别配置为smoke_event,tamper_event,show_data,例如“烟雾报警时小区位置信息”这个点的配置:

  • 其他数据数据点中的属性需要填写"point"和"index",第一种类型的数据点中的point填写"电池电量"该数据的oid,第二类数据点中的point填写"防拆报警时小区位置信息"该数据点的oid,第三类数据点中的point填写"烟雾报警时小区位置信息"该数据点的oid,第一类数据点IMSI、ICCID、物理小区标识、硬件版本、电池电压、参考信号接收功率、终端型号、IMEI、厂家名称、信号与干扰加噪声比、无线信号覆盖等级、心跳周期、数据上传时间、软件版本、小区位置信息。index分别填写15、14、13、.........、1。第二类数据点防拆报警时间、防拆报警时信号强度、防拆报警时终端发射功率、防拆报警、防拆报警时信噪比。index分别填写5、4、3、2、1。第三类数据点烟雾报警时间、烟雾报警时信号强度、烟雾报警时终端发射功率、烟雾报警时信噪比、烟雾浓度、烟雾报警。index分别填写6、5、4、3、2、1。例如烟雾报警:

  • 在【系统设备】 -> 【通信网关】中找到刚才创建的网关,点击【高级】

  • 开启云网关,密码为账号密码

  • 点击 【系统设备】 -> 【通信网关】 -> 【设备实例】->【数据点】,选择刚才创建的设备实例

  • 即可查看数据已经上报成功

  • 驱动代码

#coding=utf-8
import sys
sys.path.append("..")
from driver import *
import time
import datetime
from urllib import urlencode
import urllib2
import base64
import hmac
import json
from hashlib import sha1
reload(sys)
sys.setdefaultencoding('utf8')#签名算法
def signature(key, application, timestamp, param, body):code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"for v in param:code += str(v[0]) + ":" + str(v[1]) + "\n"if (body is not None) and (body.strip()) :code += body + '\n'return base64.b64encode(hash_hmac(key, code, sha1))#数据加密
def hash_hmac(key, code, sha1):hmac_code = hmac.new(key.encode(), code.encode(), sha1)print("hmac_code=" + str(hmac_code.hexdigest()))return hmac_code.digest()#时间戳误差调整
def getTimeOffset(url):request = urllib2.Request(url)start = int(time.time() * 1000)response = urllib2.urlopen(request)end = int(time.time() * 1000)if response is not None:return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);else:return 0baseUrl = 'https://ag-api.ctwing.cn'
timeUrl = 'https://ag-api.ctwing.cn/echo'
offset = getTimeOffset(timeUrl)#发送http请求函数
def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):paramList = []for key_value in param:paramList.append([key_value, param[key_value]])print("paramList=" + str(paramList))if (MasterKey is not None) and (MasterKey.strip()):paramList.append(['MasterKey', MasterKey])if isNeedSort:paramList = sorted(paramList)headers = {}if (MasterKey is not None) and (MasterKey.strip()):headers['MasterKey'] = MasterKeyheaders['application'] = applicationheaders['Date'] = str(datetime.datetime.now())headers['version'] = versiontemp = dict(param.items())if (MasterKey is not None) and (MasterKey.strip()):temp['MasterKey'] = MasterKeyurl_params = urlencode(temp)url = baseUrl + pathif (url_params is not None) and (url_params.strip()):url = url + '?' + url_paramsprint("url=" + str(url))global offsetif isNeedGetTimeOffset:offset = getTimeOffset(timeUrl)timestamp = str(int(time.time() * 1000) + offset)headers['timestamp'] = timestampsign = signature(key, application, timestamp, paramList, body)headers['signature'] = signheaders.update(head)print("headers : %s" % (str(headers)))if (body is not None) and (body.strip()):request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))else:request = urllib2.Request(url=url, headers=headers)if (method is not None):request.get_method = lambda: methodresponse = urllib2.urlopen(request)if ('response' in vars()):print("response.code: %d" % (response.code))return responseelse:return None#分页查询设备历史数据
def getDeviceStatusHisInPage(appKey, appSecret, body):path = '/aep_device_status/getDeviceStatusHisInPage'head = {}param = {}version = '20190928013337'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')if response is not None:return response.read()return None#查询事件上报
def QueryDeviceEventList(appKey, appSecret, MasterKey, body):path = '/aep_device_event/device/events'head = {}param = {}version = '20210327064751'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')if response is not None:return response.read()return Noneclass Project(IOTOSDriverI):def InitComm(self,attrs):self.setPauseCollect(False)self.setCollectingOneCircle(False)self.online(True)try:#获取中台配置的参数(必不可少)self.Nbapplication = self.sysAttrs['config']['param']['application']  # APPKEYself.Nbkey = self.sysAttrs['config']['param']['key']  # APPScretself.NbMasterKey=self.sysAttrs['config']['param']['MasterKey']  #MasterKey# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"self.NbproductId = self.sysAttrs['config']['param']['productId']self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']except Exception,e:self.debug(u'获取参数失败!'+e.message)def Collecting(self, dataId):try:cfgtmp = self.data2attrs[dataId]['config']# 过滤掉非采集点if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))#请求需要用的参数timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple()  # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)end_timestamp = str(int(time.time() * 1000) + offset)page_size = "20"page_timestamp = ""# 上传设备上传数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'#http请求,拿去设备上报的数据res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)data = json.loads(res)['deviceStatusList']self.debug(data)#拿取两类数据并且按序拼接成字典data_dic = {}for i in data:if 'terminal_type' in i:data_dic.update(i)break# self.debug(data_dic)self.debug(data_dic)#将字典类型的数据转换为元组data_list = []for key, value in data_dic.items():#拿到时间戳数据时转化为时间if key=='timestamp':value=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(int(value) / 1000))if type(value)==unicode:#unicode转为str类型value=value.encode('utf-8')data_list.append(value)self.debug(data_list)return tuple(data_list)# 上传事件上报的数据,多种数据上报类型if 'private' in self.data2attrs[dataId]['config']['param'] and self.data2attrs[dataId]['config']['param']['private'].find('event'):body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'# 查询事件上报的数据event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)event_data = json.loads(event_res)['result']['list']# self.debug(event_data)if self.data2attrs[dataId]['config']['param']['private'] == 'smoke_event':event_list = []for i in event_data:print i['eventContent']if 'smoke_state' in i['eventContent']:for key, value in eval(i['eventContent']).items():event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)breakreturn tuple(event_list)if self.data2attrs[dataId]['config']['param']['private'] == 'tamper_event':event_list = []for i in event_data:print i['eventContent']if 'tamper_alarm' in i['eventContent']:for key, value in eval(i['eventContent']).items():event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)break#不上传重复的时间数据return tuple(event_list)[1:]return ()except Exception ,e:self.debug(u'数据上传失败'+e.message)def Event_setData(self, dataId, value):return json.dumps({'code': 0, 'msg': '', 'data': ''})
  • 驱动解析

  • 编写环境为python2,首先先导入需要的依赖包
#coding=utf-8
import sys
sys.path.append("..")
from driver import *
import time
import datetime
from urllib import urlencode
import urllib2
import base64
import hmac
import json
from hashlib import sha1
reload(sys)
sys.setdefaultencoding('utf8')
  • 定义api请求的签名算法和发送http请求的参数
#签名算法
def signature(key, application, timestamp, param, body):code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"for v in param:code += str(v[0]) + ":" + str(v[1]) + "\n"if (body is not None) and (body.strip()) :code += body + '\n'return base64.b64encode(hash_hmac(key, code, sha1))#数据加密
def hash_hmac(key, code, sha1):hmac_code = hmac.new(key.encode(), code.encode(), sha1)print("hmac_code=" + str(hmac_code.hexdigest()))return hmac_code.digest()#时间戳误差调整
def getTimeOffset(url):request = urllib2.Request(url)start = int(time.time() * 1000)response = urllib2.urlopen(request)end = int(time.time() * 1000)if response is not None:return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);else:return 0baseUrl = 'https://ag-api.ctwing.cn'
timeUrl = 'https://ag-api.ctwing.cn/echo'
offset = getTimeOffset(timeUrl)#发送http请求函数
def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):paramList = []for key_value in param:paramList.append([key_value, param[key_value]])print("paramList=" + str(paramList))if (MasterKey is not None) and (MasterKey.strip()):paramList.append(['MasterKey', MasterKey])if isNeedSort:paramList = sorted(paramList)headers = {}if (MasterKey is not None) and (MasterKey.strip()):headers['MasterKey'] = MasterKeyheaders['application'] = applicationheaders['Date'] = str(datetime.datetime.now())headers['version'] = versiontemp = dict(param.items())if (MasterKey is not None) and (MasterKey.strip()):temp['MasterKey'] = MasterKeyurl_params = urlencode(temp)url = baseUrl + pathif (url_params is not None) and (url_params.strip()):url = url + '?' + url_paramsprint("url=" + str(url))global offsetif isNeedGetTimeOffset:offset = getTimeOffset(timeUrl)timestamp = str(int(time.time() * 1000) + offset)headers['timestamp'] = timestampsign = signature(key, application, timestamp, paramList, body)headers['signature'] = signheaders.update(head)print("headers : %s" % (str(headers)))if (body is not None) and (body.strip()):request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))else:request = urllib2.Request(url=url, headers=headers)if (method is not None):request.get_method = lambda: methodresponse = urllib2.urlopen(request)if ('response' in vars()):print("response.code: %d" % (response.code))return responseelse:return None
  • 定义查询设备上传上来的数据以及上报的报警事件
#分页查询设备历史数据
def getDeviceStatusHisInPage(appKey, appSecret, body):path = '/aep_device_status/getDeviceStatusHisInPage'head = {}param = {}version = '20190928013337'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')if response is not None:return response.read()return None#查询事件上报
def QueryDeviceEventList(appKey, appSecret, MasterKey, body):path = '/aep_device_event/device/events'head = {}param = {}version = '20210327064751'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')if response is not None:return response.read()return None
  • 继承IOTOSDriverI这个类,对驱动进行初始化,获取中台设备实例中的参数,用于发送http请求使用
class Project(IOTOSDriverI):def InitComm(self,attrs):self.setPauseCollect(False)self.setCollectingOneCircle(False)self.online(True)try:#获取中台配置的参数(必不可少)self.Nbapplication = self.sysAttrs['config']['param']['application']  # APPKEYself.Nbkey = self.sysAttrs['config']['param']['key']  # APPScretself.NbMasterKey=self.sysAttrs['config']['param']['MasterKey']  #MasterKey# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"self.NbproductId = self.sysAttrs['config']['param']['productId']self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']except Exception,e:self.debug(u'获取参数失败!'+e.message)
  • 采集函数,利用数据点的参数先排除非采集点,再利用设置的私有属性,当采集的数据点到含有私有属性的数据点时,进行相关变量的定义,并且查询设备上报的数据或者上报的报警信息,将其进行处理后以标准的格式输出,进行批量上传
    def Collecting(self, dataId):try:cfgtmp = self.data2attrs[dataId]['config']# 过滤掉非采集点if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))#请求需要用的参数timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple()  # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)end_timestamp = str(int(time.time() * 1000) + offset)page_size = "20"page_timestamp = ""# 上传设备上传数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'#http请求,拿去设备上报的数据res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)data = json.loads(res)['deviceStatusList']self.debug(data)#拿取两类数据并且按序拼接成字典data_dic = {}for i in data:if 'terminal_type' in i:data_dic.update(i)break# self.debug(data_dic)self.debug(data_dic)#将字典类型的数据转换为元组data_list = []for key, value in data_dic.items():#拿到时间戳数据时转化为时间if key=='timestamp':value=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(int(value) / 1000))if type(value)==unicode:#unicode转为str类型value=value.encode('utf-8')data_list.append(value)self.debug(data_list)return tuple(data_list)# 上传事件上报的数据,多种数据上报类型if 'private' in self.data2attrs[dataId]['config']['param'] and self.data2attrs[dataId]['config']['param']['private'].find('event'):body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'# 查询事件上报的数据event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)event_data = json.loads(event_res)['result']['list']# self.debug(event_data)if self.data2attrs[dataId]['config']['param']['private'] == 'smoke_event':event_list = []for i in event_data:print i['eventContent']if 'smoke_state' in i['eventContent']:for key, value in eval(i['eventContent']).items():event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)breakreturn tuple(event_list)if self.data2attrs[dataId]['config']['param']['private'] == 'tamper_event':event_list = []for i in event_data:print i['eventContent']if 'tamper_alarm' in i['eventContent']:for key, value in eval(i['eventContent']).items():event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)break#不上传重复的时间数据return tuple(event_list)[1:]return ()except Exception ,e:self.debug(u'数据上传失败'+e.message)

IOTOS物联中台开发驱动支持NB-IoT光电感烟火灾探测报警器设备相关推荐

  1. IOTOS物联中台开发驱动支持中安易科智能门锁API 详解

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  2. IOTOS物联中台Bacnet驱动开发实例

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  3. IOTOS物联中台modbus驱动对接雅达电表设备

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  4. IOTOS物联中台modbus驱动对接科士达精密空调设备

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  5. IOTOS物联中台Modbus_Tcp驱动对接Wheelers控制器设备

    目录 前言 驱动目的 适用范围 使用示例 驱动源码 驱动解析 前言 Modbus协议是一种已广泛应用于当今工业控制领域的通用通讯协议.通过此协议,控制器相互之间.或控制器经由网络(如以太网)可以和其它 ...

  6. IOTOS物联中台从0到1开发modbus_rtu驱动 实例详解

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  7. IOTOS物联中台非标modbus驱动对接易事特UPS电源设备

    本文章为原创,转载请注明出处! 登录平台:IOTOS®爱投斯物联中台 账号:iotos_test    密码:iotos123 代码地址:IOTOSDK-Python: IOTOS Python版本S ...

  8. 中移物联ML302开发板上手体验

    开始 中移物联网的ML302开发板是支持4G Cat.1网络的开发板,对于Cat.1这里就不再赘述,详细可以去官网了解一下. 接下来介绍中移物联网的ML302开发板以及具体的上手步骤,给那些刚拿到开发 ...

  9. 【4G模块】中移物联ML302 + GD32F407 使用支持MQTT协议AT指令接入阿里云物联网平台

    目录 〇.目的.GD32.ML302相关简介 0.目的: 1.ML302: 2.LTE Cat.1: 3.GD32F407 一.GD芯片创建工程写驱动:LED.串口4.4G模块 1.创建工程 2.LE ...

最新文章

  1. 第四范式联合浪潮商用机器发布AI一体机,接入AI像使用手机一样简单
  2. SolidEdge 工程图中如何标注尺寸公差
  3. 从PCB焊接角度谈画PCB图时应注意的问题
  4. 14.并发与异步 - 2.任务Task -《果壳中的c#》
  5. [新手及懒人适用]轻松恢复误Ghost的硬盘
  6. my-medium.cnf_您的手机如何打开medium.com-我将让门卫和图书管理员解释。
  7. html封装windows,windows 系统封装,打造一份属于自己的系统!
  8. python中sorted函数的用法_Python3 中sorted() 函数的用法
  9. 0.8.11版本ffmpeg一天移植将近完成。
  10. 不同系统可以用一个数据库服务器吗,同一个数据库 不同服务器吗
  11. 断点帧数测试软件,《幽灵行动:断点》PC版性能表现分析
  12. 解决 ifconfig: command not found
  13. ibatis Clob对象处理
  14. android单例模式代码,设计模式(一):Android 源码中的单例模式
  15. python correlation_相关性系数介绍+python代码实现 correlation analysis
  16. 软件项目成本估算的基本方法
  17. DaZeng:雪碧图(精灵图)的使用
  18. 买了北京亲子年票但没有小孩的朋友,接下来的一年我都给你安排好啦!!...
  19. Python爬虫——爬取网站的图片
  20. IDEA Maven配置,Tomcat配置

热门文章

  1. wangEditor编辑器使用
  2. 自己动手写一个印钞机 第一章
  3. word退格键不管用
  4. 相对直方图 绝对直方图_什么是直方图,如何使用它来改善照片?
  5. 名帖212 赵孟頫 行书《秋兴赋》
  6. 【过关斩将】zabbix你都监控哪些参数
  7. myBatis mapper.xml 文件共用
  8. Django报错之django.db.utils.InternalError: (1366, Incorrect string value: '\\xE7\\x94\\xA8
  9. 刘韧:记者无能才急着显摆自己
  10. 计算机性能和显卡的提升,电脑如何提高显卡性能?电脑提高显卡性能的图文教程...