1 分布式机器学习概述

大规模机器学习训练常面临计算量大、训练数据大(单机存不下)、模型规模大的问题,对此分布式机器学习是一个很好的解决方案。

1)对于计算量大的问题,分布式多机并行运算可以基本解决。不过需要与传统HPC中的共享内存式的多线程并行运算(如OpenMP)以及CPU-GPU计算架构做区分,这两种单机的计算模式我们一般称为 计算并行 )。

2)对于训练数据大的问题,需要将数据进行划分并分配到多个工作节点(Worker)上进行训练,这种技巧一般被称为 数据并行 。每个工作节点会根据局部数据训练出一个本地模型,并且会按照一定的规律和其他工作节点进行通信(通信内容主要是本地模型参数或者参数更新),以保证最终可以有效整合来自各个工作节点的训练结果并得到全局的机器学习模型。

如果是训练数据的样本量比较大,则需要对数据按照样本进行划分,我们称之为“数据样本划分”,按实现方法可分为“随机采样法”和“置乱切分法”。样本划分的合理性在于机器学习中的经验风险函数关于样本是可分的,我们将每个子集上的局部梯度平均,仍可得到整个经验风险函数的梯度。

如果训练数据的维度较高,还可对数据按照维度进行划分,我们称之为“数据维度划分”。它相较数据样本划分而言,与模型性质和优化方法的耦合度更高。如神经网络中各维度高度耦合,就难以采用此方式。不过,决策树对维度的处理相对独立可分,将数据按维度划分后,各节点可独立计算局部维度子集中具有最优信息增益的维度,然后进行汇总。此外,在线性模型中,模型参数与数据维度是一一对应的,故数据维度划分常与下面提到的模型并行相互结合。

3)对于模型规模大的问题,则需要对模型进行划分,并且分配到不同的工作节点上进行训练,这种技巧一般被称为 模型并行 。与数据并行不同,模型并行的框架下各个子模型之间的依赖关系非常强,因为某个子模型的输出可能是另外一个子模型的输入,如果不进行中间计算结果的通信,则无法完成整个模型训练。因此,一般而言,模型并行相比数据并行对通信的要求更高。

这里提一下数据样本划分中的几种划分方式。给定 \(n\) 个 \(d\) 维样本和 \(K\) 个工作节点,数据样本划分需要完成的任务是将 \(n\) 个样本以某种形式分配到 \(K\) 个工作节点上。

随机采样法中我们独立同分布地从 \(n\) 个样本中有放回随机采样,每抽取一个样本将其分配到一个工作节点上。这个过程等价于先随机采 \(n\) 个样本,然后均等地划分为 \(K\) 份。

随机采样法便于理论分析,但基于实现难度的考虑,在工程中更多采用的是基于置乱切分的划分方法。即现将 \(n\) 个样本随机置乱,再把数据均等地切分为 \(K\) 份,再分配到 \(K\) 个节点上进行训练。置乱切分相当于无放回采样,每个样本都会出现在某个工作节点上,每个工作节点的本地数据没有重复,故训练数据的信息量一般会更大。我们下面的划分方式都默认采取置乱切分的方法。

我们在后面的博客中会依次介绍针对数据并行和模型并行设计的各种分布式算法。本篇文章我们先看数据并行中最常用的同步并行SGD算法(也称SSGD)是如何在Spark平台上实现的。

2 同步并行SGD算法描述与实现

SSGD [1] 对应的伪代码可以表述如下:

其中,SSGD算法每次依据来自 \(K\) 个不同的工作节点上的样本的梯度来更新模型,设每个工作节点上的小批量大小为 \(b\) ,则该算法等价于批量大小为 \(bK\)

的小批量随机梯度下降法。

我们令 \(f\) 为逻辑回归问题的正则化经验风险。设 \(w\) 为权值(最后一维为偏置),样本总数为 \(n\) , \(\{(x_i, y_i)\}_{i=1}^n\) 为训练样本集。样本维度为 \(d\) , \(x_i\in \mathbb{R}^{d+1}\) (最后一维扩充), \(y_i\in\{0, 1\}\) 。则 \(f\) 表示为:

\[f(w) = \frac{1}{n} \sum_{i=1}^{n}\left[y_{i} \log \pi_{w}\left(x_{i}\right)+\left(1-y_{i}\right) \log \left(1-\pi_w\left(x_{i}\right)\right)\right] + \lambda R(w) \]

这里

\[\begin{aligned} \pi_w(x) = p(y=1 \mid x; w) =\frac{1}{1+\exp \left(-w^{T} x\right)} \end{aligned} \]

其梯度表示如下:

\[\nabla f{(w)} = -\sum_{i=1}^n(y_i - \frac{1}{\exp(-w^Tx)+1})x_i + \lambda\nabla R(w) \]

这里正则项的梯度 \(\nabla R(w)\) 要分情况讨论:

(1) 不使用正则化

此时显然 \(\nabla R(w)=0\) 。

(2) L2正则化( \(\frac{1}{2}\lVert w\rVert^2_2\) )

此时 \(\nabla R(w)=w\) 。

(3) L1正则化( \(\lVert w \rVert_1\) )

该函数不是在每个电都对 \(w\) 可导,只来采用函数的次梯度(subgradient)来进行梯度下降。 \(\lVert w \rVert_1\) 的一个比较好的次梯度估计是 \(\text{sign}(w)\) 。相比于标准的梯度下降,次梯度下降法不能保证每一轮迭代都使目标函数变小,所以其收敛速度较慢。

(4) Elastic net正则化( \(\alpha \lVert w \rVert_1 + (1-\alpha)\frac{1 }{2}\lVert w\rVert^2_2\) )

对 \(\lVert w\rVert_1\) 使用次梯度计算式, \(\frac{1}{2}\lVert w\rVert_2^2\) 使用其梯度计算式,得最终的梯度计算式为 \(\alpha \text{sign}(w) + (1-\alpha) w\) 。

我们约定计算第 \(K\) 个节点小批量 \(\mathcal{I_k}\) 的经验风险的梯度 \(\nabla f_{\mathcal{I_k}}(x)\) 时不包含正则项的梯度,最终将 \(K\) 个节点聚合后再加上正则项梯度。

用PySpark对上述算法实现如下:

from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_scoren_slices = 3  # Number of Slices
n_iterations = 300  # Number of iterations
eta = 10  # iteration step_size, because gradient sum is divided by minibatch size, it shoulder be larger
mini_batch_fraction = 0.1 # the fraction of mini batch sample
lam = 0.001 # coefficient of regular termdef logistic_f(x, w):return 1 / (np.exp(-x.dot(w)) + 1)def gradient(point: np.ndarray, w: np.ndarray):""" Compute linear regression gradient for a matrix of data points"""y = point[-1]    # point labelx = point[:-1]   # point coordinate# For each point (x, y), compute gradient function, then sum these up# notice thet we need to compute minibatch size, so return(g, 1)return - (y - logistic_f(x, w)) * xdef reg_gradient(w, reg_type="l2", alpha=0):""" gradient for reg_term""" assert(reg_type in ["none", "l2", "l1", "elastic_net"])if reg_type == "none":return 0elif reg_type == "l2":return welif reg_type == "l1":return np.sign(w)else:return alpha * np.sign(w) + (1 - alpha) * wif __name__ == "__main__":X, y = load_breast_cancer(return_X_y=True)D = X.shape[1]X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0, shuffle=True)n_train, n_test = X_train.shape[0], X_test.shape[0]spark = SparkSession\.builder\.appName("SGD")\.getOrCreate()matrix = np.concatenate([X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)points = spark.sparkContext.parallelize(matrix, n_slices).cache()# Initialize w to a random valuew = 2 * np.random.ranf(size=D + 1) - 1print("Initial w: " + str(w))for t in range(n_iterations):print("On iteration %d" % (t + 1))w_br = spark.sparkContext.broadcast(w)(g, mini_batch_size) = points.sample(False, mini_batch_fraction, 42 + t)\.map(lambda point: gradient(point, w_br.value))\.treeAggregate((0.0, 0),\seqOp=lambda res, g: (res[0] + g, res[1] + 1),\combOp=lambda res_1, res_2: (res_1[0] + res_2[0], res_1[1] + res_2[1]))w -= eta * g/mini_batch_size + lam * reg_gradient(w, "l2")y_pred = logistic_f(np.concatenate([X_test, np.ones((n_test, 1))], axis=1), w)pred_label = np.where(y_pred < 0.5, 0, 1)acc = accuracy_score(y_test, pred_label)print("iterations: %d, accuracy: %f" % (t, acc))print("Final w: %s " % w)print("Final acc: %f" % acc)spark.stop()

我们尝试以 \(L2\) 正则化、0.001的正则系数运行。初始权重如下:

Initial w: [ 0.09802896  0.92943671 -0.04964225  0.63915174 -0.61839489  0.86300117-0.04102299 -0.01428918  0.84966149  0.50712175  0.10373804 -0.009432910.47526645 -0.19537069 -0.17958274  0.67767599  0.24612002  0.55646197-0.76646105  0.86061735  0.48894574  0.87838804  0.05519216 -0.149118650.78695568  0.26498925  0.5789493  -0.20118555 -0.79919906 -0.79261251-0.77243226]

最终的模型权重与在测试集上的准确率结果如下:

Final w: [ 2.22381079e+03  4.00830646e+03  1.34874211e+04  1.38842558e+042.19902064e+01  5.08904164e+00 -1.79005399e+01 -8.85669497e+004.28643902e+01  1.74744234e+01  2.24167323e+00  2.89804554e+02-1.05612399e+01 -5.93151080e+03  1.60754311e+00  2.92290287e+002.46318238e+00  1.51092034e+00  4.23645852e+00  1.38371670e+002.20694252e+03  5.18743708e+03  1.32612364e+04 -1.39388946e+043.03078787e+01  4.41094696e+00 -2.24172172e+01 -5.27976054e+006.10623037e+01  1.83347648e+01  2.78974813e+02]
Final acc: 0.912281

代码中有两个关键点,一个是 points.sample(False, mini_batch_fraction, 42 + t) 。函数 sample 负责返回当前RDD的一个随机采样子集(包含所有分区),其原型为:

RDD.sample(withReplacement: bool, fraction: float, seed: Optional[int] = None) → pyspark.rdd.RDD[T]

参数 withReplacement 的值为 True 表示采样是有放回(with Replacement, 即replace when sampled out),为 False 则表示无放回 (without Replacement)。如果是有放回,参数 fraction 表示每个样本的期望被采次数,fraction必须要满足 \(\geqslant0\) ;如果是无放回,参数 fraction 表示每个样本被采的概率,fraction必须要满足位于 \([0, 1]\) 区间内。

还有一个关键点是

.treeAggregate((0.0, 0),\seqOp=lambda res, g: (res[0] + g, res[1] + 1),\combOp=lambda res_1, res_2: (res_1[0] + res_2[0], res_1[1] + res_2[1]))

该函数负责对RDD中的元素进行树形聚合,它在数据量很大时比 reduce 更高效。该函数的原型为

RDD.treeAggregate(zeroValue: U, seqOp: Callable[[U, T], U], combOp: Callable[[U, U], U], depth: int = 2) → U[source]

其中 zeroValue 为聚合结果的初始值, seqOp 函数用于定义单分区(partition)做聚合操作的方法,该方法第一个参数为聚合结果,第二个参数为分区中的数据变量,返回更新后的聚合结果。 combOp 定义对分区之间做聚合的方法,该方法第一个参数为第二个参数都为聚合结果,返回累加后的聚合结果。 depth 为聚合树的深度。

我们这里 treeAggregate 想要聚合得到一个元组 (g, mini_batch_size) , g 为所有节点样本的随机梯度和, mini_batch_size 为所有节点所采的小批量样本之和,故我们将聚合结果的初始值 zeroVlue 初始化为 (0,0, 0) 。具体的聚合过程描述如下:

  1. 对每个partition:

    a. 初始化聚合结果为 (0.0, 0) 。

    b. 对当前partition的序列元素,依次执行聚合操作 seqOp 。

    c. 得到当前partition的聚合结果 (partition_sum, partition_count) 。

  2. 对所有partition:

    a. 按照树行模式合并各partition的聚合结果,合并方法为 combOp 。

    b. 得到合并结果 (total_sum, total_count) 。

形象化地表示该聚合过程如下图所示:

3 算法收敛性及复杂度分析

3.1 收敛性和计算复杂度

假设目标函数 \(f: \mathbb{R}^d\rightarrow \mathbb{R}\) 是 \(\alpha\) -强凸函数,并且 \(\beta\) 光滑,如果随机梯度的二阶矩有上界,即 \(\mathbb{E}_{i^t}{\lVert\nabla f_{i^t}(w^t) \rVert^2\leqslant G^2}\) ,当步长 \(\eta^t = \frac{1}{\alpha t}\) 时,对于给定的迭代步数 \(T\) ,SGD具有 \(\mathcal{O}(\frac{1}{T})\) 的次线性收敛速率:

\[\mathbb{E}[ f(w^T) - f(w^*) ] \leqslant \frac{2\beta G^2}{\alpha^2T} \]

而这意味着SGD的迭代次数复杂度为 \(\mathcal{O}(\frac{1}{\varepsilon})\) ,也即 \(\mathcal{O}(\frac{1}{\varepsilon})\) 轮迭代后会取得 \(\varepsilon\) 的精度。

尽管梯度的计算可以被分摊到个计算节点上,然而梯度下降的迭代是串行的。每轮迭代中,Spark会执行同步屏障(synchronization barrier)来确保在各worker开始下一轮迭代前 \(w\) 已被更新完毕。如果存在掉队者(stragglers),其它worker就会空闲(idle)等待,直到下一轮迭代。故相比梯度的计算,其迭代计算的“深度”(depth)是其计算瓶颈。

3.2 通信复杂度

map过程显然是并行的,并不需要通信。broadcast过程需要一对多通信,并且reduce过程需要多对一通信(都按照树形结构)。故对于每轮迭代,总通信时间按

\[2\text{log}_2(p)(L + \frac{m}{B}) \]

增长。

这里 \(p\) 为除去driver节点的运算节点个数, \(L\) 是节点之间的通信延迟。 \(B\) 是节点之间的通信带宽。 \(M\)

是每轮通信中节点间传输的信息大小。故消息能够够以

\(\mathcal{O}(\text{log}p)\)

的通信轮数在所有节点间传递。

分布式机器学习:同步并行SGD算法的实现与复杂度分析相关推荐

  1. 【分布式ID】理解Snowflake算法的实现原理

    1.概述 转载:冷饭新炒:理解Snowflake算法的实现原理 我上次也看了一个视频讲解:[分布式ID]键高并发 分布式 全局唯一 ID 雪花算法 snowflake 2.前提# Snowflake( ...

  2. 并行sgd算法和min-batch gd算法

    sgd算法全称随机梯度下降法,有着比批梯度下降法更快收敛的优势,该算法名称中的"随机"二字是改算法的中心精神所在. sgd算法是一种天生的串行的算法,当数据量大的时候们希望通过使用 ...

  3. 分布式机器学习——模型并行训练

    首先还是来介绍一下分布式系统中的并行方式,分为数据并行和模型并行,其实还有一种并行方式:Pipeline并行. Pipeline并行方式有的时候会单独存在,有的时候又归为模型并行.这篇文章重点就介绍一 ...

  4. vb点名程序 随机点名不重复算法的实现 算法复杂度最优为O(n)

    算法: 1.将姓名都装入一个数组,并记录下需随机的数a,首次为数组长度 2.随机产生0~a的一个整数i 3.输出数组arr(i),同时将最末尾的arr(a)放到arr(i)位置,并且a-1 4.再次点 ...

  5. 分布式机器学习系统笔记(一)——模型并行,数据并行,参数平均,ASGD

    欢迎转载,转载请注明:本文出自Bin的专栏blog.csdn.net/xbinworld. 技术交流QQ群:433250724,欢迎对算法.技术.应用感兴趣的同学加入. 文章索引::"机器学 ...

  6. 【大论文】可扩展机器学习的并行与分布式优化算法综述_亢良伊2017

    一.基础知识: 1.目标函数 机器学习要优化的目标函数一般表现为一下形式: 函数J(θ)为目标函数,f为表示真实值与拟合值之差的损失函数,r(θ)为正则项(防止过拟合问题,主要分为L1正则项.L2正则 ...

  7. 分布式机器学习之——Spark MLlib并行训练原理

    这里是 王喆的机器学习笔记 的第二十五篇文章.接下来的几篇文章希望与大家一同讨论一下机器学习模型的分布式训练的问题.这个问题在推荐.广告.搜索领域尤为突出,因为在互联网场景下,动辄TB甚至PB级的数据 ...

  8. 机器学习系列文章——算法的实现(knn,朴素贝叶斯,决策树,模型评估)

    一.机器学习算法分类: 机器学习算法可分为两大类,即分类与回归.其中分类是针对离散型数据,比如判定一直动物是猫是狗,判断一个人的信用等级:而回归问题为针对连续型数据,如预测淘宝店铺销量,预测明天气温等 ...

  9. 参数服务器——分布式机器学习的新杀器

    转自:微信公众号 数据极客 在大规模数据上跑机器学习任务是过去十多年内系统架构师面临的主要挑战之一,许多模型和抽象先后用于这一任务.从早期的MPI,到后来的Hadoop,乃至于目前使用较多的Spark ...

最新文章

  1. 2018-2020年中国服务机器人行业深度研究报告
  2. PIE.htc 让IE使用CSS3
  3. python退出程序-【转】python 退出程序的方式
  4. 全球服务器系统市场份额,IDC发布2017年第一季度全球融合系统报告 超融合市场最火爆...
  5. xml可以html标签吗,自定义html标签(XML)
  6. php pdo mysql query_PHP+MYSQL中使用PDO的query方法
  7. mysql使用裸设备_请教dd清空裸设备问题
  8. AMQP Connection 127.0.0.1:5672] ERROR [o.s.a.rabbit.connection.CachingConnectionFactory] CachingConn
  9. linux lids pdf,Linux入侵监测系统LIDS原理(3)
  10. ❤️《大前端—NPM包管理器》
  11. 关于LCR表和万用表测电容容值不一样的怪事
  12. Redshift渲染器和Octane渲染器哪个更适合渲染C4D动画?
  13. 田颖- 「柴姐ye话」主笔,拉链互动副总裁 | 到「在行」来约见我
  14. java 设置pdf 编码格式_Java 在PDF中添加条形码
  15. 怎么关闭计算机桌面的弹窗,电脑桌面弹出的广告怎么设置关闭
  16. 互联网创业的五大定律
  17. AS 把鼠标放在targetSdkVersion xx下边红波浪线提示:Google Play requires that apps target API level 31 or higher.
  18. 《平凡的世界》中田晓霞和孙少平的爱情
  19. 使用JWT进行用户身份校验(基于token)
  20. 【CSS】如何实现微信聊天气泡

热门文章

  1. 小虎卫手机远控效果演示:电脑远程控制手机
  2. iOS学习资料分享 -- 苹果官方iPhone应用高级开发课程(16集)
  3. 关于mysql删除用户 bug的问题
  4. 双光驱+硬盘同步录制会议档案馆录播录像机
  5. 六一发奖:大家说,书可以这么送
  6. 深度学习(TensorFlow)环境搭建:(二)Ubuntu16.04+1080Ti显卡驱动
  7. 在linux解压文件夹,在linux 下解压 rar 文件
  8. 港科百创 | 教授企业安建科技B轮融资¥1.8亿,开创半导体产业新势力!
  9. 像素鸟简单的背景设置_01
  10. vue element 滚动条颜色更改