From 915e88401c57fb267811531a1916124297506e90 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Tue, 21 Jan 2025 02:34:10 +0800 Subject: [PATCH 1/2] java-ext-mem --- jvm-packages/.gitignore | 1 + jvm-packages/create_jni.py | 3 + jvm-packages/pom.xml | 1 + .../xgboost4j/java/ExtMemQuantileDMatrix.java | 87 +++++++ .../dmlc/xgboost4j/java/QuantileDMatrix.java | 5 + .../scala/ExtMemQuantileDMatrix.scala | 37 +++ .../scala/spark/ExternalMemory.scala | 220 ++++++++++++++++++ .../scala/spark/GpuXGBoostPlugin.scala | 8 +- .../ml/dmlc/xgboost4j/java/DMatrixTest.java | 1 - .../scala/ExtMemQuantileDMatrixSuite.scala | 104 +++++++++ .../scala/spark/XGBoostEstimator.scala | 2 +- jvm-packages/xgboost4j/pom.xml | 2 + .../ml/dmlc/xgboost4j/java/XGBoostJNI.java | 5 +- .../xgboost4j/src/native/xgboost4j-gpu.cpp | 4 +- .../xgboost4j/src/native/xgboost4j-gpu.cu | 155 ++++++++---- .../xgboost4j/src/native/xgboost4j.cpp | 24 +- jvm-packages/xgboost4j/src/native/xgboost4j.h | 8 + 17 files changed, 611 insertions(+), 56 deletions(-) create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala diff --git a/jvm-packages/.gitignore b/jvm-packages/.gitignore index 40572b4d7931..d60fb844dc94 100644 --- a/jvm-packages/.gitignore +++ b/jvm-packages/.gitignore @@ -2,3 +2,4 @@ build.sh xgboost4j-tester/pom.xml xgboost4j-tester/iris.csv dependency-reduced-pom.xml +.factorypath diff --git a/jvm-packages/create_jni.py b/jvm-packages/create_jni.py index 9f04d7f43129..918da2addbf5 100755 --- a/jvm-packages/create_jni.py +++ b/jvm-packages/create_jni.py @@ -73,6 +73,8 @@ def native_build(cli_args: argparse.Namespace) -> None: os.environ["JAVA_HOME"] = ( subprocess.check_output("/usr/libexec/java_home").strip().decode() ) + if cli_args.use_debug == "ON": + CONFIG["CMAKE_BUILD_TYPE"] = "Debug" print("building Java wrapper", flush=True) with cd(".."): @@ -187,5 +189,6 @@ def native_build(cli_args: argparse.Namespace) -> None: ) parser.add_argument("--use-cuda", type=str, choices=["ON", "OFF"], default="OFF") parser.add_argument("--use-openmp", type=str, choices=["ON", "OFF"], default="ON") + parser.add_argument("--use-debug", type=str, choices=["ON", "OFF"], default="OFF") cli_args = parser.parse_args() native_build(cli_args) diff --git a/jvm-packages/pom.xml b/jvm-packages/pom.xml index c8a9178f2c78..440f261e1c44 100644 --- a/jvm-packages/pom.xml +++ b/jvm-packages/pom.xml @@ -57,6 +57,7 @@ OFF OFF ON + OFF 24.10.0 24.10.0 cuda12 diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java new file mode 100644 index 000000000000..904844f7212c --- /dev/null +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2025, XGBoost Contributors + */ +package ml.dmlc.xgboost4j.java; + +import java.util.Iterator; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.fasterxml.jackson.databind.module.SimpleModule; + +public class ExtMemQuantileDMatrix extends QuantileDMatrix { + // on_host is set to true by default as we only support GPU at the moment + // cache_prefix is not used yet since we have on_host=true. + public ExtMemQuantileDMatrix(Iterator iter, + float missing, + int maxBin, + DMatrix ref, + int nthread, + int max_num_device_pages, + int max_quantile_batches, + int min_cache_page_bytes) throws XGBoostError { + long[] out = new long[1]; + long[] ref_handle = null; + if (ref != null) { + ref_handle = new long[1]; + ref_handle[0] = ref.getHandle(); + } + String conf = this.getConfig(missing, maxBin, nthread, max_num_device_pages, + max_quantile_batches, min_cache_page_bytes); + XGBoostJNI.checkCall(XGBoostJNI.XGExtMemQuantileDMatrixCreateFromCallback( + iter, ref_handle, conf, out)); + handle = out[0]; + } + + public ExtMemQuantileDMatrix( + Iterator iter, + float missing, + int maxBin, + DMatrix ref) throws XGBoostError { + this(iter, missing, maxBin, ref, 1, -1, -1, -1); + } + + public ExtMemQuantileDMatrix( + Iterator iter, + float missing, + int maxBin) throws XGBoostError { + this(iter, missing, maxBin, null); + } + + private String getConfig(float missing, int maxBin, int nthread, int max_num_device_pages, + int max_quantile_batches, + int min_cache_page_bytes) { + Map conf = new java.util.HashMap<>(); + conf.put("missing", missing); + conf.put("max_bin", maxBin); + conf.put("nthread", nthread); + + if (max_num_device_pages > 0) { + conf.put("max_num_device_pages", max_num_device_pages); + } + if (max_quantile_batches > 0) { + conf.put("max_quantile_batches", max_quantile_batches); + } + if (min_cache_page_bytes > 0) { + conf.put("min_cache_page_bytes", min_cache_page_bytes); + } + + conf.put("on_host", true); + conf.put("cache_prefix", "."); + ObjectMapper mapper = new ObjectMapper(); + + // Handle NaN values. Jackson by default serializes NaN values into strings. + SimpleModule module = new SimpleModule(); + module.addSerializer(Double.class, new F64NaNSerializer()); + module.addSerializer(Float.class, new F32NaNSerializer()); + mapper.registerModule(module); + + try { + String config = mapper.writeValueAsString(conf); + return config; + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize configuration", e); + } + } +}; diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java index c3af64a8972d..29db30c71503 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java @@ -54,6 +54,11 @@ public void serialize(Float value, JsonGenerator gen, * QuantileDMatrix will only be used to train */ public class QuantileDMatrix extends DMatrix { + // implicit constructor for the ext mem version of the QDM. 
+ protected QuantileDMatrix() { + super(0); + } + /** * Create QuantileDMatrix from iterator based on the cuda array interface * diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala new file mode 100644 index 000000000000..f0718cd94c88 --- /dev/null +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala @@ -0,0 +1,37 @@ +/* + Copyright (c) 2025 by Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package ml.dmlc.xgboost4j.scala + +import scala.collection.JavaConverters._ + +import ml.dmlc.xgboost4j.java.{Column, ColumnBatch, ExtMemQuantileDMatrix => jExtMemQuantileDMatrix, XGBoostError} + +class ExtMemQuantileDMatrix private[scala]( + private[scala] override val jDMatrix: jExtMemQuantileDMatrix) extends QuantileDMatrix(jDMatrix) { + + def this(iter: Iterator[ColumnBatch], missing: Float, maxBin: Int) { + this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin)) + } + + def this( + iter: Iterator[ColumnBatch], + ref: ExtMemQuantileDMatrix, + missing: Float, + maxBin: Int + ) { + this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin, ref.jDMatrix)) + } +} diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala new file mode 100644 index 000000000000..591e0b6742a8 --- /dev/null +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala @@ -0,0 +1,220 @@ +/* + Copyright (c) 2025 by Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf._
+
+import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch}
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+private[spark] trait ExternalMemory[T] extends Iterator[Table] with AutoCloseable {
+
+  protected val buffers = ArrayBuffer.empty[T]
+  private lazy val buffersIterator = buffers.toIterator
+
+  /**
+   * Convert the table to T, which will be cached
+   *
+   * @param table to be converted
+   * @return the content
+   */
+  def convertTable(table: Table): T
+
+  /**
+   * Load the content back into a Table
+   *
+   * @param content to be loaded
+   * @return Table
+   */
+  def loadTable(content: T): Table
+
+  // Cache the table
+  def cacheTable(table: Table): Unit = {
+    val content = convertTable(table)
+    buffers.append(content)
+  }
+
+  override def hasNext: Boolean = buffersIterator.hasNext
+
+  override def next(): Table = loadTable(buffersIterator.next())
+
+  override def close(): Unit = {}
+}
+
+// The data will be cached to disk.
+private[spark] class DiskExternalMemoryIterator(val path: String) extends ExternalMemory[String] {
+
+  private lazy val root = {
+    val tmp = path + "/xgboost"
+    createDirectory(tmp)
+    tmp
+  }
+
+  private var counter = 0
+
+  private def createDirectory(dirPath: String): Unit = {
+    val path = Paths.get(dirPath)
+    if (!Files.exists(path)) {
+      Files.createDirectories(path)
+    }
+  }
+
+  /**
+   * Convert the table to a file path, which will be cached
+   *
+   * @param table to be converted
+   * @return the content
+   */
+  override def convertTable(table: Table): String = {
+    val names = (1 to table.getNumberOfColumns).map(_.toString)
+    val options = ArrowIPCWriterOptions.builder().withColumnNames(names: _*).build()
+    val path = root + "/table_" + counter + "_" + System.nanoTime()
+    counter += 1
+    withResource(Table.writeArrowIPCChunked(options, new File(path))) { writer =>
+      writer.write(table)
+    }
+    path
+  }
+
+  private def closeOnExcept[T <: AutoCloseable, V](r: ArrayBuffer[T])
+                                                  (block: ArrayBuffer[T] => V): V = {
+    try {
+      block(r)
+    } catch {
+      case t: Throwable =>
+        r.foreach(_.close())
+        throw t
+    }
+  }
+
+  /**
+   * Load the path from disk to the Table
+   *
+   * @param name to be loaded
+   * @return Table
+   */
+  override def loadTable(name: String): Table = {
+    val file = new File(name)
+    if (!file.exists()) {
+      throw new RuntimeException(s"The cached file ${name} does not exist")
+    }
+    try {
+      withResource(Table.readArrowIPCChunked(file)) { reader =>
+        val tables = ArrayBuffer.empty[Table]
+        closeOnExcept(tables) { tables =>
+          var table = Option(reader.getNextIfAvailable())
+          while (table.isDefined) {
+            tables.append(table.get)
+            table = Option(reader.getNextIfAvailable())
+          }
+        }
+        if (tables.size > 1) {
+          closeOnExcept(tables) { tables =>
+            Table.concatenate(tables.toArray: _*)
+          }
+        } else {
+          tables(0)
+        }
+      }
+    } catch {
+      case e: Throwable =>
+        close()
+        throw e
+    } finally {
+      if (file.exists()) {
+        file.delete()
+      }
+    }
+  }
+
+  override def close(): Unit = {
+    buffers.foreach { path =>
+      val file = new File(path)
+      if (file.exists()) {
+        file.delete()
+      }
+    }
+    buffers.clear()
+  }
+}
+
+private[spark] object ExternalMemory {
+  def apply(path: Option[String] = None): ExternalMemory[_] = {
+    path.map(new DiskExternalMemoryIterator(_))
+      .getOrElse(throw new RuntimeException("No disk path provided"))
+  }
+}
+
+/**
+ * ExternalMemoryIterator supports iterating the data twice: once the input is
+ * exhausted, the internal iterator is swapped to the external memory cache.
+ *
+ * The first pass obtains the input batches, which are
+ *   1. cached in the external memory
+ *   2. fed into the QuantileDMatrix
+ * The second pass returns the cached batches retrieved from external memory.
+ *
+ * @param input   the spark input iterator
+ * @param indices column index
+ */
+private[scala] class ExternalMemoryIterator(val input: Iterator[Table],
+                                            val indices: ColumnIndices,
+                                            val path: Option[String] = None)
+  extends Iterator[ColumnBatch] {
+
+  private var iter = input
+
+  // Flag to indicate the input has been consumed.
+  private var inputIsConsumed = false
+  // Flag to indicate that input.next has been called at least once.
+  private var inputNextIsCalled = false
+
+  // visible for testing
+  private[spark] val externalMemory = ExternalMemory(path)
+
+  override def hasNext: Boolean = {
+    val value = iter.hasNext
+    if (!value && inputIsConsumed && inputNextIsCalled) {
+      externalMemory.close()
+    }
+    if (!inputIsConsumed && !value && inputNextIsCalled) {
+      inputIsConsumed = true
+      iter = externalMemory
+    }
+    value
+  }
+
+  override def next(): ColumnBatch = {
+    inputNextIsCalled = true
+    withResource(new GpuColumnBatch(iter.next())) { batch =>
+      if (iter.eq(input)) {
+        externalMemory.cacheTable(batch.table)
+      }
+      new CudfColumnBatch(
+        batch.select(indices.featureIds.get),
+        batch.select(indices.labelId),
+        batch.select(indices.weightId.getOrElse(-1)),
+        batch.select(indices.marginId.getOrElse(-1)),
+        batch.select(indices.groupId.getOrElse(-1)))
+    }
+  }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
index 36e5b7da4299..68743bf60f95 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
@@ -295,7 +295,7 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
   }
 }
 
-private class GpuColumnBatch(table: Table) extends AutoCloseable {
+private[scala] class GpuColumnBatch(val table: Table) extends AutoCloseable {
 
   def select(index: Int): Table = {
     select(Seq(index))
@@ -308,9 +308,5 @@ private class GpuColumnBatch(table: Table) extends AutoCloseable {
     new Table(indices.map(table.getColumn): _*)
   }
 
-  override def close(): Unit = {
-    if (Option(table).isDefined) {
-      table.close()
-    }
-  }
+  override def close(): Unit = Option(table).foreach(_.close())
 }
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
index 5a4c67bcca38..edb7b518d259 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
@@ -198,5 +198,4 @@ private float[] convertFloatTofloat(Float[]... 
datas) { } return floatArray; } - } diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala new file mode 100644 index 000000000000..3dc6aec120c0 --- /dev/null +++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala @@ -0,0 +1,104 @@ +/* + Copyright (c) 2021-2025 by Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package ml.dmlc.xgboost4j.scala + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.Table +import org.scalatest.funsuite.AnyFunSuite + +import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch} +import ml.dmlc.xgboost4j.scala.rapids.spark.TmpFolderSuite +import ml.dmlc.xgboost4j.scala.spark.{ColumnIndices, ExternalMemoryIterator, GpuColumnBatch} +import ml.dmlc.xgboost4j.scala.spark.Utils.withResource + +class ExtMemQuantileDMatrixSuite extends AnyFunSuite with TmpFolderSuite { + + private def runTest(buildIterator: (Iterator[Table], ColumnIndices) => Iterator[ColumnBatch]) = { + val label1 = Array[java.lang.Float](25f, 21f, 22f, 20f, 24f) + val weight1 = Array[java.lang.Float](1.3f, 2.31f, 0.32f, 3.3f, 1.34f) + val baseMargin1 = Array[java.lang.Float](1.2f, 0.2f, 1.3f, 2.4f, 3.5f) + val group1 = Array[java.lang.Integer](1, 1, 7, 7, 19, 26) + + val label2 = Array[java.lang.Float](9f, 5f, 4f, 10f, 12f) + val weight2 = Array[java.lang.Float](3.0f, 1.3f, 3.2f, 0.3f, 1.34f) + val baseMargin2 = Array[java.lang.Float](0.2f, 2.5f, 3.1f, 4.4f, 2.2f) + val group2 = Array[java.lang.Integer](30, 30, 30, 40, 40) + + val expectedGroup = Array(0, 2, 4, 5, 6, 9, 11) + + withResource(new Table.TestBuilder() + .column(1.2f, null.asInstanceOf[java.lang.Float], 5.2f, 7.2f, 9.2f) + .column(0.2f, 0.4f, 0.6f, 2.6f, 0.10f.asInstanceOf[java.lang.Float]) + .build) { X_0 => + withResource(new Table.TestBuilder().column(label1: _*).build) { y_0 => + withResource(new Table.TestBuilder().column(weight1: _*).build) { w_0 => + withResource(new Table.TestBuilder().column(baseMargin1: _*).build) { m_0 => + withResource(new Table.TestBuilder().column(group1: _*).build) { q_0 => + withResource(new Table.TestBuilder() + .column(11.2f, 11.2f, 15.2f, 17.2f, 19.2f.asInstanceOf[java.lang.Float]) + .column(1.2f, 1.4f, null.asInstanceOf[java.lang.Float], 12.6f, 10.10f).build) { + X_1 => + withResource(new Table.TestBuilder().column(label2: _*).build) { y_1 => + withResource(new Table.TestBuilder().column(weight2: _*).build) { w_1 => + withResource(new Table.TestBuilder().column(baseMargin2: _*).build) { m_1 => + withResource(new Table.TestBuilder().column(group2: _*).build) { q_2 => + val tables = new ArrayBuffer[Table]() + tables += new Table(X_0.getColumn(0), X_0.getColumn(1), y_0.getColumn(0), + w_0.getColumn(0), m_0.getColumn(0)) + tables += new Table(X_1.getColumn(0), X_1.getColumn(1), y_1.getColumn(0), + w_1.getColumn(0), m_1.getColumn(0)) + + 
val indices = ColumnIndices( + labelId = 2, + featureId = None, + featureIds = Option(Seq(0, 1)), + weightId = Option(3), + marginId = Option(4), + groupId = Option(5) + ) + val iter = buildIterator(tables.toIterator, indices); + val dmatrix = new ExtMemQuantileDMatrix(iter, 0.0f, 8) + + def check(dm: ExtMemQuantileDMatrix) = { + assert(dm.getLabel.sameElements(label1 ++ label2)) + assert(dm.getWeight.sameElements(weight1 ++ weight2)) + assert(dm.getBaseMargin.sameElements(baseMargin1 ++ baseMargin2)) + } + check(dmatrix) + } + } + } + } + } + } + } + } + } + } + } + + test("ExtMemQuantileDMatrix test") { + val buildIter = (input: Iterator[Table], indices: ColumnIndices) => + new ExternalMemoryIterator( + input, indices, Option(new File(tempDir.toFile, "xgboost").getPath) + ) + runTest(buildIter) + } +} diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala index 9f9fc22755fd..19f6f324497e 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala @@ -44,7 +44,7 @@ import ml.dmlc.xgboost4j.scala.spark.params._ /** * Hold the column index */ -private[spark] case class ColumnIndices( +private[scala] case class ColumnIndices( labelId: Int, featureId: Option[Int], // the feature type is VectorUDT or Array featureIds: Option[Seq[Int]], // the feature type is columnar diff --git a/jvm-packages/xgboost4j/pom.xml b/jvm-packages/xgboost4j/pom.xml index dcce62d09094..217cdcbe6a8a 100644 --- a/jvm-packages/xgboost4j/pom.xml +++ b/jvm-packages/xgboost4j/pom.xml @@ -100,6 +100,8 @@ ${use.cuda} --use-openmp ${use.openmp} + --use-debug + ${use.debug} ${user.dir} ${skip.native.build} diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java index fa2f18be7ded..45b49a3b40f7 100644 --- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java +++ b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java @@ -172,7 +172,10 @@ public final static native int XGDMatrixSetInfoFromInterface( long handle, String field, String json); public final static native int XGQuantileDMatrixCreateFromCallback( - java.util.Iterator iter, long[] ref, String config, long[] out); + java.util.Iterator iter, long[] ref, String config, long[] out); + + public final static native int XGExtMemQuantileDMatrixCreateFromCallback( + java.util.Iterator iter, long[] ref, String config, long[] out); public final static native int XGDMatrixCreateFromArrayInterfaceColumns( String featureJson, float missing, int nthread, long[] out); diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp index 00bf5095cd03..40babba73082 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp +++ b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp @@ -9,9 +9,7 @@ #include "../../../../src/common/common.h" namespace xgboost::jni { -XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass jcls, jobject jdata_iter, - jlongArray jref, char const *config, - jlongArray jout) { +int QdmFromCallback(JNIEnv *, jobject, jlongArray, char const *, bool, jlongArray) { API_BEGIN(); common::AssertGPUSupport(); API_END(); diff --git 
a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu index 6885624bd302..6adffb39447a 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu +++ b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu @@ -4,6 +4,7 @@ #include #include +#include "../../../../src/common/common.h" #include "../../../../src/common/cuda_pinned_allocator.h" #include "../../../../src/common/device_vector.cuh" // for device_vector #include "../../../../src/data/array_interface.h" @@ -58,7 +59,7 @@ void CopyColumnMask(xgboost::ArrayInterface<1> const &interface, std::vector @@ -124,7 +125,6 @@ struct Symbols { static constexpr StringView kBaseMargin{"baseMargin"}; static constexpr StringView kQid{"qid"}; }; -} // namespace class JvmIter { JNIEnv *jenv_; @@ -201,7 +201,17 @@ class DMatrixProxy { } }; -class DataIteratorProxy { +template +Json GetLabel(Map const &jmap) { + auto it = jmap.find(Symbols::kLabel); + StringView msg{"Must have a label field."}; + CHECK(it != jmap.cend()) << msg; + Json label = it->second; + CHECK(!IsA(label)) << msg; + return label; +} + +class HostMemProxy { DMatrixProxy proxy_; JvmIter jiter_; @@ -237,33 +247,28 @@ class DataIteratorProxy { cudaStream_t copy_stream_; public: - explicit DataIteratorProxy(jobject jiter) : jiter_{jiter} { + explicit HostMemProxy(jobject jiter) : jiter_{jiter} { this->Reset(); dh::safe_cuda(cudaStreamCreateWithFlags(©_stream_, cudaStreamNonBlocking)); } - ~DataIteratorProxy() { dh::safe_cuda(cudaStreamDestroy(copy_stream_)); } + ~HostMemProxy() { dh::safe_cuda(cudaStreamDestroy(copy_stream_)); } DMatrixHandle GetDMatrixHandle() const { return proxy_.GetDMatrixHandle(); } // Helper function for staging meta info. - void StageMetaInfo(Json json_interface) { - CHECK(!IsA(json_interface)); - auto json_map = get(json_interface); - auto it = json_map.find(Symbols::kLabel); - if (it == json_map.cend()) { - LOG(FATAL) << "Must have a label field."; - } + void StageMetaInfo(Json jaif) { + CHECK(!IsA(jaif)); + auto json_map = get(jaif); + Json label = GetLabel(json_map); - Json label = json_interface[Symbols::kLabel.c_str()]; - CHECK(!IsA(label)); labels_.emplace_back(std::make_unique>()); CopyMetaInfo(&label, labels_.back().get(), copy_stream_); label_interfaces_.emplace_back(label); proxy_.SetInfo(Symbols::kLabel, label); - it = json_map.find(Symbols::kWeight); + auto it = json_map.find(Symbols::kWeight); if (it != json_map.cend()) { - Json weight = json_interface[Symbols::kWeight.c_str()]; + Json weight = it->second; CHECK(!IsA(weight)); weights_.emplace_back(new dh::device_vector); CopyMetaInfo(&weight, weights_.back().get(), copy_stream_); @@ -274,7 +279,7 @@ class DataIteratorProxy { it = json_map.find(Symbols::kBaseMargin); if (it != json_map.cend()) { - Json base_margin = json_interface[Symbols::kBaseMargin.c_str()]; + Json base_margin = it->second; base_margins_.emplace_back(new dh::device_vector); CopyMetaInfo(&base_margin, base_margins_.back().get(), copy_stream_); margin_interfaces_.emplace_back(base_margin); @@ -284,7 +289,7 @@ class DataIteratorProxy { it = json_map.find(Symbols::kQid); if (it != json_map.cend()) { - Json qid = json_interface[Symbols::kQid.c_str()]; + Json qid = it->second; qids_.emplace_back(new dh::device_vector); CopyMetaInfo(&qid, qids_.back().get(), copy_stream_); qid_interfaces_.emplace_back(qid); @@ -304,13 +309,13 @@ class DataIteratorProxy { using T = decltype(host_columns_)::value_type::element_type; host_columns_.emplace_back(std::make_unique()); - // Stage the meta 
info. - auto json_interface = Json::Load({interface_str.c_str(), interface_str.size()}); - CHECK(!IsA(json_interface)); + // Stage the meta info, Json array interface. + auto jaif = Json::Load({interface_str.c_str(), interface_str.size()}); + CHECK(!IsA(jaif)); - StageMetaInfo(json_interface); + StageMetaInfo(jaif); - Json features = json_interface["features"]; + Json features = jaif["features"]; auto json_columns = get(features); std::vector> interfaces; @@ -394,26 +399,82 @@ class DataIteratorProxy { } return NextSecondLoop(); } - }; + } }; -namespace { -void Reset(DataIterHandle self) { - static_cast(self)->Reset(); -} +// An iterator proxy for external memory. +class ExtMemProxy { + JvmIter jiter_; + DMatrixProxy proxy_; -int Next(DataIterHandle self) { - return static_cast(self)->Next(); -} + public: + explicit ExtMemProxy(jobject jiter) : jiter_(jiter) {} + + ~ExtMemProxy() = default; + + DMatrixHandle GetDMatrixHandle() const { return proxy_.GetDMatrixHandle(); } + + void SetArrayInterface(StringView aif) { + auto jaif = Json::Load(aif); + CHECK(!IsA(jaif)); + + Json features = jaif["features"]; + proxy_.SetData(features); + + // set the meta info. + auto json_map = get(jaif); + Json label = GetLabel(json_map); + proxy_.SetInfo(Symbols::kLabel, label); + + auto it = json_map.find(Symbols::kWeight); + if (it != json_map.cend()) { + Json weight = it->second; + CHECK(!IsA(weight)); + proxy_.SetInfo(Symbols::kWeight, weight); + } + + it = json_map.find(Symbols::kBaseMargin); + if (it != json_map.cend()) { + Json basemargin = it->second; + proxy_.SetInfo("base_margin", basemargin); + } + + it = json_map.find(Symbols::kQid); + if (it != json_map.cend()) { + Json qid = it->second; + proxy_.SetInfo(Symbols::kQid, qid); + } + } + + int Next() { + try { + if (this->jiter_.PullIterFromJVM( + [this](char const *cjaif) { this->SetArrayInterface(cjaif); })) { + return 1; + } else { + return 0; + } + } catch (dmlc::Error const &e) { + if (jiter_.Status() == JNI_EDETACHED) { + GlobalJvm()->DetachCurrentThread(); + } + LOG(FATAL) << e.what(); + } + return 0; + } + + void Reset() { this->jiter_.CloseJvmBatch(); } +}; template using Deleter = std::function; -} // anonymous namespace +} // anonymous namespace -XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass, jobject jdata_iter, - jlongArray jref, char const *config, - jlongArray jout) { - xgboost::jni::DataIteratorProxy proxy(jdata_iter); +/** + * @brief Create QuantileDMatrix for both in-core version and the external memory version. 
+ */ +int QdmFromCallback(JNIEnv *jenv, jobject jdata_iter, jlongArray jref, char const *config, + bool is_extmem, jlongArray jout) { DMatrixHandle result; DMatrixHandle ref{nullptr}; @@ -426,9 +487,25 @@ XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass, jobjec ref = reinterpret_cast(refptr.get()[0]); } - auto ret = XGQuantileDMatrixCreateFromCallback(&proxy, proxy.GetDMatrixHandle(), ref, Reset, Next, - config, &result); + int ret = 0; + if (is_extmem) { + xgboost::jni::ExtMemProxy proxy{jdata_iter}; + ret = XGExtMemQuantileDMatrixCreateFromCallback( + &proxy, proxy.GetDMatrixHandle(), ref, + [](DataIterHandle self) { static_cast(self)->Reset(); }, + [](DataIterHandle self) { return static_cast(self)->Next(); }, + config, &result); + } else { + xgboost::jni::HostMemProxy proxy{jdata_iter}; + ret = XGQuantileDMatrixCreateFromCallback( + &proxy, proxy.GetDMatrixHandle(), ref, + [](DataIterHandle self) { static_cast(self)->Reset(); }, + [](DataIterHandle self) { return static_cast(self)->Next(); }, + config, &result); + } + + JVM_CHECK_CALL(ret); setHandle(jenv, jout, result); return ret; } -} // namespace xgboost::jni +} // namespace xgboost::jni diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp index 7aa29ff03f27..b0959a003680 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp +++ b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp @@ -1298,9 +1298,8 @@ JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_CommunicatorAllred } namespace xgboost::jni { -XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass jcls, jobject jdata_iter, - jobject jref_iter, char const *config, - jlongArray jout); +int QdmFromCallback(JNIEnv *jenv, jobject jdata_iter, jlongArray jref, char const *config, + bool is_extmem, jlongArray jout); } // namespace xgboost::jni /* @@ -1309,14 +1308,29 @@ XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass jcls, j * Signature: (Ljava/util/Iterator;[JLjava/lang/String;[J)I */ JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGQuantileDMatrixCreateFromCallback( + JNIEnv *jenv, jclass, jobject jdata_iter, jlongArray jref, jstring jconf, + jlongArray jout) { + std::unique_ptr> conf{jenv->GetStringUTFChars(jconf, nullptr), + [&](char const *ptr) { + jenv->ReleaseStringUTFChars(jconf, ptr); + }}; + return xgboost::jni::QdmFromCallback(jenv, jdata_iter, jref, conf.get(), false, jout); +} + +/* + * Class: ml_dmlc_xgboost4j_java_XGBoostJNI + * Method: XGExtMemQuantileDMatrixCreateFromCallback + * Signature: (Ljava/util/Iterator;[JLjava/lang/String;[J)I + */ +JNIEXPORT jint JNICALL +Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGExtMemQuantileDMatrixCreateFromCallback( JNIEnv *jenv, jclass jcls, jobject jdata_iter, jlongArray jref, jstring jconf, jlongArray jout) { std::unique_ptr> conf{jenv->GetStringUTFChars(jconf, nullptr), [&](char const *ptr) { jenv->ReleaseStringUTFChars(jconf, ptr); }}; - return xgboost::jni::XGQuantileDMatrixCreateFromCallbackImpl(jenv, jcls, jdata_iter, jref, - conf.get(), jout); + return xgboost::jni::QdmFromCallback(jenv, jdata_iter, jref, conf.get(), true, jout); } /* diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j.h b/jvm-packages/xgboost4j/src/native/xgboost4j.h index 825ad14a5372..b62061581046 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j.h +++ b/jvm-packages/xgboost4j/src/native/xgboost4j.h @@ -407,6 +407,14 @@ JNIEXPORT jint JNICALL 
Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGDMatrixSetInfoFr JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGQuantileDMatrixCreateFromCallback (JNIEnv *, jclass, jobject, jlongArray, jstring, jlongArray); +/* + * Class: ml_dmlc_xgboost4j_java_XGBoostJNI + * Method: XGExtMemQuantileDMatrixCreateFromCallback + * Signature: (Ljava/util/Iterator;[JLjava/lang/String;[J)I + */ +JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGExtMemQuantileDMatrixCreateFromCallback + (JNIEnv *, jclass, jobject, jlongArray, jstring, jlongArray); + /* * Class: ml_dmlc_xgboost4j_java_XGBoostJNI * Method: XGDMatrixCreateFromArrayInterfaceColumns From dfa4e8cbc6d1df67b7c49d9619b704be14a30984 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Mon, 27 Jan 2025 10:22:59 +0800 Subject: [PATCH 2/2] Fix --- .../dmlc/xgboost4j/java/CudfColumnBatch.java | 11 + .../xgboost4j/java/ExtMemQuantileDMatrix.java | 52 +-- .../dmlc/xgboost4j/java/QuantileDMatrix.java | 3 +- .../scala/ExtMemQuantileDMatrix.scala | 15 +- .../xgboost4j/scala/QuantileDMatrix.scala | 20 +- .../scala/spark/GpuXGBoostPlugin.scala | 44 ++- .../scala/ExtMemQuantileDMatrixSuite.scala | 2 +- .../scala/spark/ExternalMemorySuite.scala | 101 ++++++ .../scala/spark/GpuXGBoostPluginSuite.scala | 323 +++++++++--------- .../scala/spark/params/XGBoostParams.scala | 35 +- 10 files changed, 399 insertions(+), 207 deletions(-) create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java index 2f1870c580be..1599e3078b84 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java @@ -86,6 +86,17 @@ private List initializeCudfColumns(Table table) { .collect(Collectors.toList()); } + // visible for testing + public Table getFeatureTable() { + return featureTable; + } + + // visible for testing + public Table getLabelTable() { + return labelTable; + } + + public List getFeatures() { return features; } diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java index 904844f7212c..a2e54190a882 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java @@ -1,5 +1,17 @@ /* - * Copyright (c) 2025, XGBoost Contributors + Copyright (c) 2025 by Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ package ml.dmlc.xgboost4j.java; @@ -18,19 +30,19 @@ public ExtMemQuantileDMatrix(Iterator iter, int maxBin, DMatrix ref, int nthread, - int max_num_device_pages, - int max_quantile_batches, - int min_cache_page_bytes) throws XGBoostError { + int maxNumDevicePages, + int maxQuantileBatches, + int minCachePageBytes) throws XGBoostError { long[] out = new long[1]; - long[] ref_handle = null; + long[] refHandle = null; if (ref != null) { - ref_handle = new long[1]; - ref_handle[0] = ref.getHandle(); + refHandle = new long[1]; + refHandle[0] = ref.getHandle(); } - String conf = this.getConfig(missing, maxBin, nthread, max_num_device_pages, - max_quantile_batches, min_cache_page_bytes); + String conf = this.getConfig(missing, maxBin, nthread, maxNumDevicePages, + maxQuantileBatches, minCachePageBytes); XGBoostJNI.checkCall(XGBoostJNI.XGExtMemQuantileDMatrixCreateFromCallback( - iter, ref_handle, conf, out)); + iter, refHandle, conf, out)); handle = out[0]; } @@ -49,22 +61,21 @@ public ExtMemQuantileDMatrix( this(iter, missing, maxBin, null); } - private String getConfig(float missing, int maxBin, int nthread, int max_num_device_pages, - int max_quantile_batches, - int min_cache_page_bytes) { + private String getConfig(float missing, int maxBin, int nthread, int maxNumDevicePages, + int maxQuantileBatches, int minCachePageBytes) { Map conf = new java.util.HashMap<>(); conf.put("missing", missing); conf.put("max_bin", maxBin); conf.put("nthread", nthread); - if (max_num_device_pages > 0) { - conf.put("max_num_device_pages", max_num_device_pages); + if (maxNumDevicePages > 0) { + conf.put("max_num_device_pages", maxNumDevicePages); } - if (max_quantile_batches > 0) { - conf.put("max_quantile_batches", max_quantile_batches); + if (maxQuantileBatches > 0) { + conf.put("max_quantile_batches", maxQuantileBatches); } - if (min_cache_page_bytes > 0) { - conf.put("min_cache_page_bytes", min_cache_page_bytes); + if (minCachePageBytes > 0) { + conf.put("min_cache_page_bytes", minCachePageBytes); } conf.put("on_host", true); @@ -78,8 +89,7 @@ private String getConfig(float missing, int maxBin, int nthread, int max_num_dev mapper.registerModule(module); try { - String config = mapper.writeValueAsString(conf); - return config; + return mapper.writeValueAsString(conf); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize configuration", e); } diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java index 29db30c71503..9e36b0a38cc2 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java @@ -163,8 +163,7 @@ private String getConfig(float missing, int maxBin, int nthread) { mapper.registerModule(module); try { - String config = mapper.writeValueAsString(conf); - return config; + return mapper.writeValueAsString(conf); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize configuration", e); } diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala index f0718cd94c88..d978a1b1fcfd 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala +++ 
b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala @@ -17,11 +17,24 @@ package ml.dmlc.xgboost4j.scala import scala.collection.JavaConverters._ -import ml.dmlc.xgboost4j.java.{Column, ColumnBatch, ExtMemQuantileDMatrix => jExtMemQuantileDMatrix, XGBoostError} +import ml.dmlc.xgboost4j.java.{ColumnBatch, ExtMemQuantileDMatrix => jExtMemQuantileDMatrix} class ExtMemQuantileDMatrix private[scala]( private[scala] override val jDMatrix: jExtMemQuantileDMatrix) extends QuantileDMatrix(jDMatrix) { + def this(iter: Iterator[ColumnBatch], + missing: Float, + maxBin: Int, + ref: Option[QuantileDMatrix], + nthread: Int, + maxNumDevicePages: Int, + maxQuantileBatches: Int, + minCachePageBytes: Int) { + this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin, + ref.map(_.jDMatrix).orNull, + nthread, maxNumDevicePages, maxQuantileBatches, minCachePageBytes)) + } + def this(iter: Iterator[ColumnBatch], missing: Float, maxBin: Int) { this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin)) } diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala index a9fac0245abf..679e16231d32 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala @@ -39,22 +39,22 @@ class QuantileDMatrix private[scala]( /** * Create QuantileDMatrix from iterator based on the array interface * - * @param iter the XGBoost ColumnBatch batch to provide the corresponding array interface - * @param refDMatrix The reference QuantileDMatrix that provides quantile information, needed - * when creating validation/test dataset with QuantileDMatrix. Supplying the - * training DMatrix as a reference means that the same quantisation applied - * to the training data is applied to the validation/test data - * @param missing the missing value - * @param maxBin the max bin - * @param nthread the parallelism + * @param iter the XGBoost ColumnBatch batch to provide the corresponding array interface + * @param ref The reference QuantileDMatrix that provides quantile information, needed + * when creating validation/test dataset with QuantileDMatrix. 
Supplying the + * training DMatrix as a reference means that the same quantisation applied + * to the training data is applied to the validation/test data + * @param missing the missing value + * @param maxBin the max bin + * @param nthread the parallelism * @throws XGBoostError */ def this(iter: Iterator[ColumnBatch], - ref: QuantileDMatrix, + ref: Option[QuantileDMatrix], missing: Float, maxBin: Int, nthread: Int) { - this(new JQuantileDMatrix(iter.asJava, ref.jDMatrix, missing, maxBin, nthread)) + this(new JQuantileDMatrix(iter.asJava, ref.map(_.jDMatrix).orNull, missing, maxBin, nthread)) } /** diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala index 68743bf60f95..bb20eaf40a44 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataType, FloatType, IntegerType} import org.apache.spark.sql.vectorized.ColumnarBatch import ml.dmlc.xgboost4j.java.CudfColumnBatch -import ml.dmlc.xgboost4j.scala.{DMatrix, QuantileDMatrix} +import ml.dmlc.xgboost4j.scala.{DMatrix, ExtMemQuantileDMatrix, QuantileDMatrix} import ml.dmlc.xgboost4j.scala.spark.Utils.withResource import ml.dmlc.xgboost4j.scala.spark.params.HasGroupCol @@ -127,22 +127,38 @@ class GpuXGBoostPlugin extends XGBoostPlugin { val nthread = estimator.getNthread val missing = estimator.getMissing + val useExtMem = estimator.getUseExternalMemory + val extMemPath = if (useExtMem) { + Some(dataset.sparkSession.conf.get("spark.local.dir", "/tmp")) + } else None + + val maxQuantileBatches = estimator.getMaxQuantileBatches + val minCachePageBytes = estimator.getMinCachePageBytes + val maxNumDevicePages = estimator.getMaxNumDevicePages + /** build QuantileDMatrix on the executor side */ - def buildQuantileDMatrix(iter: Iterator[Table], + def buildQuantileDMatrix(input: Iterator[Table], ref: Option[QuantileDMatrix] = None): QuantileDMatrix = { - val colBatchIter = iter.map { table => - withResource(new GpuColumnBatch(table)) { batch => - new CudfColumnBatch( - batch.select(indices.featureIds.get), - batch.select(indices.labelId), - batch.select(indices.weightId.getOrElse(-1)), - batch.select(indices.marginId.getOrElse(-1)), - batch.select(indices.groupId.getOrElse(-1))); - } + + extMemPath match { + case Some(_) => + val itr = new ExternalMemoryIterator(input, indices, extMemPath) + new ExtMemQuantileDMatrix(itr, missing, maxBin, ref, nthread, maxNumDevicePages, + maxQuantileBatches, minCachePageBytes) + + case None => + val itr = input.map { table => + withResource(new GpuColumnBatch(table)) { batch => + new CudfColumnBatch( + batch.select(indices.featureIds.get), + batch.select(indices.labelId), + batch.select(indices.weightId.getOrElse(-1)), + batch.select(indices.marginId.getOrElse(-1)), + batch.select(indices.groupId.getOrElse(-1))); + } + } + new QuantileDMatrix(itr, ref, missing, maxBin, nthread) } - ref.map(r => new QuantileDMatrix(colBatchIter, r, missing, maxBin, nthread)).getOrElse( - new QuantileDMatrix(colBatchIter, missing, maxBin, nthread) - ) } estimator.getEvalDataset().map { evalDs => diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala 
b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala index 3dc6aec120c0..c0174c835a63 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala +++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala @@ -1,5 +1,5 @@ /* - Copyright (c) 2021-2025 by Contributors + Copyright (c) 2025 by Contributors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala new file mode 100644 index 000000000000..bd0b3adc609a --- /dev/null +++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala @@ -0,0 +1,101 @@ +/* + Copyright (c) 2025 by Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package ml.dmlc.xgboost4j.scala.spark + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.Table + +import ml.dmlc.xgboost4j.java.CudfColumnBatch +import ml.dmlc.xgboost4j.scala.rapids.spark.GpuTestSuite +import ml.dmlc.xgboost4j.scala.spark.Utils.withResource + +class ExternalMemorySuite extends GpuTestSuite { + + private def assertColumnBatchEqual(lhs: Array[CudfColumnBatch], + rhs: Array[CudfColumnBatch]): Unit = { + def assertTwoTable(lhsTable: Table, rhsTable: Table): Unit = { + assert(lhsTable.getNumberOfColumns === rhsTable.getNumberOfColumns) + for (i <- 0 until lhsTable.getNumberOfColumns) { + val lColumn = lhsTable.getColumn(i) + val rColumn = rhsTable.getColumn(i) + + val lHost = lColumn.copyToHost() + val rHost = rColumn.copyToHost() + + assert(lHost.getRowCount === rHost.getRowCount) + for (j <- 0 until lHost.getRowCount.toInt) { + assert(lHost.getFloat(j) === rHost.getFloat(j)) + } + } + } + + assert(lhs.length === rhs.length) + for ((l, r) <- lhs.zip(rhs)) { + assertTwoTable(l.getFeatureTable, r.getFeatureTable) + assertTwoTable(l.getLabelTable, r.getLabelTable) + } + } + + def runExternalMemoryTest(buildExternalMemory: (Iterator[Table], ColumnIndices) => + ExternalMemoryIterator): Unit = { + + withResource(new Table.TestBuilder() + .column(1.0f, 2.0f, 3.0f.asInstanceOf[java.lang.Float]) + .column(4.0f, 5.0f, 6.0f.asInstanceOf[java.lang.Float]) + .column(7.0f, 8.0f, 9.0f.asInstanceOf[java.lang.Float]) + .build) { table1 => + + withResource(new Table.TestBuilder() + .column(11.0f, 12.0f, 13.0f.asInstanceOf[java.lang.Float]) + .column(14.0f, 15.0f, 16.0f.asInstanceOf[java.lang.Float]) + .column(17.0f, 18.0f, 19.0f.asInstanceOf[java.lang.Float]) + .build) { table2 => + + val tables = Seq(table1, table2) + + val indices = ColumnIndices(labelId = 0, featureIds = Some(Seq(1, 2)), featureId = None, + weightId = None, marginId = None, groupId = None) + val extMemIter = 
buildExternalMemory(tables.toIterator, indices)
+        val expectTables = ArrayBuffer.empty[CudfColumnBatch]
+        while (extMemIter.hasNext) {
+          val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+          expectTables.append(table)
+        }
+        // hasNext has swapped the iterator internally, so we can still get
+        // values from the next round of iteration.
+
+        val targetTables = ArrayBuffer.empty[CudfColumnBatch]
+        while (extMemIter.hasNext) {
+          val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+          targetTables.append(table)
+        }
+
+        assertColumnBatchEqual(expectTables.toArray, targetTables.toArray)
+      }
+    }
+  }
+
+  test("DiskExternalMemory") {
+    val buildIterator = (input: Iterator[Table], indices: ColumnIndices) => {
+      val iter = new ExternalMemoryIterator(input, indices, Some("/tmp/"))
+      assert(iter.externalMemory.isInstanceOf[DiskExternalMemoryIterator])
+      iter
+    }
+    runExternalMemoryTest(buildIterator)
+  }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
index d4a24f7745c5..395c03bdfd27 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
@@ -357,25 +357,101 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
     }
   }
 
-  Seq("binary:logistic", "multi:softprob").foreach { case objective =>
-    test(s"$objective: XGBoost-Spark should match xgboost4j") {
-      withGpuSparkSession() { spark =>
-        import spark.implicits._
+  Seq(false, true).foreach { useExtMem =>
+    Seq("binary:logistic", "multi:softprob").foreach { objective =>
+      test(s"$objective: XGBoost-Spark should match xgboost4j with useExtMem: ${useExtMem}") {
+        withGpuSparkSession() { spark =>
+          import spark.implicits._
+
+          val numRound = 100
+          var xgboostParams: Map[String, Any] = Map(
+            "objective" -> objective,
+            "device" -> "cuda"
+          )
 
-      val numRound = 100
-      var xgboostParams: Map[String, Any] = Map(
-        "objective" -> objective,
-        "device" -> "cuda"
-      )
+          val (trainPath, testPath) = if (objective == "binary:logistic") {
+            (writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")),
+              writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3")))
+          } else {
+            xgboostParams = xgboostParams ++ Map("num_class" -> 6)
+            (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")),
+              writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3")))
+          }
+
+          val df = spark.read.parquet(trainPath)
+          val testdf = spark.read.parquet(testPath)
+
+          val features = Array("c1", "c2", "c3")
+          val featuresIndices = features.map(df.schema.fieldIndex)
+          val label = "label"
+
+          val classifier = new XGBoostClassifier(xgboostParams)
+            .setFeaturesCol(features)
+            .setLabelCol(label)
+            .setNumRound(numRound)
+            .setLeafPredictionCol("leaf")
+            .setContribPredictionCol("contrib")
+            .setDevice("cuda")
+            .setUseExternalMemory(useExtMem)
+
+          val xgb4jModel = withResource(new GpuColumnBatch(
+            Table.readParquet(new File(trainPath)))) { batch =>
+            val cb = new CudfColumnBatch(batch.select(featuresIndices),
+              batch.select(df.schema.fieldIndex(label)), null, null, null
+            )
+            val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing,
+              classifier.getMaxBins, classifier.getNthread)
+            ScalaXGBoost.train(qdm, xgboostParams, numRound)
+          }
+
+          val (xgb4jLeaf, 
xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch( + Table.readParquet(new File(testPath)))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null + ) + val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread) + (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), + xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true)) + } + + val rows = classifier.fit(df).transform(testdf).collect() + + // Check Leaf + val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) + checkEqual(xgb4jLeaf, xgbSparkLeaf) + + // Check contrib + val xgbSparkContrib = rows.map(row => + row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) + checkEqual(xgb4jContrib, xgbSparkContrib) + + // Check probability + var xgbSparkProb = rows.map(row => + row.getAs[DenseVector]("probability").toArray.map(_.toFloat)) + if (objective == "binary:logistic") { + xgbSparkProb = xgbSparkProb.map(v => Array(v(1))) + } + checkEqual(xgb4jProb, xgbSparkProb) + + // Check raw + var xgbSparkRaw = rows.map(row => + row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat)) + if (objective == "binary:logistic") { + xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1))) + } + checkEqual(xgb4jRaw, xgbSparkRaw) - val (trainPath, testPath) = if (objective == "binary:logistic") { - (writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")), - writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3"))) - } else { - xgboostParams = xgboostParams ++ Map("num_class" -> 6) - (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")), - writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3"))) } + } + } + } + + Seq(false, true).foreach { useExtMem => + test(s"Regression: XGBoost-Spark should match xgboost4j with useExtMem: ${useExtMem}") { + withGpuSparkSession() { spark => + import spark.implicits._ + + val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3")) + val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3")) val df = spark.read.parquet(trainPath) val testdf = spark.read.parquet(testPath) @@ -384,34 +460,40 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { val featuresIndices = features.map(df.schema.fieldIndex) val label = "label" - val classifier = new XGBoostClassifier(xgboostParams) + val numRound = 100 + val xgboostParams: Map[String, Any] = Map( + "device" -> "cuda" + ) + + val regressor = new XGBoostRegressor(xgboostParams) .setFeaturesCol(features) .setLabelCol(label) .setNumRound(numRound) .setLeafPredictionCol("leaf") .setContribPredictionCol("contrib") .setDevice("cuda") + .setUseExternalMemory(useExtMem) val xgb4jModel = withResource(new GpuColumnBatch( Table.readParquet(new File(trainPath)))) { batch => val cb = new CudfColumnBatch(batch.select(featuresIndices), batch.select(df.schema.fieldIndex(label)), null, null, null ) - val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing, - classifier.getMaxBins, classifier.getNthread) + val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing, + regressor.getMaxBins, regressor.getNthread) ScalaXGBoost.train(qdm, xgboostParams, numRound) } - val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch( + val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( Table.readParquet(new File(testPath)))) { batch => val cb = new 
CudfColumnBatch(batch.select(featuresIndices), null, null, null, null ) - val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread) + val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread) (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true)) + xgb4jModel.predict(qdm)) } - val rows = classifier.fit(df).transform(testdf).collect() + val rows = regressor.fit(df).transform(testdf).collect() // Check Leaf val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) @@ -422,90 +504,14 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) checkEqual(xgb4jContrib, xgbSparkContrib) - // Check probability - var xgbSparkProb = rows.map(row => - row.getAs[DenseVector]("probability").toArray.map(_.toFloat)) - if (objective == "binary:logistic") { - xgbSparkProb = xgbSparkProb.map(v => Array(v(1))) - } - checkEqual(xgb4jProb, xgbSparkProb) - - // Check raw - var xgbSparkRaw = rows.map(row => - row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat)) - if (objective == "binary:logistic") { - xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1))) - } - checkEqual(xgb4jRaw, xgbSparkRaw) - + // Check prediction + val xgbSparkPred = rows.map(row => + Array(row.getAs[Double]("prediction").toFloat)) + checkEqual(xgb4jPred, xgbSparkPred) } } } - test(s"Regression: XGBoost-Spark should match xgboost4j") { - withGpuSparkSession() { spark => - import spark.implicits._ - - val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3")) - val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3")) - - val df = spark.read.parquet(trainPath) - val testdf = spark.read.parquet(testPath) - - val features = Array("c1", "c2", "c3") - val featuresIndices = features.map(df.schema.fieldIndex) - val label = "label" - - val numRound = 100 - val xgboostParams: Map[String, Any] = Map( - "device" -> "cuda" - ) - - val regressor = new XGBoostRegressor(xgboostParams) - .setFeaturesCol(features) - .setLabelCol(label) - .setNumRound(numRound) - .setLeafPredictionCol("leaf") - .setContribPredictionCol("contrib") - .setDevice("cuda") - - val xgb4jModel = withResource(new GpuColumnBatch( - Table.readParquet(new File(trainPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), - batch.select(df.schema.fieldIndex(label)), null, null, null - ) - val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing, - regressor.getMaxBins, regressor.getNthread) - ScalaXGBoost.train(qdm, xgboostParams, numRound) - } - - val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( - Table.readParquet(new File(testPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null - ) - val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread) - (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm)) - } - - val rows = regressor.fit(df).transform(testdf).collect() - - // Check Leaf - val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) - checkEqual(xgb4jLeaf, xgbSparkLeaf) - - // Check contrib - val xgbSparkContrib = rows.map(row => - row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) - checkEqual(xgb4jContrib, xgbSparkContrib) - - // Check prediction - val xgbSparkPred = rows.map(row => - Array(row.getAs[Double]("prediction").toFloat)) - 
checkEqual(xgb4jPred, xgbSparkPred) - } - } - test("The group col should be sorted in each partition") { withGpuSparkSession() { spark => import spark.implicits._ @@ -592,71 +598,74 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { } } - test("Ranker: XGBoost-Spark should match xgboost4j") { - withGpuSparkSession() { spark => - import spark.implicits._ + Seq(false, true).foreach { useExtMem => + test(s"Ranker: XGBoost-Spark should match xgboost4j with useExtMem=$useExtMem") { + withGpuSparkSession() { spark => + import spark.implicits._ - val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3")) - val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3")) + val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3")) + val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3")) - val df = spark.read.parquet(trainPath) - val testdf = spark.read.parquet(testPath) + val df = spark.read.parquet(trainPath) + val testdf = spark.read.parquet(testPath) - val features = Array("c1", "c2", "c3") - val featuresIndices = features.map(df.schema.fieldIndex) - val label = "label" - val group = "group" + val features = Array("c1", "c2", "c3") + val featuresIndices = features.map(df.schema.fieldIndex) + val label = "label" + val group = "group" - val numRound = 100 - val xgboostParams: Map[String, Any] = Map( - "device" -> "cuda", - "objective" -> "rank:ndcg" - ) + val numRound = 100 + val xgboostParams: Map[String, Any] = Map( + "device" -> "cuda", + "objective" -> "rank:ndcg" + ) - val ranker = new XGBoostRanker(xgboostParams) - .setFeaturesCol(features) - .setLabelCol(label) - .setNumRound(numRound) - .setLeafPredictionCol("leaf") - .setContribPredictionCol("contrib") - .setGroupCol(group) - .setDevice("cuda") + val ranker = new XGBoostRanker(xgboostParams) + .setFeaturesCol(features) + .setLabelCol(label) + .setNumRound(numRound) + .setLeafPredictionCol("leaf") + .setContribPredictionCol("contrib") + .setGroupCol(group) + .setDevice("cuda") + .setUseExternalMemory(useExtMem) - val xgb4jModel = withResource(new GpuColumnBatch( - Table.readParquet(new File(trainPath) - ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), - batch.select(df.schema.fieldIndex(label)), null, null, - batch.select(df.schema.fieldIndex(group))) - val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing, - ranker.getMaxBins, ranker.getNthread) - ScalaXGBoost.train(qdm, xgboostParams, numRound) - } + val xgb4jModel = withResource(new GpuColumnBatch( + Table.readParquet(new File(trainPath) + ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), + batch.select(df.schema.fieldIndex(label)), null, null, + batch.select(df.schema.fieldIndex(group))) + val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing, + ranker.getMaxBins, ranker.getNthread) + ScalaXGBoost.train(qdm, xgboostParams, numRound) + } - val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( - Table.readParquet(new File(testPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null - ) - val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread) - (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm)) - } + val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new 
GpuColumnBatch(
+          Table.readParquet(new File(testPath)))) { batch =>
+          val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
+          )
+          val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread)
+          (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
+            xgb4jModel.predict(qdm))
+        }
 
-      val rows = ranker.fit(df).transform(testdf).collect()
+        val rows = ranker.fit(df).transform(testdf).collect()
 
-      // Check Leaf
-      val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
-      checkEqual(xgb4jLeaf, xgbSparkLeaf)
+        // Check Leaf
+        val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
+        checkEqual(xgb4jLeaf, xgbSparkLeaf)
 
-      // Check contrib
-      val xgbSparkContrib = rows.map(row =>
-        row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
-      checkEqual(xgb4jContrib, xgbSparkContrib)
+        // Check contrib
+        val xgbSparkContrib = rows.map(row =>
+          row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
+        checkEqual(xgb4jContrib, xgbSparkContrib)
 
-      // Check prediction
-      val xgbSparkPred = rows.map(row =>
-        Array(row.getAs[Double]("prediction").toFloat))
-      checkEqual(xgb4jPred, xgbSparkPred)
+        // Check prediction
+        val xgbSparkPred = rows.map(row =>
+          Array(row.getAs[Double]("prediction").toFloat))
+        checkEqual(xgb4jPred, xgbSparkPred)
+      }
     }
   }
 
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
index d7a83eb1b76a..bf27c51f8a72 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
@@ -179,10 +179,35 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe
   final def getFeatureTypes: Array[String] = $(featureTypes)
 
+  final val useExternalMemory = new BooleanParam(this, "useExternalMemory", "Whether to use " +
+    "external memory when building the QuantileDMatrix. Please note that " +
+    "useExternalMemory takes effect only when `device` is set to `cuda` or `gpu`. When " +
+    "useExternalMemory is enabled, the directory specified by spark.local.dir, if set, will " +
+    "be used to cache the temporary files; if spark.local.dir is not set, the /tmp " +
+    "directory will be used.")
+
+  final def getUseExternalMemory: Boolean = $(useExternalMemory)
+
+  final val maxNumDevicePages = new IntParam(this, "maxNumDevicePages", "Maximum number of " +
+    "pages cached on the device")
+
+  final def getMaxNumDevicePages: Int = $(maxNumDevicePages)
+
+  final val maxQuantileBatches = new IntParam(this, "maxQuantileBatches", "Maximum number of " +
+    "quantile batches")
+
+  final def getMaxQuantileBatches: Int = $(maxQuantileBatches)
+
+  final val minCachePageBytes = new IntParam(this, "minCachePageBytes", "Minimum number of " +
+    "bytes for each ellpack page in cache. Only used for the on-host cache")
+
+  final def getMinCachePageBytes: Int = $(minCachePageBytes)
+
   setDefault(numRound -> 100, numWorkers -> 1, inferBatchSize -> (32 << 10),
     numEarlyStoppingRounds -> 0, forceRepartition -> false, missing -> Float.NaN,
     featuresCols -> Array.empty, customObj -> null, customEval -> null,
-    featureNames -> Array.empty, featureTypes -> Array.empty)
+    featureNames -> Array.empty, featureTypes -> Array.empty, useExternalMemory -> false,
+    maxNumDevicePages -> -1, maxQuantileBatches -> -1, minCachePageBytes -> -1)
 
   addNonXGBoostParam(numWorkers, numRound, numEarlyStoppingRounds, inferBatchSize, featuresCol,
     labelCol, baseMarginCol, weightCol, predictionCol, leafPredictionCol, contribPredictionCol,
@@ -224,6 +249,14 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe
 
   def setFeatureTypes(value: Array[String]): T = set(featureTypes, value).asInstanceOf[T]
 
+  def setUseExternalMemory(value: Boolean): T = set(useExternalMemory, value).asInstanceOf[T]
+
+  def setMaxNumDevicePages(value: Int): T = set(maxNumDevicePages, value).asInstanceOf[T]
+
+  def setMaxQuantileBatches(value: Int): T = set(maxQuantileBatches, value).asInstanceOf[T]
+
+  def setMinCachePageBytes(value: Int): T = set(minCachePageBytes, value).asInstanceOf[T]
+
   protected[spark] def featureIsArrayType(schema: StructType): Boolean =
     schema(getFeaturesCol).dataType.isInstanceOf[ArrayType]
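
Reviewer note: a minimal usage sketch of the external-memory knobs added above. It assumes a GPU-enabled SparkSession and a training DataFrame `trainDf` with columns "label", "c1", "c2", "c3" (both hypothetical, not part of this patch); the setters and the -1 "defer to the library default" convention are the ones this patch introduces.

import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

// useExternalMemory only takes effect when device is "cuda" or "gpu".
val regressor = new XGBoostRegressor(Map("device" -> "cuda"))
  .setFeaturesCol(Array("c1", "c2", "c3"))
  .setLabelCol("label")
  .setNumRound(100)
  .setUseExternalMemory(true) // stage ellpack pages in host memory during training
  .setMaxQuantileBatches(-1)  // -1 keeps the library default
  .setMinCachePageBytes(-1)   // -1 keeps the library default

val model = regressor.fit(trainDf)

With useExternalMemory enabled, the temporary cache files land in the directory given by spark.local.dir when it is set, and in /tmp otherwise.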