本文作者:龙利民
企业介绍:

ofo 小黄车是一个无桩共享单车出行平台,缔造了“无桩单车共享”模式,致力于解决城市出行问题。用户只需在微信服务号或App输入车牌号,即可获得密码解锁用车,随取随用,随时随地,也可以共享自己的单车到 ofo 共享平台,获得所有 ofo 小黄车的终身免费使用权,以1换N。

我们在使用MaxCompute的时候,我们其实非常期望知道当前有多少任务在跑,哪些任务耗时长,哪些任务已经完成,并且能通过任务的logview来分析任务耗时长的原因。

任务状态监控

MaxCompute的任务状态分Running和Terminated, 其中Running是包含:正在运行和等待运行的两种状态,Terminated包含:完成、失败、cancel的任务三个状态。阿里云提供了获取上述2种状态的SDK函数,odps.list_instances(status=Running|Terminated, start_time=开始时间,结束时间)。为了实现秒级别更新任务状态我们可以用以下思路来实现。

1、对于已经running的任务,我们需要快速更新它的状态,有可能已经完成了;

2、不断获取新的任务状态。

我们用Mysql来记录任务的状态表设计如下:

CREATE TABLE maxcompute_task (id bigint(20) unsigned NOT NULL AUTO_INCREMENT,instanceid varchar(255) DEFAULT NULL comment '任务实例ID',logview varchar(1024) DEFAULT NULL comment 'logview链接,查看问题非常有用',start_time varchar(64) DEFAULT NULL comment '任务开始时间',end_time varchar(64) DEFAULT NULL comment '任务结束时间',cast_time varchar(32) DEFAULT NULL comment '耗时',project_name varchar(255) DEFAULT NULL comment '项目名',status varchar(64) DEFAULT NULL comment '任务状态',  PRIMARY KEY (id),  UNIQUE KEY instanceid (instanceid)) ENGINE=InnoDB DEFAULT CHARSET=utf8

下面的页面可以查看当前的任务耗时,开始时间,对超过1小时的任务颜色使用红色标注,并且能查看logview,还能对任务进行取消,非常方便。

我们来看看代码的实现:

!/usr/bin/env python

-- coding: utf-8 --

author: lemon

import time
import threading
import traceback
import datetime
from odps import ODPS
from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger

g_table_name = "bi_maxcompute_task"

def save_task(instanceid, odps, mysqlconn):

# 保存任务状态到Mysql, 分别传入odps连接器和mysql连接器
instance = odps.get_instance(instanceid)
project_name = odps.project
status = instance.status.value
start_time = instance.start_time
end_time =  instance.end_timesql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)sqlret = mysqlconn.fetchone(sql)
if sqlret and sqlret["status"] == "Terminated":return
if sqlret and sqlret["logview"] is not None:logview = sqlret["logview"]
else:logview = instance.get_logview_address()
start_time = start_time + datetime.timedelta(hours=8)
if status == "Running":end_time = datetime.datetime.now()
else:end_time = end_time + datetime.timedelta(hours=8)
cast_time = end_time - start_time
colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview"
values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time),str(end_time), cast_time, project_name, status,logview]])
sql = """replace into {0}({1}) values({2}) """.format(g_table_name, colname, values)
mysqlconn.execute(sql)

class MaxcomputeTask(threading.Thread):

# 获取所有任务def __init__(self, logger):threading.Thread.__init__(self)self.logger = loggerself.hour = 1self.status_conf = [("demo", "Running"), ("demo", "Terminated"),("demo1", "Running"), ("demo1","Terminated")]def run(self):# 建立mysql连接, 根据你的需要来使用self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)while True:try:self.start_more()time.sleep(10)except:self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)self.logger.error(traceback.format_exc())def start_more(self,):for params in self.status_conf:self.get_task(*params)def get_task(self, project_name, status):odps = ODPS(**config.ODPS_INFO)odps.project = project_namelist = odps.list_instances(status=status, start_time=time.time() - self.hour * 3600)self.logger.info("start {0} {1} ".format(project_name, status))for row in list:save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)self.logger.info( "end {0} {1}".format(project_name, status))

class MaxcomputeTaskRunning(threading.Thread):

# 更新running任务的状态def __init__(self, logger):threading.Thread.__init__(self)self.logger = loggerdef run(self):self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)while True:try:self.update_running()time.sleep(1)except:self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)self.logger.error(traceback.format_exc())def update_running(self):sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)sqlret = self.mysqlconn.fetchall(sql)if not sqlret:returnself.logger.info("{1} running update length:{0}".format(len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S") ))for row in sqlret:odps = ODPS(**config.ODPS_INFO)odps.project = row["project_name"]save_task(row["instanceid"], odps, self.mysqlconn)

if name == "__main__":

# logger是自己编写的日志工具类
logger = _logger.Logger("maxcompute_task.log").getLogger()
running = MaxcomputeTaskRunning(logger)
running.setDaemon(True)
running.start()task = MaxcomputeTask(logger)
task.start()

多任务执行

MaxCompute可以在命令行下运行,也可以用SDK,阿里云的集成环境跑任务等。很多时候我们面临的任务是非常多的,如何做一个多任务的代码执行器,也是经常遇到的问题。任务执行是一个典型的生产者和消费者的关系,生产者获取任务,消费者执行任务。这么做有2个好处。

1)任务执行的数量是需要可控的,如果同时运行的任务不可控势必对服务器资源造成冲击,
2)多机运行服务,避免单点故障,MaxCompute的任务是运行在云端的,可以通过instanceid获取到结果,此结果是保留7天的。

我大致贴一些我们在实际场景种的一些代码,生产者和消费者的代码:

class Consumer(threading.Thread):
def __init__(self, queue, lock):threading.Thread.__init__(self)self.queue = queueself.lock = lockself.timeout = 1800
def run(self):self.execute = Execute()logger.info("consumer %s start" % threading.current_thread().name)while G_RUN_FLAG:try:task = self.queue.get()self.execute.start(task)except:logger.error(traceback.format_exc())

class Producter(threading.Thread):

def __init__(self, queue, lock):threading.Thread.__init__(self)self.queue = queueself.lock = lockself.sleep_time = 30self.step_sleep_time = 5def run(self):self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)logger.info("producter %s start" % threading.current_thread().name)while G_RUN_FLAG:if self.queue.qsize() >= QUEUE_SIZE:time.sleep(self.sleep_time)continue# TODOself.queue.put(task)time.sleep(self.step_sleep_time)

def main():

queue = Queue.LifoQueue(QUEUE_SIZE)
lock = threading.RLock()for _ in xrange(MAX_PROCESS_NUM):consumer = Consumer(queue, lock)consumer.setDaemon(True)consumer.start()producter = Producter(queue, lock)
producter.start()
producter.join()

def signal_runflag(sig, frame):

global G_RUN_FLAG
if sig == signal.SIGHUP:logger.info("receive HUP signal ")G_RUN_FLAG = False

if name == "__main__":

logger.info("execute run")
if platform.system() == "Linux":signal.signal(signal.SIGHUP, signal_runflag)
main()
logger.info("execute exit.")

Maxcompute实际执行时的代码:

 def _max_compute_run(self, taskid, sql):
    # 异步的方式执行hints = {'odps.sql.planner.mode': 'lot','odps.sql.ddl.odps2': 'true','odps.sql.preparse.odps2': 'lot','odps.service.mode': 'off','odps.task.major.version': '2dot0_demo_flighting','odps.sql.hive.compatible': 'true'}new_sql = "{0}".format(sql)instance = self.odps.run_sql(new_sql, hints=hints)#instance = self.odps.run_sql(sql)# 异步的方式执行# instance = self.odps.run_sql(sql)self._save_task_instance_id(taskid, instance.id)# 阻塞直到完成instance.wait_for_success()return instance.id

获取结果时的代码:

def instance_result(odps, instance_id):
# 通过instance_id 获取结果
instance = odps.get_instance(instance_id)
response = []
with instance.open_reader() as reader:raw_response = [r.values for r in reader]column_names = reader._schema.namesfor  line in raw_response:tmp = {}for i in range(len(line)):tmp[column_names[i]] = line[i]response.append(tmp)
return response

总结:

阿里云的MaxCompute是非常好用的云计算服务,它的更新和迭代速度都非常快,使用阿里云解放工程师的搭建基础服务的时间,让我们更多的专注业务,站在巨人的肩膀上聪明的干活。

MaxCompute的任务状态和多任务执行相关推荐

  1. 在使用win 7 无线承载网络时,启动该服务时,有时会提示:组或资源的状态不是执行请求操作的正确状态。 网上有文章指出,解决这个问题的方法是在设备管理器中启动“Microsoft托管网络虚拟适配

    在使用win 7 无线承载网络时,启动该服务时,有时会提示:组或资源的状态不是执行请求操作的正确状态. 网上有文章指出,解决这个问题的方法是在设备管理器中启动"Microsoft托管网络虚拟 ...

  2. WIN7无法启动承载网络,组或资源的状态不是执行请求操作的正确状态

    1.问题 WIN7无法启动承载网络,组或资源的状态不是执行请求操作的正确状态,以及无限网络共享客户端连接不上 2.解决 重启以下Windows服务试试: Wired AutoConfig WLAN A ...

  3. 无线承载网络“组或资源的状态不是执行请求操作的正确状态”解决方法

    在使用win 7 无线承载网络时,启动该服务时,有时会提示:组或资源的状态不是执行请求操作的正确状态. 网上有文章指出,解决这个问题的方法是在设备管理器中启动"Microsoft托管网络虚拟 ...

  4. 无法启动承载网络。 组或资源的状态不是执行请求操作的正确状态,解决办法。(转)...

    摘自:http://www.lihuoqing.cn/other/172.html 以前用的好好的,这段时间就出现以下情况: C:\windows\system32>netsh wlan sta ...

  5. 无法启动承载网络,组或资源的状态不是执行请求操作的正确状态

    [size=large]Win7设置wifi无线热点出现: 无法启动承载网络,组或资源的状态不是执行请求操作的正确状态的解决办法 1. 命令提示符输入netsh wlan show drivers [ ...

  6. 组或资源的状态不是执行请求操作的正确状态

    组或资源的状态不是执行请求操作的正确状态 Get-WindowsEdition -Path C:\Temp\offline\ 组或资源的状态不是执行请求操作的正确状态. TransmogProvide ...

  7. 无线承载网络 组或资源的状态不是执行请求操作的正确状态 解决方法

    在使用win 7 无线承载网络时,启动该服务时,有时会提示:组或资源的状态不是执行请求操作的正确状态. 网上有文章指出,解决这个问题的方法是在设备管理器中启动"Microsoft托管网络虚拟 ...

  8. cmd中执行netsh wlan start hostednetwork 无法启动承载网络。 组或资源的状态不是执行请求操作的正确状态。

    cmd中执行netsh wlan start hostednetwork 无法启动承载网络. 组或资源的状态不是执行请求操作的正确状态. 解决办法: 1.打开网络共享中心,点击"更改适配器& ...

  9. Win8开虚拟wifi ‘无法启动承载网络 组或资源的状态不是执行请求操作的正确状态“...

    第一步,首先我们点开开始按钮菜单,要右键以"管理员身份"打开CMD"命令提示符"并键入或者复制(粘贴)命令:netsh wlan show drivers 查看 ...

最新文章

  1. NOIP2002 均分纸牌
  2. 绝了!这个开源验证码项目,差点晚上瘾...
  3. Sass--@-Rules
  4. VS2010开发ribbon风格的程序
  5. Falsy Bouncer(第二种解决方式)
  6. 菜鸟创业记--第四天
  7. 一年带货2个亿,“小黄鸭”成国内最赚钱IP
  8. 深度学习中的梯度下降算法
  9. linux系统安装snort,linux下SNORT安装.doc
  10. 记一次重构:Android实践从MVC架构到MVP架构
  11. cookie登录_使用UserAgent和Cookie安全登录Facebook账号[安全买号必读]
  12. 大华平台显示归属服务器离线,大华报警联网系统方案
  13. 百度网盘——下载限速问题解决方案(油猴(Tampermonkey)+PanDownload网页版+IDM)
  14. 安卓手机怎么下载ins
  15. linux的fseek函数
  16. 从百草园到三味书屋 鲁迅
  17. C# System命名空间表
  18. 2017 百度之星 复赛 Valley Numer(数位dp)
  19. java aes ecb_java使用AES加密解密 AES-128-ECB加密
  20. 集合工具类 以及 几个小面试题(是我保存的)

热门文章

  1. 电脑硬盘是干什么用的_电脑硬盘位不够用?有了它就不担心、奥睿科硬盘柜体验...
  2. JavaWeb:过滤器Filter
  3. mysql存储加速_mysql存储过程加速
  4. python 中文字符串截取,Python实现针对含中文字符串的截取功能示例
  5. 华为交换机SSH和telnet登录配置
  6. 计算机网络:Socket网络通信底层数据传输
  7. 循环链表的插入和删除
  8. 混合开发的坑(7) ---输入文本时,键盘遮挡
  9. thymeleaf+bootstrap,onclick传参实现模态框中遇到的错误
  10. POJ3045 Cow Acrobats —— 思维证明