pipelines简介

pipelines是一个机器学习工作流的抽象概念,这个工作流可以小到函数的过程、也可以大到机器学习从数据加载、变换、清洗、特征构建、模型训练等多个环节。
在kubeflow中,该组件能以ui界面的方式记录、交互、反馈实验、任务和每一次运行。
pipelines各流程组件构建成功后,会依据事先定义好的组件依赖关系构建DAG(有向无环图)。
在pipelines构建各流程组件前,需要将对应流程的业务代码打包成docker镜像文件(kubeflow中运行业务代码均以容器的方式实现)
业务代码、各组件的关系可以参考如下的关系:

构建pipelines依赖kubeflow提供的SDK,目前官方文档中有比较详细几种构建组件和组件复用等方法,关于SDK的使用可以参考:pipelines sdk简介
目前提交运行pipelines有2种方法:
二者本质都是使用sdk编译pipelines组件

  1. 在notebook中使用sdk提交pipelines至服务中心,直接可以在ui中查看pipelines实验运行进度。
  2. 将pipelines组件打成zip包通过ui上传至服务中心,同样可以在ui查看实验运行进度。

kubeflow虽然开源了一段时间,但是一些功能与google云平台耦合依然较重,官方提供的例子在本地k8s集群中使用有诸多坑,也不方便新手理解。下面我将以tensorflow经典的手写数字识别算法为例,在本地k8s集群实践pipelines。

划分业务流程制作业务镜像

为了方便理解pipelines流程组件,我在这里将实验流程分成了数据加载、模型训练、测试数据预测三个环节(实际工作中你可以依据自己的喜好划分流程)
各流程的组件结构如下图所示:

在构建pipelines前先简单介绍一下三个环节的业务代码:
数据加载环节:
load_data.py

from __future__ import absolute_import, division, print_function, \unicode_literalsimport argparse
import numpy as np# load data
def load_data(path):with np.load(path) as f:x_train, y_train = f['x_train'], f['y_train']x_test, y_test = f['x_test'], f['y_test']return (x_train, y_train), (x_test, y_test)# do data transform
def transform(output_dir, file_name):x_train_name = 'x_train.npy'x_test_name = 'x_test.npy'y_train_name = 'y_train.npy'y_test_name = 'y_test.npy'(x_train, y_train), (x_test, y_test) = load_data(output_dir + file_name)print("### loading data done.")x_train, x_test = x_train / 255.0, x_test / 255.0np.save(output_dir + x_train_name, x_train)np.save(output_dir + x_test_name, x_test)np.save(output_dir + y_train_name, y_train)np.save(output_dir + y_test_name, y_test)print("### data transform done.")with open(output_dir + 'train_test_data.txt', 'w') as f:f.write(output_dir + x_train_name + ',')f.write(output_dir + x_test_name + ',')f.write(output_dir + y_train_name + ',')f.write(output_dir + y_test_name)print("### write train and test data name to: train_test_data.txt done.")def parse_arguments():"""Parse command line arguments."""parser = argparse.ArgumentParser(description='Kubeflow MNIST load data script')parser.add_argument('--data_dir', type=str, required=True, help='local file dir')parser.add_argument('--file_name', type=str, required=True, help='local file to be input')args = parser.parse_args()return argsdef run():args = parse_arguments()transform(args.data_dir, args.file_name)if __name__ == '__main__':run()

这个环节的业务代码主要实现了:
从本地读取源数据 -> 特征变换 -> 分割训练集 -> 存储数据集到本地

  • 代码中使用argparse,用户可以通过rest api的方式传入路径和数据参数运行代码。
  • 代码后续的运行将在容器中进行,在这里使用NFS挂载k8s集群路径作为文件存储的固定地址。
  • 代码中将分割数据集的绝对路径写入train_test_data.txt 为的是方便后续环节引用,同时便于与后续环节构成流程关系。

测试数据预测环节:
predict.py

from __future__ import absolute_import, division, print_function, \unicode_literalsimport argparse
import numpy as np
import pandas as pd
import tensorflow as tfdef predict(output_dir, model_file, data_file):"""all file use absolute dir:param output_dir::param model_file: `model.txt` absolute dir:param data_file: `train_test_data.txt` absolute dir:return:"""with open(model_file, 'r') as f:line = f.readline()model = tf.keras.models.load_model(line)with open(data_file, 'r') as f:line = f.readline()data_list = line.split(',')with open(data_list[1], 'rb') as f:x_test = np.load(f)with open(data_list[3], 'rb') as f:y_test = np.load(f)pre = model.predict(x_test)model.evaluate(x_test, y_test)df = pd.DataFrame(data=pre,columns=["prob_0", "prob_1", "prob_2", "prob_3", "prob_4", "prob_5", "prob_6", "prob_7", "prob_8","prob_9"])y_real = pd.DataFrame(data=y_test, columns=["real_number"])result = pd.concat([df, y_real], axis=1)result.to_csv(output_dir + 'result.csv')print("### save predict result file: result.csv")with open(output_dir + 'result.txt', 'w') as f:f.write(output_dir + 'result.csv')print("### write result path and name to: result.txt done.")def parse_arguments():"""Parse command line arguments."""parser = argparse.ArgumentParser(description='Kubeflow MNIST predict model script')parser.add_argument('--data_dir', type=str, required=True, help='local file dir')parser.add_argument('--model_file', type=str, required=True, help='a file write trained model absolute dir')parser.add_argument('--data_file', type=str, required=True, help='la file write train and test data absolute dir')args = parser.parse_args()return argsdef run():args = parse_arguments()predict(args.data_dir, args.model_file, args.data_file)if __name__ == '__main__':run()

这个环节的业务代码主要实现了:
依据train环节生成的 model.txt文件获取模型文件地址 -> 测试数据集预测 -> 模型性能评估 -> 保留预测结果文件

  • 代码中使用argparse,用户可以通过rest api的方式传入路径和数据参数运行代码。
  • 使用模型预测测试数据集,预测结果存储到集群挂载卷

制作业务代码镜像

构建pipelines组件前需要将业务代码打包成docker镜像,在这里以 load_data 组件为例介绍一下打包业务代码镜像:
准备好dockerfile:包含业务代码、打包依赖的基础镜像、程序入口
Dockerfile

FROM tensorflow/tensorflow:1.14.0-py3
ADD load_data.py .
ENTRYPOINT ["python", "load_data.py"]

使用dockerfile打包业务代码镜像:
build_image

#!/usr/bin/env bashdocker build -t mnist-load_data:v0.0.1 .

tips:依赖的tensorflow基础镜像,大家去dockerhub不限版本随意拉取一个就好了

更多工程细节和流程结构参考本例github地址:https://github.com/liuweibin6566396837/kubeflow-examples/tree/master/mnist_stage

在notebook中提交pipelines

在notebook中可以使用pipelines python SDK实现编译pipelines同时将pipelines提交运行,构建pipelines组件代码如下:
client.py

import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.gcp as gcpclient = kfp.Client()
from kubernetes import client as k8s_clientEXPERIMENT_NAME = 'mnist_op'
exp = client.create_experiment(name=EXPERIMENT_NAME)class load_dataOp(dsl.ContainerOp):"""load raw data from tensorflow, do data transform"""def __init__(self, data_dir, file_name):super(load_dataOp, self).__init__(name='load_data',image='mnist-load_data:v0.0.1',arguments=['--file_name', file_name,'--data_dir', data_dir,],file_outputs={'data_file': data_dir + 'train_test_data.txt'})class trainOp(dsl.ContainerOp):"""train keras model"""def __init__(self, data_dir, data_file):super(trainOp, self).__init__(name='train',image='mnist-train:v0.0.1',arguments=['--data_dir', data_dir,'--data_file', data_file,],file_outputs={'model_file': data_dir + 'model.txt'})class predictOp(dsl.ContainerOp):"""get predict by trained model"""def __init__(self, data_dir, model_file, data_file):super(predictOp, self).__init__(name='predict',image='mnist-predict:v0.0.1',arguments=['--data_dir', data_dir,'--model_file', model_file,'--data_file', data_file],file_outputs={'result_file': data_dir + 'result.txt'})@dsl.pipeline(name='MnistStage',description='shows how to define dsl.Condition.'
)
def MnistTest():data_dir = '/DATA/nfs-data/kubeflow-pv1/'file_name = 'mnist.npz'load_data = load_dataOp(data_dir, file_name).add_volume(k8s_client.V1Volume(name='mnist-pv',nfs=k8s_client.V1NFSVolumeSource(path='/DATA/nfs-data/kubeflow-pv1/',server='10.5.188.249'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))train = trainOp(data_dir, load_data.outputs['data_file']).add_volume(k8s_client.V1Volume(name='mnist-pv',nfs=k8s_client.V1NFSVolumeSource(path='/DATA/nfs-data/kubeflow-pv1/',server='10.5.188.249'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))predict = predictOp(data_dir, train.outputs['model_file'], load_data.outputs['data_file']).add_volume(k8s_client.V1Volume(name='mnist-pv',nfs=k8s_client.V1NFSVolumeSource(path='/DATA/nfs-data/kubeflow-pv1/',server='10.5.188.249'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))compiler.Compiler().compile(MnistTest, 'mnist.tar.gz')
run = client.run_pipeline(exp.id, 'wbliu4', 'mnist.tar.gz')

在pipelines代码中完成了load_data、train、predict三个环节实例化,以此实现pipelines组件的构造。
在组件中需引用各自业务代码的镜像、明确业务代码需要的rest api 传入参数、业务代码生成的文件和路径
最后在 MnistTest 显示实现pipelines各个组件的关联:
例如在本例中,train组件需要引用load_data组件生成的 train_test_data.txt文件获取训练、测试数据集地址;
predict组件需要引用load_data组件生成的 train_test_data.txt文件获取测试数据集地址、train组件生成的model.txt文件获取模型文件的地址;
notebook中运行完代码后会生成一个pipelines实验链接,通过链接可以访问到实验:

查看DAG结构:

pipelines各组件运行结果:



至此使用notebook提交pipelines实验成功!!!

将pipelines打包上传至UI

将业务代码打包成镜像
编译压缩工作流程代码为.tar.gz文件
将压缩文件上传至kubeflow ui后,k8s集群将解压缩该压缩文件中的yaml配置文件,然后起一个pipeline工作流任务。
在pipelines的ui界面可以看到pipeline信息和运行状态。

使用Kubeflow pipelines相关推荐

  1. com 对象与其基础 rcw 分开后就不能再使用_如何使用 Kubeflow 机器学习流水线

    作者:George Novack 翻译:Bach(才云) 校对:星空下的文仔(才云).bot(才云) 为什么要使用机器学习流水线 现在,机器学习流水线(Machine Learning Pipelin ...

  2. 气流与路易吉,阿戈,MLFlow,KubeFlow

    任务编排工具和工作流程 (Task orchestration tools and workflows) Recently there's been an explosion of new tools ...

  3. Kubernetes 和 Kubeflow 学习笔记

    Kubernetes Kubernetes是一个完备的分布式系统支撑平台,具有完备的集群管理能力,多扩多层次的安全防护和准入机制.多租户应用支撑能力.透明的服务注册和发现机制.內建智能负载均衡器.强大 ...

  4. pipelines sdk简介

    pipeline sdk是使用python配合kubeflow pipelines功能的工具包,文档在此进行了简单搬运,如果有想了解更多可以阅读原始文档: https://www.bookstack. ...

  5. 从零搭建机器学习平台Kubeflow

    1 Kubeflow简介 1.1 什么是Kubeflow 来自官网的一段介绍: Kubeflow 项目致力于使机器学习 (ML) 工作流在 Kubernetes 上的部署变得简单.可移植和可扩展. K ...

  6. ubuntu20.04下使用juju+maas环境部署k8s-12-charmed kubeflow-1-kubeflow270和kubeflow介绍

    参考文档: Kubeflow #270 Charmed Kubeflow 简介 Kubeflow #270 Kubeflow 运算符 介绍: Charmed Kubeflow 是全套 Kubernet ...

  7. 开源技术 * IBM 微讲堂 | Kubeflow 系列(观看回放 | 下载讲义)

    开源技术 * IBM 微讲堂 | Kubeflow 系列(观看回放 | 下载讲义) 简介 目标听众 日程安排 01. Kubeflow 概览和功能介绍 02. 使用 Operator 框架管理和部署 ...

  8. Kubeflow第一篇--大致了解

    0 前言 予读者言: 本系列博客本义作为笔者记录所用,所以可能稍显冗长,但同时也记录了我的学习研究思路,会在学习的过程中不断更新,可供读者借鉴,能对大家有些许帮助就是笔者最为开心之事- 1 Offic ...

  9. Yann LeCun、吴恩达的新年AI预测:强调“少样本学习”,AI恐慌在减少

    来源:大数据文摘 新年伊始,海外媒体VentureBeat电话访谈了包括吴恩达.Yann Lecun在内的四位人工智能领域领军者,询问了他们对于过去一年人工智能领域发展的看法,以及他们认为新一年人工智 ...

最新文章

  1. tableau技术小积累
  2. portlet java_探秘企业门户开发:Java Portlet入门(2)
  3. 旷视科技完成4.6亿美元C轮融资,打破商汤4.1亿美元单轮融资记录
  4. 深入学习SAP UI5框架代码系列之一:UI5 Module的懒加载机制
  5. 使用 CSS 用户选择控制选择
  6. Meidapipe 3D手势姿态跟踪算法,手机端实时检测 ,多个手势同时捕捉
  7. 十大笔记本品牌型号命名规则【惠普】
  8. 网易云易盾牵手百视通 助力广电领域新媒体内容安全
  9. 数据--第45课 - 最短路径
  10. (转)解决PowerDesigner 反向工程没有注释(备注)
  11. Linux系统——sqlist数据库
  12. jasperreport报表导出excel锁定行和列
  13. virtualbox 靶机无法获取IP地址
  14. 网络数据保障ptop_网络影响未来十大预言 宽带应用将与新媒体融合
  15. 软件质量模型(ISO/IEC 9126和Mc Call)
  16. 关于修真、法力、法术、武功等的介绍
  17. 数据分析技术:时间序列分析的AR/MA/ARMA/ARIMA模型体系
  18. Charon的python数据可视化分析4(饼状图)
  19. Java算法大全_java贪心算法几个经典例子
  20. java使用Selenium模拟登陆58(验证码登陆密码登陆)若快平台识别文字点击验证码

热门文章

  1. AE教程——2D角色动画设计(Science of Character Animation)
  2. Paraview学习小笔记
  3. 修车案例 | 梅赛德斯奔驰Sprinter 906动力不足
  4. 【原创】一个计算斗地主谁必赢谁必输的程序
  5. 李克平教授讲座——《城市道路交叉口规划规范》解读与绿灯间隔问题分析
  6. The dependencies of some of the beans in the application context form a cycle 循环依赖异常处理
  7. 推荐一款具有恶性负载识别功能的YXD121校园电表
  8. SpringMVC mybatis SSM 多数据源 java redis shiro 代码生成器
  9. 马云谈徐晓东事件:太极和搏击规则不一致 拿鸭和鸡比
  10. QT界面:重写鼠标事件/实时显示Label中鼠标位置