[SPARK-35083][CORE] Support remote scheduler pool files #32184

Closed
wants to merge 4 commits
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{FileInputStream, InputStream}
+import java.io.InputStream
 import java.util.{Locale, NoSuchElementException, Properties}
 
 import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -54,10 +56,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
+  val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
   val DEFAULT_POOL_NAME = "default"
@@ -74,7 +76,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
     var fileData: Option[(InputStream, String)] = None
     try {
       fileData = schedulerAllocFile.map { f =>
-        val fis = new FileInputStream(f)
+        val filePath = new Path(f)
+        val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)

Member commented:
This code can do many things. Please see my above comment. (#32184 (comment))

         logInfo(s"Creating Fair Scheduler pools from $f")
         Some((fis, f))
       }.getOrElse {
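The two added lines route the configured string through Hadoop's FileSystem API, so the URI scheme decides where the file is read from. A minimal standalone sketch of that resolution, using a hypothetical path (not part of the diff):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical path for illustration. With no scheme, Hadoop qualifies the
// path against fs.defaultFS: file:// by default, commonly hdfs:// on clusters.
val hadoopConf = new Configuration()
val path = new Path("/path/to/fairscheduler.xml")
val fs = path.getFileSystem(hadoopConf) // same lookup as in the diff above
println(s"would read from: ${fs.makeQualified(path)}") // e.g. file:/... or hdfs://...
// fs.open(path) then returns the InputStream that buildPools() parses as XML.
```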
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -205,7 +205,7 @@ private[spark] class TaskSchedulerImpl(
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
-        new FairSchedulableBuilder(rootPool, conf)
+        new FairSchedulableBuilder(rootPool, sc)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
           s"$schedulingMode")
45 changes: 37 additions & 8 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -20,10 +20,14 @@ package org.apache.spark.scheduler
 import java.io.FileNotFoundException
 import java.util.Properties
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode._
+import org.apache.spark.util.Utils
 
 /**
  * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -87,7 +91,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Ensure that the XML file was read in correctly.
@@ -185,9 +189,10 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
       .getFile()
     val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
+    sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -239,7 +244,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Submit a new task set manager with pool properties set to null. This should result
@@ -267,7 +272,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     assert(rootPool.getSchedulableByName(TEST_POOL) === null)
@@ -302,7 +307,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -317,7 +322,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -332,12 +337,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     intercept[FileNotFoundException] {
       schedulableBuilder.buildPools()
     }
   }
 
+  test("SPARK-35083: Support remote scheduler pool file") {
+    val hadoopVersion = VersionInfo.getVersion.split("\\.")
+    // HttpFileSystem is supported since Hadoop 2.9
+    assume(hadoopVersion.head.toInt >= 3 ||
+      (hadoopVersion.head.toInt == 2 && hadoopVersion(1).toInt >= 9))
+
+    val xmlPath = new Path(
+      Utils.getSparkClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile)
+    TestUtils.withHttpServer(xmlPath.getParent.toUri.getPath) { baseURL =>
+      val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE,
+        baseURL + "fairscheduler-with-valid-data.xml")
+      sc = new SparkContext(LOCAL, APP_NAME, conf)
+
+      val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+      val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
+      schedulableBuilder.buildPools()
+
+      verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+      verifyPool(rootPool, "pool1", 3, 1, FIFO)
+      verifyPool(rootPool, "pool2", 4, 2, FAIR)
+      verifyPool(rootPool, "pool3", 2, 3, FAIR)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
       expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
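The new test serves the pool file over HTTP through Hadoop's HttpFileSystem. Outside the suite, an application could point the same setting at any Hadoop-readable URL; a sketch with a hypothetical URL, assuming the matching filesystem connector is on the classpath:

```scala
import org.apache.spark.SparkConf

// Hypothetical URL; any scheme with a registered Hadoop FileSystem should work
// (hdfs://, http://, s3a://, ...) once its connector is on the classpath.
val conf = new SparkConf()
  .setAppName("fair-pools-demo")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "https://conf.example.com/fairscheduler.xml")
```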
3 changes: 2 additions & 1 deletion docs/job-scheduling.md
@@ -252,10 +252,11 @@ properties:
 
 The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
 and either putting a file named `fairscheduler.xml` on the classpath, or setting `spark.scheduler.allocation.file` property in your
-[SparkConf](configuration.html#spark-properties).
+[SparkConf](configuration.html#spark-properties). The file path can either be a local file path or HDFS file path.
Member commented:
Actually, this line isn't completely true. It can be a local file path only when fs.default.scheme is file:/// which usually isn't in production.

So, if users from old Spark versions use a path like /path/to/file, the files will be written into HDFS after the upgrade.

Can we at least update the migration guide? We should also mention that it respects Hadoop properties now.

Member commented:
+1 for @HyukjinKwon 's advice.

Contributor (author) commented:
@HyukjinKwon sorry, not sure I get your point.

> So, if users from old Spark versions use a path like /path/to/file, the files will be written into HDFS after the upgrade.

Why would we need to write files into HDFS? This PR is to support reading a remote file as the scheduler pool definition. I think there is no behavior change, just a new feature.

Contributor (author) commented:
I see: if a user specifies a path like /path/to/file with no scheme, then after this PR Spark will load it from HDFS instead of the local filesystem. That is worth a migration guide entry. Thank you for pointing this out! @HyukjinKwon @dongjoon-hyun

Member commented:
thanks man!

 {% highlight scala %}
 conf.set("spark.scheduler.allocation.file", "/path/to/file")
+conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
Member commented:
I think HDFS also supports reading the local file. Could you mention it in the above document? e.g., "The file path can either be a local file path or HDFS file path."

 {% endhighlight %}

 The format of the XML file is simply a `<pool>` element for each pool, with different elements
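Per the migration concern raised in the thread above: after this change a scheme-less path resolves against fs.defaultFS, so a cluster whose default filesystem is HDFS would no longer read the file from local disk. A sketch of pinning the old behavior explicitly, with a hypothetical path:

```scala
// Assumption: the cluster sets fs.defaultFS to hdfs://...; an explicit file://
// scheme keeps the pre-upgrade behavior of reading the pool file locally.
conf.set("spark.scheduler.allocation.file", "file:///etc/spark/fairscheduler.xml")
```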