From d0abdbc4d1455f757b80566981d173b512b9143e Mon Sep 17 00:00:00 2001 From: ZunwenYou Date: Thu, 26 Oct 2017 16:44:06 +0800 Subject: [PATCH] [DOCS] improve spark on angel docs. --- docs/algo/spark_on_angel_optimizer.md | 6 +- docs/overview/spark_on_angel.md | 39 ++-- .../spark_on_angel_programing_guide.md | 193 +++++++++--------- docs/tutorials/spark_on_angel_quick_start.md | 58 +++--- 4 files changed, 144 insertions(+), 152 deletions(-) diff --git a/docs/algo/spark_on_angel_optimizer.md b/docs/algo/spark_on_angel_optimizer.md index 7692e957f..2932fbaa7 100755 --- a/docs/algo/spark_on_angel_optimizer.md +++ b/docs/algo/spark_on_angel_optimizer.md @@ -50,9 +50,9 @@ Spark mllib中Logistic Regression算法做了很多数据预处理的逻辑, | item | Spark | Spark on Angel | 加速比例 | |---|---|---|---| -|SGD LR (step_size=0.05,maxIter=100) | 2.9 hour | 2.1 hour | 27.6% | -|L-BFGS LR (m=10, maxIter=50) | 2 hour | 1.3 hour | 35.0% | -|OWL-QN LR (m=10, maxIter=50) | 3.3 hour | 1.9 hour | 42.4% | +|SGD LR (step_size=0.05,maxIter=100) | 2.9 hour | 1.5 hour | 48.3% | +|L-BFGS LR (m=10, maxIter=50) | 2 hour | 1.0 hour | 50.0% | +|OWL-QN LR (m=10, maxIter=50) | 3.3 hour | 1.4 hour | 57.6% | 如上数据所示,Spark on Angel相较于Spark在训练LR模型时有不同程度的加速;对于越复杂的模型,其加速的比例越大。 同时值得强调的是,Spark on Angel的算法逻辑实现与纯Spark的实现没有太多的差别,大大方便了广大Spark用户。 diff --git a/docs/overview/spark_on_angel.md b/docs/overview/spark_on_angel.md index 8eeae9715..d9d3ed1e0 100644 --- a/docs/overview/spark_on_angel.md +++ b/docs/overview/spark_on_angel.md @@ -26,28 +26,34 @@ Angel从v1.0.0版本开始,就加入了**PS-Service**的特性,不仅仅可 * 利用Spark的Context,和Angel的配置,创建AngelContext,在Driver端负责全局的初始化和启动工作 * **PSClient** - * 负责PSVector与local value直接的运算(包括pull、push、increment), 以及PSVector与PSVector之间的运算(包括大部分的代数运算);同时还支持PSF(用户自定义的PS函数) - * PSClient所有运算会被封装到RemotePSVector和BreezePSVector。 + * PSClient集成了PSVector和PSMatrix的所有初始化、运算、Pull/Push等操作 + * 包括三部分Initializer,VectorOps,MatrixOps;分别对应PS的初始化操作,PSVector运算操作和PSMatrix运算操作 -* **PSModelPool** - * PSModelPool对应了Angel PS上的一个矩阵,PSModelPool负责PSVector的申请、回收、销毁等工作。 +* **PSModel** + * PSModel是PS server上PSVector/PSMatrix的总称,包含着PSClient对象 + * PSModel是PSVector和PSMatrix的父类 -* **PSVetorProxy/PSVector** - * PSVectorProxy是PSVector(包括RemotePSVector和BreezePSVector)的代理,指向Angel PS上的某个PSVector。 - * PSVector的RemotePSVector和BreezePSVector封装了在不同场景下的PSVector的运算。RemotePSVector提供了PSVector与local value直接的运算(包括pull、push、increment),而BreezePSVector提供了PSVector与PSVector之间的运算(包括大部分的代数运算),以及PSF(用户自定义的PS函数) +* **PSVector** + * 包括DensePSVecotr和SparsePSVector + * PSVector的申请:通过`PSVector.dense(dim: Int, capacity: Int = 50)`申请PSVector,会创建一个维度为`dim`,容量为`capacity`的VectorPool,同一个VectorPool内的两个PSVector可以做运算。 + 通过`PSVector.duplicate(psVector)`,申请一个与`psVector`在同一个VectorPool的PSVector。 + * PSVector有两个装饰类:`BreezePSVector`和`CachedPSVector`,`BreezePSVector`使PSVector可以支持Breeze算法库里的Vector运算。而`CachedPSVector`支持PSVector在Pull/Push过程中的缓存功能。 + +* **PSMatrix** + * 包括DensePSMatrix和SparsePSMatrix + * PSMatrix的创建和销毁:通过`PSMatrix.dense(rows: Int, cols: Int)`创建,当PSMatrix不再使用后,需要手动调用`destory`销毁该Matrix 使用Spark on Angel的简单代码如下: ```Scala -val psContext = PSContext.getOrCreate(spark.sparkContext) -val pool = psContext.createModelPool(dim, capacity) -val psVector = pool.createModel(0.0) +PSContext.getOrCreate(spark.sparkContext) +val psVector = PSVector.dense(dim, capacity) rdd.map { case (label , feature) => psVector.increment(feature) ... } -println("feature sum size:" + psVector.mkRemote.size()) +println("feature sum:" + psVector.pull.mkString(" ")) ``` ## 3. 启动流程 @@ -56,8 +62,7 @@ Spark on Angel本质上是一个Spark任务。Spark启动后,driver通过Angel Spark driver的执行流程 - 启动SparkSession - 启动PSContext -- 创建PSModelPool -- 申请PSVector +- 申请PSVector/PSMatrix - 执行算法逻辑 - 终止PSContext和SparkSession @@ -101,13 +106,11 @@ def runOWLQN(trainData: RDD[(Vector, Double)], dim: Int, m: Int, maxIter: Int): def runOWLQN(trainData: RDD[(Vector, Double)], dim: Int, m: Int, maxIter: Int): Unit = { - val pool = PSContext.createModelPool(dim, 20) - - val initWeightPS = pool.createZero().mkBreeze() - val l1regPS = pool.createZero().mkBreeze() + val initWeightPS = PSVector.dense(dim, 20).toBreeze() + val l1regPS = PSVector.duplicate(initWeightPS.component).zero().toBreeze val owlqn = new OWLQN(maxIter, m, l1regPS, tol) - val states = owlqn.iterations(CostFunc(trainData), initWeightPS) + val states = owlqn.iterations(PSCostFunc(trainData), initWeightPS) ……… } diff --git a/docs/programmers_guide/spark_on_angel_programing_guide.md b/docs/programmers_guide/spark_on_angel_programing_guide.md index f5698f342..750580fe3 100644 --- a/docs/programmers_guide/spark_on_angel_programing_guide.md +++ b/docs/programmers_guide/spark_on_angel_programing_guide.md @@ -54,93 +54,93 @@ val context = PSContext.getOrCreate(spark.sparkContext) // 第一次启动时,需要传入SparkContext val context = PSContext.getOrCreate(spark.sparkContext) -// 此后,就无需传入SparkContext,直接的PSContext -val context = PSContext.getOrCreate() +// 此后,直接通过PSContext.instance()获取context +val context = PSContext.instance() // 终止PSContext PSContext.stop() ``` -- 申请/销毁PSModelPoool -PSModelPool在Angel PS上其实是一个矩阵,矩阵列数是dim,行数是capacity。 -可以申请多个不同大小的PSModelPool。 - -```scala -val pool = context.createModelPool(dim, capacity) -context.destroyModelPool(pool) -``` - -### 4. PSModelPool -PSModelPool在Angel PS上其实是一个矩阵,矩阵列数是`dim`,行数是`capacity`。 -同一个Application中,可以申请多个不同大小的PSModelPool。 -可以从PSModelPool申请PSVector,存放在PSModle上PSVector的维度都是`dim`; -PSModelPool只能存放、管理维度为`dim`的PSVector。 - -注意:同一个Pool内的PSVector才能做运算。 - -```scala -// 用Array数据初始化一个PSVector,array的维度必须与pool维度保持一致 -val arrayProxy = pool.createModel(array) -// PSVector的每个维度都是value -val valueProxy = pool.createModel(value) - -// 全0的PSVector -val zeroProxy = pool.createZero() -// 随机的PSVector, 随机数服从均匀分布 -val uniformProxy = pool.createRandomUniform(0.0, 1.0) -// 随机的PSVector, 随机数服从正态分布 -val normalProxy = pool.createRandomNormal(0.0, 1.0) -``` - -使用之后的PSVector,可以手动delete、也可以放之不管系统会自动回收;delete后的PSVector就不能再使用。 -```scala -pool.delete(vectorPorxy) -``` - -### 5. PSVectorProxy/PSVector -PSVectorProxy是PSVector(包括BreezePSVector和RemotePSVector)的代理,指向Angel PS上的某个PSVector。 -而PSVector的BreezePSVector和RemotePSVector封装了在不同场景下的PSVector的运算。 - -- PSVectorProxy和PSVector(BreezePSVector和RemotePSVector)之间的转换 - -```scala - // PSVectorProxy to BreezePSVector、RemotePSVector - val brzVector = vectorProxy.mkBreeze() - val remoteVector = vectorProxy.mkRemote() +### 4. PSVector +PSVector是PSModel的子类,同时PSVector有DensePSVector/SparsePSVector和BreezePSVector/CachedPSVector四种不同的实现。DensePSVector/SparsePSVector是两种不同数据格式的PSVector,而BreezePSVector/CachedPSVector是两种不同功能的PSVector。 + +在介绍PSVector之前,需要先了解一下PSVectorPool的概念;PSVectorPool在Spark on Angel的编程接口中不会显式地接触到,但需要了解其概念。 + +- PSVectorPool + PSVectorPool本质上是Angel PS上的一个矩阵,矩阵列数是`dim`,行数是`capacity`。 + PSVectorPool负责PSVector的申请、自动回收。自动回收类似于Java的GC功能,PSVector对象使用后不用手动delete。 + 同一个PSVectorPool里的PSVector的维度都是`dim`,同一个Pool里的PSVector才能做运算。 + +- PSVector的申请和初始化 + PSVector第一次申请的时候,必须通过PSVector的伴生对象中dense/sparse方法申请。 + dense/sparse方法会创建PSVectorPool,因此需要传入dimension和capacity参数。 + + 通过duplicate方法可以申请一个与已有psVector对象同Pool的PSVector。 + + ```scala + // 第一次申请DensePSVector和SparsePSVector + // capacity提供了默认参数 + val dVector = PSVector.dense(dim, capacity) + val sVector = PSVector.sparse(dim, capacity) + + // 从现有的psVector duplicate出新的PSVector + val samePoolVector = PSVector.duplicate(dVector) + + // 初始化 + // fill with 1.0 + dVector.fill(1.0) + // 初始化dVector,使dVector的元素服从[-1.0, 1.0]的均匀分布 + dVector.randomUniform(-1.0, 1.0) + // 初始化dVector,使dVector的元素服从N(0.0, 1.0)的正态分布 + dVector.randomNormal(0.0, 1.0) + ``` +- DensePSVector VS. SparsePSVector + 顾名思义,DensePSVector和SparsePSVector是针对稠密和稀疏两种不同的数据形式设计的PSVector + +- BreezePSVector VS. CachedPSVector + BreezePSVector和CachedPSVector是封装了不同运算功能的PSVector装饰类。 + + BreezePSVector面向于Breeze算法库,封装了同一个PSVectorPool里PSVector之间的运算。包括常用的math运算和blas运算,BreezePSVector实现了Breeze内部的NumbericOps操作,因此BreezePSVector支持+,-,* 这样的操作 + + ```scala + val brzVector1 = brzVector2 :* 2.0 + brzVector3 + ``` + 也可以显式地调用Breeze.math和Breeze.blas里的操作。 + + CachedPSVector为Pull、increment/mergeMax/mergeMin提供了Cache的功能,减少这些操作和PS交互的次数。 + 如,pullWithCache会加Pull下来的Vector缓存到本地,下次Pull同一个Vector时,直接读取缓存的Vector; + incrementWithCache会将多次的increment操作在本地聚合,最后通过flush操作,将本地聚合的结果increment到PSVector。 + + ```scala + val cacheVector = PSVector.dense(dim).toCache + rdd.map { case (label , feature) => + // 并没有立即更新psVector + cacheVector.incrementWithCache(feature) + } + // flushIncrement会将所有executor上的缓存的cacheVector的increment结果,累加到cacheVector + cacheVector.flushIncrement + ``` - // BreezePSVector、RemotePSVector to PSVectorProxy - val vectorProxy = brzVector.proxy - val vectorProxy = remoteVector.proxy +### 5. PSMatrix +PSMatrix是Angel PS上的矩阵,其有DensePSMatrix和SparsePSMatrix两种实现。 - // BreezePSVector, RemotePSVector之间的转换 - val remoteVector = brzVector.toRemote() - val brzVector = remoteVector.toBreeze() -``` +- PSMatrix的创建和销毁 +PSMatrix通过伴生对象中的dense/sparse方法申请对应的matrix。 +PSVector会有PSVectorPool自动回收、销毁无用的PSVector,而PSMatrix需要手动调用destroy方法销毁PS上的matrix -- RemotePSVector - RemotePSVector封装了PSVector和本地Array之间的操作 +如果需要对指定PSMatrix的分区参数,通过rowsInBlock/colsInBlock指定每个分区block的大小。 ```scala - // pull PSVector到本地 - val localArray = remoteVector.pull() - // push 本地的Array到Angel PS - remoteVector.push(localArray) - // 将本地的Array累加到Angel PS上的PSVector - remoteVector.increment(localArray) - - // 本地的Array和PSVector取最大值、最小值 - remoteVector.mergeMax(localArray) - remoteVector.mergeMin(localArray) -``` + // 创建、初始化 + val dMatrix = DensePSMatrix.dense(rows, cols, rowsInBlock, colsInBlock) + val sMatrix = SparsePSMatrix.sparse(rows, cols) -- BreezePSVector - BreezePSVector封装了同一个PSModelPool里PSVector之间的运算。包括常用的math运算和blas运算 - BreezePSVector实现了Breeze内部的NumbericOps操作,因此BreezePSVector支持+,-,* 这样的操作 + dMatrix.destroy() -```scala - val brzVector1 = 2.0 * brzVector2 + brzVector3 + // Pull/Push操作 + val array = dMatrix.pull(rowId) + dMatrix.push(rowId, array) ``` -也可以显式地调用Breeze.math和Breeze.blas里的操作。 ### 6. 支持自定义的PS function @@ -192,51 +192,46 @@ public class MulScalar implements MapFunc { 下面是将RDD[(label, feature)]中的所有feature都累加到PSVector中。 -```java +```scala val dim = 10 -val poolCapacity = 40 +val capacity = 40 -val context = PSContext.getOrCreate() -val pool = context.createModelPool(dim, poolCapacity) -val psProxy = pool.zero() +val psVector = PSVector.dense(dim, capacity).toCache rdd.foreach { case (label , feature) => - psProxy.mkRemote.increment(feature) + psProxy.incrementWithCache(feature) } +psVector.flushIncrement -println("feature sum:" + psProxy.pull()) +println("feature sum:" + psVector.pull().mkString(" ")) ``` - Example 2: Gradient Descent实现 -下面是一个简单版本的Gradient Descent的PS实现 -```java -val context = PSContext.getOrCreate() -val pool = context.createModelPool(dim, poolCapacity) -val w = pool.createModel(initWeights) -val gradient = pool.zeros() +下面是一个简单版本的Gradient Descent的PS实现, +注:这个例子里的instance的label是-1和1。 + +```scala + +val w = PSVector.dense(dim).fill(initWeights) for (i <- 1 to ITERATIONS) { - val totalG = gradient.mkRemote() + val gradient = PSVector.duplicate(w) - val nothing = points.mapPartitions { iter => - val brzW = new DenseVector(w.mkRemote.pull()) + val nothing = instance.mapPartitions { iter => + val brzW = new DenseVector(w.pull()) - val subG = iter.map { p => - p.x * (1 / (1 + math.exp(-p.y * brzW.dot(p.x))) - 1) * p.y + val subG = iter.map { case (label, feature) => + feature * (1 / (1 + math.exp(-label * brzW.dot(feature))) - 1) * label }.reduce(_ + _) - totalG.incrementAndFlush(subG.toArray) + gradient.increment(subG.toArray) Iterator.empty } nothing.count() - w.mkBreeze += -1.0 * gradent.mkBreeze - gradient.mkRemote.fill(0.0) + w.toBreeze :+= gradent.toBreeze :* -1.0 } -println("feature sum:" + w.mkRemote.pull()) - -gradient.delete() -w.delete() +println("w:" + w.pull().mkString(" ")) ``` diff --git a/docs/tutorials/spark_on_angel_quick_start.md b/docs/tutorials/spark_on_angel_quick_start.md index ff0f163e8..3e4263126 100644 --- a/docs/tutorials/spark_on_angel_quick_start.md +++ b/docs/tutorials/spark_on_angel_quick_start.md @@ -44,36 +44,30 @@ Spark on Angel的任务本质上是一个Spark的Application,完成Spark on An ## Example Code: Gradient Descent的Angel PS实现 下面是一个简单版本的Gradient Descent的PS实现 - -```Scala - - val points:RDD[Point] = _ - - val wPS = PSVector.dense(DIM).fill(0.0) - val gradientPS = PSVector.dense(DIM).fill(0.0) - - for (i <- 1 to ITERATIONS) { - val totalG = gradientPS.toCache - - val trigger = points.mapPartitions { iter => - val brzW = new DenseVector(wPS.pull()) - - val subG = iter.map { p => - p.x * (1 / (1 + math.exp(-p.y * brzW.dot(p.x))) - 1) * p.y - }.reduce(_ + _) - - totalG.push(subG.toArray()) - Iterator.empty - } - trigger.count() - - wPS.toBreeze += -1.0 * gradientPS.toBreeze - gradientPS.fill(0.0) - } - - println("feature sum:" + wPS.pull()) - - gradientPS.delete() - wPS.delete() - } +```java +val w = PSVector.dense(dim) +val sc = SparkSession.builder().getOrCreate().sparkContext + +for (i <- 1 to ITERATIONS) { + val bcW = sc.broadcast(w.pull()) + val totalG = PSVector.duplicate(w) + + val tempRDD = trainData.mapPartitions { iter => + val breezeW = new DenseVector(bcW.value) + + val subG = iter.map { case (feat, label) => + val brzData = new DenseVector[Double](feat.toArray) + val margin: Double = -1.0 * breezeW.dot(brzData) + val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label + val gradient = brzData * gradientMultiplier + gradient + }.reduce(_ + _) + totalG.increment(subG.toArray) + Iterator.empty + } + tempRDD.count() + w.toBreeze -= (totalG.toBreeze :* (1.0 / sampleNum)) +} + +println(s"w: ${w.pull().mkString(" ")}") ```