diff --git a/jvm-packages/.gitignore b/jvm-packages/.gitignore
index 40572b4d7931..d60fb844dc94 100644
--- a/jvm-packages/.gitignore
+++ b/jvm-packages/.gitignore
@@ -2,3 +2,4 @@ build.sh
xgboost4j-tester/pom.xml
xgboost4j-tester/iris.csv
dependency-reduced-pom.xml
+.factorypath
diff --git a/jvm-packages/create_jni.py b/jvm-packages/create_jni.py
index 9f04d7f43129..918da2addbf5 100755
--- a/jvm-packages/create_jni.py
+++ b/jvm-packages/create_jni.py
@@ -73,6 +73,8 @@ def native_build(cli_args: argparse.Namespace) -> None:
os.environ["JAVA_HOME"] = (
subprocess.check_output("/usr/libexec/java_home").strip().decode()
)
+ if cli_args.use_debug == "ON":
+ CONFIG["CMAKE_BUILD_TYPE"] = "Debug"
print("building Java wrapper", flush=True)
with cd(".."):
@@ -187,5 +189,6 @@ def native_build(cli_args: argparse.Namespace) -> None:
)
parser.add_argument("--use-cuda", type=str, choices=["ON", "OFF"], default="OFF")
parser.add_argument("--use-openmp", type=str, choices=["ON", "OFF"], default="ON")
+ parser.add_argument("--use-debug", type=str, choices=["ON", "OFF"], default="OFF")
cli_args = parser.parse_args()
native_build(cli_args)
diff --git a/jvm-packages/pom.xml b/jvm-packages/pom.xml
index c8a9178f2c78..440f261e1c44 100644
--- a/jvm-packages/pom.xml
+++ b/jvm-packages/pom.xml
@@ -57,6 +57,7 @@
OFF
OFF
ON
+ <use.debug>OFF</use.debug>
24.10.0
24.10.0
cuda12
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
index 2f1870c580be..1599e3078b84 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
@@ -86,6 +86,17 @@ private List<CudfColumn> initializeCudfColumns(Table table) {
.collect(Collectors.toList());
}
+ // visible for testing
+ public Table getFeatureTable() {
+ return featureTable;
+ }
+
+ // visible for testing
+ public Table getLabelTable() {
+ return labelTable;
+ }
+
+
public List<CudfColumn> getFeatures() {
return features;
}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java
new file mode 100644
index 000000000000..a2e54190a882
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/ExtMemQuantileDMatrix.java
@@ -0,0 +1,97 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package ml.dmlc.xgboost4j.java;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+public class ExtMemQuantileDMatrix extends QuantileDMatrix {
+ // on_host is set to true by default as only the GPU implementation is supported
+ // at the moment. cache_prefix is not used yet since on_host is true.
+ public ExtMemQuantileDMatrix(Iterator<ColumnBatch> iter,
+ float missing,
+ int maxBin,
+ DMatrix ref,
+ int nthread,
+ int maxNumDevicePages,
+ int maxQuantileBatches,
+ int minCachePageBytes) throws XGBoostError {
+ long[] out = new long[1];
+ long[] refHandle = null;
+ if (ref != null) {
+ refHandle = new long[1];
+ refHandle[0] = ref.getHandle();
+ }
+ String conf = this.getConfig(missing, maxBin, nthread, maxNumDevicePages,
+ maxQuantileBatches, minCachePageBytes);
+ XGBoostJNI.checkCall(XGBoostJNI.XGExtMemQuantileDMatrixCreateFromCallback(
+ iter, refHandle, conf, out));
+ handle = out[0];
+ }
+
+ public ExtMemQuantileDMatrix(
+ Iterator<ColumnBatch> iter,
+ float missing,
+ int maxBin,
+ DMatrix ref) throws XGBoostError {
+ this(iter, missing, maxBin, ref, 1, -1, -1, -1);
+ }
+
+ public ExtMemQuantileDMatrix(
+ Iterator<ColumnBatch> iter,
+ float missing,
+ int maxBin) throws XGBoostError {
+ this(iter, missing, maxBin, null);
+ }
+
+ private String getConfig(float missing, int maxBin, int nthread, int maxNumDevicePages,
+ int maxQuantileBatches, int minCachePageBytes) {
+ Map<String, Object> conf = new java.util.HashMap<>();
+ conf.put("missing", missing);
+ conf.put("max_bin", maxBin);
+ conf.put("nthread", nthread);
+
+ if (maxNumDevicePages > 0) {
+ conf.put("max_num_device_pages", maxNumDevicePages);
+ }
+ if (maxQuantileBatches > 0) {
+ conf.put("max_quantile_batches", maxQuantileBatches);
+ }
+ if (minCachePageBytes > 0) {
+ conf.put("min_cache_page_bytes", minCachePageBytes);
+ }
+
+ conf.put("on_host", true);
+ conf.put("cache_prefix", ".");
+ ObjectMapper mapper = new ObjectMapper();
+
+ // Handle NaN values. Jackson by default serializes NaN values into strings.
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Double.class, new F64NaNSerializer());
+ module.addSerializer(Float.class, new F32NaNSerializer());
+ mapper.registerModule(module);
+
+ try {
+ return mapper.writeValueAsString(conf);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize configuration", e);
+ }
+ }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
index c3af64a8972d..9e36b0a38cc2 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
@@ -54,6 +54,11 @@ public void serialize(Float value, JsonGenerator gen,
* QuantileDMatrix will only be used to train
*/
public class QuantileDMatrix extends DMatrix {
+ // Default constructor used by the external memory version of the QDM.
+ protected QuantileDMatrix() {
+ super(0);
+ }
+
/**
* Create QuantileDMatrix from iterator based on the cuda array interface
*
@@ -158,8 +163,7 @@ private String getConfig(float missing, int maxBin, int nthread) {
mapper.registerModule(module);
try {
- String config = mapper.writeValueAsString(conf);
- return config;
+ return mapper.writeValueAsString(conf);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize configuration", e);
}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala
new file mode 100644
index 000000000000..d978a1b1fcfd
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrix.scala
@@ -0,0 +1,50 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package ml.dmlc.xgboost4j.scala
+
+import scala.collection.JavaConverters._
+
+import ml.dmlc.xgboost4j.java.{ColumnBatch, ExtMemQuantileDMatrix => jExtMemQuantileDMatrix}
+
+class ExtMemQuantileDMatrix private[scala](
+ private[scala] override val jDMatrix: jExtMemQuantileDMatrix) extends QuantileDMatrix(jDMatrix) {
+
+ def this(iter: Iterator[ColumnBatch],
+ missing: Float,
+ maxBin: Int,
+ ref: Option[QuantileDMatrix],
+ nthread: Int,
+ maxNumDevicePages: Int,
+ maxQuantileBatches: Int,
+ minCachePageBytes: Int) {
+ this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin,
+ ref.map(_.jDMatrix).orNull,
+ nthread, maxNumDevicePages, maxQuantileBatches, minCachePageBytes))
+ }
+
+ def this(iter: Iterator[ColumnBatch], missing: Float, maxBin: Int) {
+ this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin))
+ }
+
+ def this(
+ iter: Iterator[ColumnBatch],
+ ref: ExtMemQuantileDMatrix,
+ missing: Float,
+ maxBin: Int
+ ) {
+ this(new jExtMemQuantileDMatrix(iter.asJava, missing, maxBin, ref.jDMatrix))
+ }
+}
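For orientation, a minimal usage sketch of the new Scala wrapper. The batch iterators are placeholders and the parameter values are illustrative only; in the plugin the iterator is an ExternalMemoryIterator over cudf Tables (see ExternalMemory.scala below).

```scala
import ml.dmlc.xgboost4j.java.ColumnBatch
import ml.dmlc.xgboost4j.scala.ExtMemQuantileDMatrix

val trainBatches: Iterator[ColumnBatch] = ??? // placeholder batch source
val evalBatches: Iterator[ColumnBatch] = ???  // placeholder batch source

// Training matrix; -1 keeps maxNumDevicePages, maxQuantileBatches and
// minCachePageBytes at their native defaults.
val train = new ExtMemQuantileDMatrix(trainBatches, Float.NaN, 256,
  None, 1, -1, -1, -1)

// The evaluation matrix reuses the training quantile cuts through `ref`.
val eval = new ExtMemQuantileDMatrix(evalBatches, train, Float.NaN, 256)
```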
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
index a9fac0245abf..679e16231d32 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
@@ -39,22 +39,22 @@ class QuantileDMatrix private[scala](
/**
* Create QuantileDMatrix from iterator based on the array interface
*
- * @param iter the XGBoost ColumnBatch batch to provide the corresponding array interface
- * @param refDMatrix The reference QuantileDMatrix that provides quantile information, needed
- * when creating validation/test dataset with QuantileDMatrix. Supplying the
- * training DMatrix as a reference means that the same quantisation applied
- * to the training data is applied to the validation/test data
- * @param missing the missing value
- * @param maxBin the max bin
- * @param nthread the parallelism
+ * @param iter the XGBoost ColumnBatch batch to provide the corresponding array interface
+ * @param ref The reference QuantileDMatrix that provides quantile information, needed
+ * when creating validation/test dataset with QuantileDMatrix. Supplying the
+ * training DMatrix as a reference means that the same quantisation applied
+ * to the training data is applied to the validation/test data
+ * @param missing the missing value
+ * @param maxBin the max bin
+ * @param nthread the parallelism
* @throws XGBoostError
*/
def this(iter: Iterator[ColumnBatch],
- ref: QuantileDMatrix,
+ ref: Option[QuantileDMatrix],
missing: Float,
maxBin: Int,
nthread: Int) {
- this(new JQuantileDMatrix(iter.asJava, ref.jDMatrix, missing, maxBin, nthread))
+ this(new JQuantileDMatrix(iter.asJava, ref.map(_.jDMatrix).orNull, missing, maxBin, nthread))
}
/**
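Since `ref` is now an `Option`, callers no longer need two separate constructor calls for the with- and without-reference cases. A hedged sketch with placeholder iterators:

```scala
import ml.dmlc.xgboost4j.java.ColumnBatch
import ml.dmlc.xgboost4j.scala.QuantileDMatrix

val trainBatches: Iterator[ColumnBatch] = ??? // placeholder
val evalBatches: Iterator[ColumnBatch] = ???  // placeholder

// Training matrix: no reference to follow.
val train = new QuantileDMatrix(trainBatches, None, Float.NaN, 256, 1)
// Evaluation matrix: apply the quantisation computed for training.
val eval = new QuantileDMatrix(evalBatches, Some(train), Float.NaN, 256, 1)
```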
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
new file mode 100644
index 000000000000..591e0b6742a8
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
@@ -0,0 +1,220 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf._
+
+import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch}
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+private[spark] trait ExternalMemory[T] extends Iterator[Table] with AutoCloseable {
+
+ protected val buffers = ArrayBuffer.empty[T]
+ private lazy val buffersIterator = buffers.toIterator
+
+ /**
+ * Convert the table to T which will be cached
+ *
+ * @param table to be converted
+ * @return the content
+ */
+ def convertTable(table: Table): T
+
+ /**
+ * Load the content to the Table
+ *
+ * @param content to be loaded
+ * @return Table
+ */
+ def loadTable(content: T): Table
+
+ // Cache the table
+ def cacheTable(table: Table): Unit = {
+ val content = convertTable(table)
+ buffers.append(content)
+ }
+
+ override def hasNext: Boolean = buffersIterator.hasNext
+
+ override def next(): Table = loadTable(buffersIterator.next())
+
+ override def close(): Unit = {}
+}
+
+// The data will be cached into disk.
+private[spark] class DiskExternalMemoryIterator(val path: String) extends ExternalMemory[String] {
+
+ private lazy val root = {
+ val tmp = path + "/xgboost"
+ createDirectory(tmp)
+ tmp
+ }
+
+ private var counter = 0
+
+ private def createDirectory(dirPath: String): Unit = {
+ val path = Paths.get(dirPath)
+ if (!Files.exists(path)) {
+ Files.createDirectories(path)
+ }
+ }
+
+ /**
+ * Convert the table to file path which will be cached
+ *
+ * @param table to be converted
+ * @return the content
+ */
+ override def convertTable(table: Table): String = {
+ val names = (1 to table.getNumberOfColumns).map(_.toString)
+ val options = ArrowIPCWriterOptions.builder().withColumnNames(names: _*).build()
+ val path = root + "/table_" + counter + "_" + System.nanoTime()
+ counter += 1
+ withResource(Table.writeArrowIPCChunked(options, new File(path))) { writer =>
+ writer.write(table)
+ }
+ path
+ }
+
+ private def closeOnExcept[T <: AutoCloseable, V](r: ArrayBuffer[T])
+ (block: ArrayBuffer[T] => V): V = {
+ try {
+ block(r)
+ } catch {
+ case t: Throwable =>
+ r.foreach(_.close())
+ throw t
+ }
+ }
+
+ /**
+ * Load the path from disk to the Table
+ *
+ * @param name to be loaded
+ * @return Table
+ */
+ override def loadTable(name: String): Table = {
+ val file = new File(name)
+ if (!file.exists()) {
+ throw new RuntimeException(s"The cached file ${name} does not exist")
+ }
+ try {
+ withResource(Table.readArrowIPCChunked(file)) { reader =>
+ val tables = ArrayBuffer.empty[Table]
+ closeOnExcept(tables) { tables =>
+ var table = Option(reader.getNextIfAvailable())
+ while (table.isDefined) {
+ tables.append(table.get)
+ table = Option(reader.getNextIfAvailable())
+ }
+ }
+ if (tables.size > 1) {
+ closeOnExcept(tables) { tables =>
+ Table.concatenate(tables.toArray: _*)
+ }
+ } else {
+ tables(0)
+ }
+ }
+ } catch {
+ case e: Throwable =>
+ close()
+ throw e
+ } finally {
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+ }
+
+ override def close(): Unit = {
+ buffers.foreach { path =>
+ val file = new File(path)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+ buffers.clear()
+ }
+}
+
+private[spark] object ExternalMemory {
+ def apply(path: Option[String] = None): ExternalMemory[_] = {
+ path.map(new DiskExternalMemoryIterator(_))
+ .getOrElse(throw new RuntimeException("No disk path provided"))
+ }
+}
+
+/**
+ * ExternalMemoryIterator supports iterating over the data twice: once the first
+ * pass has consumed the input, the iterator swaps to read from external memory.
+ *
+ * The first pass returns each input batch, which is
+ * 1. cached in the external memory
+ * 2. fed into the QuantileDMatrix
+ * The second pass returns the cached batches read back from external memory.
+ *
+ * @param input the spark input iterator
+ * @param indices column indices
+ * @param path the directory for caching the data on disk
+ */
+private[scala] class ExternalMemoryIterator(val input: Iterator[Table],
+ val indices: ColumnIndices,
+ val path: Option[String] = None)
+ extends Iterator[ColumnBatch] {
+
+ private var iter = input
+
+ // Flag to indicate the input has been consumed.
+ private var inputIsConsumed = false
+ // Flag to indicate that input.next has been called at least once.
+ private var inputNextIsCalled = false
+
+ // visible for testing
+ private[spark] val externalMemory = ExternalMemory(path)
+
+ override def hasNext: Boolean = {
+ val value = iter.hasNext
+ if (!value && inputIsConsumed && inputNextIsCalled) {
+ externalMemory.close()
+ }
+ if (!inputIsConsumed && !value && inputNextIsCalled) {
+ inputIsConsumed = true
+ iter = externalMemory
+ }
+ value
+ }
+
+ override def next(): ColumnBatch = {
+ inputNextIsCalled = true
+ withResource(new GpuColumnBatch(iter.next())) { batch =>
+ if (iter.eq(input)) {
+ externalMemory.cacheTable(batch.table)
+ }
+ new CudfColumnBatch(
+ batch.select(indices.featureIds.get),
+ batch.select(indices.labelId),
+ batch.select(indices.weightId.getOrElse(-1)),
+ batch.select(indices.marginId.getOrElse(-1)),
+ batch.select(indices.groupId.getOrElse(-1)));
+ }
+ }
+}
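To make the two-pass contract concrete, a hedged sketch follows; the table iterator is a placeholder, ExternalMemoryIterator is package-private, and "/tmp" stands in for spark.local.dir:

```scala
import ai.rapids.cudf.Table
import ml.dmlc.xgboost4j.scala.spark.{ColumnIndices, ExternalMemoryIterator}

val tables: Iterator[Table] = ??? // placeholder Spark input
val indices = ColumnIndices(labelId = 0, featureId = None,
  featureIds = Some(Seq(1, 2)), weightId = None, marginId = None, groupId = None)

val iter = new ExternalMemoryIterator(tables, indices, Some("/tmp"))
// First pass: batches come from the Spark input and are cached to disk.
while (iter.hasNext) { iter.next() }
// hasNext swaps the underlying iterator once the input is exhausted,
// so a second pass replays the batches cached on disk.
while (iter.hasNext) { iter.next() }
```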
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
index 36e5b7da4299..bb20eaf40a44 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataType, FloatType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import ml.dmlc.xgboost4j.java.CudfColumnBatch
-import ml.dmlc.xgboost4j.scala.{DMatrix, QuantileDMatrix}
+import ml.dmlc.xgboost4j.scala.{DMatrix, ExtMemQuantileDMatrix, QuantileDMatrix}
import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
import ml.dmlc.xgboost4j.scala.spark.params.HasGroupCol
@@ -127,22 +127,38 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
val nthread = estimator.getNthread
val missing = estimator.getMissing
+ val useExtMem = estimator.getUseExternalMemory
+ val extMemPath = if (useExtMem) {
+ Some(dataset.sparkSession.conf.get("spark.local.dir", "/tmp"))
+ } else None
+
+ val maxQuantileBatches = estimator.getMaxQuantileBatches
+ val minCachePageBytes = estimator.getMinCachePageBytes
+ val maxNumDevicePages = estimator.getMaxNumDevicePages
+
/** build QuantileDMatrix on the executor side */
- def buildQuantileDMatrix(iter: Iterator[Table],
+ def buildQuantileDMatrix(input: Iterator[Table],
ref: Option[QuantileDMatrix] = None): QuantileDMatrix = {
- val colBatchIter = iter.map { table =>
- withResource(new GpuColumnBatch(table)) { batch =>
- new CudfColumnBatch(
- batch.select(indices.featureIds.get),
- batch.select(indices.labelId),
- batch.select(indices.weightId.getOrElse(-1)),
- batch.select(indices.marginId.getOrElse(-1)),
- batch.select(indices.groupId.getOrElse(-1)));
- }
+
+ extMemPath match {
+ case Some(_) =>
+ val itr = new ExternalMemoryIterator(input, indices, extMemPath)
+ new ExtMemQuantileDMatrix(itr, missing, maxBin, ref, nthread, maxNumDevicePages,
+ maxQuantileBatches, minCachePageBytes)
+
+ case None =>
+ val itr = input.map { table =>
+ withResource(new GpuColumnBatch(table)) { batch =>
+ new CudfColumnBatch(
+ batch.select(indices.featureIds.get),
+ batch.select(indices.labelId),
+ batch.select(indices.weightId.getOrElse(-1)),
+ batch.select(indices.marginId.getOrElse(-1)),
+ batch.select(indices.groupId.getOrElse(-1)));
+ }
+ }
+ new QuantileDMatrix(itr, ref, missing, maxBin, nthread)
}
- ref.map(r => new QuantileDMatrix(colBatchIter, r, missing, maxBin, nthread)).getOrElse(
- new QuantileDMatrix(colBatchIter, missing, maxBin, nthread)
- )
}
estimator.getEvalDataset().map { evalDs =>
@@ -295,7 +311,7 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
}
}
-private class GpuColumnBatch(table: Table) extends AutoCloseable {
+private[scala] class GpuColumnBatch(val table: Table) extends AutoCloseable {
def select(index: Int): Table = {
select(Seq(index))
@@ -308,9 +324,5 @@ private class GpuColumnBatch(table: Table) extends AutoCloseable {
new Table(indices.map(table.getColumn): _*)
}
- override def close(): Unit = {
- if (Option(table).isDefined) {
- table.close()
- }
- }
+ override def close(): Unit = Option(table).foreach(_.close())
}
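From the user's perspective, the plugin is driven entirely by estimator parameters. A hedged end-to-end sketch (the DataFrame and column names are placeholders; spark.local.dir is assumed to be set at submit time, otherwise /tmp is used):

```scala
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.sql.DataFrame

val trainDf: DataFrame = ??? // placeholder columnar input

val classifier = new XGBoostClassifier(Map(
    "objective" -> "binary:logistic",
    "device" -> "cuda")) // external memory only takes effect on GPU
  .setFeaturesCol(Array("c1", "c2", "c3"))
  .setLabelCol("label")
  .setNumRound(100)
  .setUseExternalMemory(true) // batches are cached under spark.local.dir

val model = classifier.fit(trainDf)
```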
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
index 5a4c67bcca38..edb7b518d259 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
@@ -198,5 +198,4 @@ private float[] convertFloatTofloat(Float[]... datas) {
}
return floatArray;
}
-
}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala
new file mode 100644
index 000000000000..c0174c835a63
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/ExtMemQuantileDMatrixSuite.scala
@@ -0,0 +1,104 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.Table
+import org.scalatest.funsuite.AnyFunSuite
+
+import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch}
+import ml.dmlc.xgboost4j.scala.rapids.spark.TmpFolderSuite
+import ml.dmlc.xgboost4j.scala.spark.{ColumnIndices, ExternalMemoryIterator, GpuColumnBatch}
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+class ExtMemQuantileDMatrixSuite extends AnyFunSuite with TmpFolderSuite {
+
+ private def runTest(buildIterator: (Iterator[Table], ColumnIndices) => Iterator[ColumnBatch]) = {
+ val label1 = Array[java.lang.Float](25f, 21f, 22f, 20f, 24f)
+ val weight1 = Array[java.lang.Float](1.3f, 2.31f, 0.32f, 3.3f, 1.34f)
+ val baseMargin1 = Array[java.lang.Float](1.2f, 0.2f, 1.3f, 2.4f, 3.5f)
+ val group1 = Array[java.lang.Integer](1, 1, 7, 7, 19, 26)
+
+ val label2 = Array[java.lang.Float](9f, 5f, 4f, 10f, 12f)
+ val weight2 = Array[java.lang.Float](3.0f, 1.3f, 3.2f, 0.3f, 1.34f)
+ val baseMargin2 = Array[java.lang.Float](0.2f, 2.5f, 3.1f, 4.4f, 2.2f)
+ val group2 = Array[java.lang.Integer](30, 30, 30, 40, 40)
+
+ val expectedGroup = Array(0, 2, 4, 5, 6, 9, 11)
+
+ withResource(new Table.TestBuilder()
+ .column(1.2f, null.asInstanceOf[java.lang.Float], 5.2f, 7.2f, 9.2f)
+ .column(0.2f, 0.4f, 0.6f, 2.6f, 0.10f.asInstanceOf[java.lang.Float])
+ .build) { X_0 =>
+ withResource(new Table.TestBuilder().column(label1: _*).build) { y_0 =>
+ withResource(new Table.TestBuilder().column(weight1: _*).build) { w_0 =>
+ withResource(new Table.TestBuilder().column(baseMargin1: _*).build) { m_0 =>
+ withResource(new Table.TestBuilder().column(group1: _*).build) { q_0 =>
+ withResource(new Table.TestBuilder()
+ .column(11.2f, 11.2f, 15.2f, 17.2f, 19.2f.asInstanceOf[java.lang.Float])
+ .column(1.2f, 1.4f, null.asInstanceOf[java.lang.Float], 12.6f, 10.10f).build) {
+ X_1 =>
+ withResource(new Table.TestBuilder().column(label2: _*).build) { y_1 =>
+ withResource(new Table.TestBuilder().column(weight2: _*).build) { w_1 =>
+ withResource(new Table.TestBuilder().column(baseMargin2: _*).build) { m_1 =>
+ withResource(new Table.TestBuilder().column(group2: _*).build) { q_2 =>
+ val tables = new ArrayBuffer[Table]()
+ tables += new Table(X_0.getColumn(0), X_0.getColumn(1), y_0.getColumn(0),
+ w_0.getColumn(0), m_0.getColumn(0))
+ tables += new Table(X_1.getColumn(0), X_1.getColumn(1), y_1.getColumn(0),
+ w_1.getColumn(0), m_1.getColumn(0))
+
+ val indices = ColumnIndices(
+ labelId = 2,
+ featureId = None,
+ featureIds = Option(Seq(0, 1)),
+ weightId = Option(3),
+ marginId = Option(4),
+ groupId = Option(5)
+ )
+ val iter = buildIterator(tables.toIterator, indices);
+ val dmatrix = new ExtMemQuantileDMatrix(iter, 0.0f, 8)
+
+ def check(dm: ExtMemQuantileDMatrix) = {
+ assert(dm.getLabel.sameElements(label1 ++ label2))
+ assert(dm.getWeight.sameElements(weight1 ++ weight2))
+ assert(dm.getBaseMargin.sameElements(baseMargin1 ++ baseMargin2))
+ }
+ check(dmatrix)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("ExtMemQuantileDMatrix test") {
+ val buildIter = (input: Iterator[Table], indices: ColumnIndices) =>
+ new ExternalMemoryIterator(
+ input, indices, Option(new File(tempDir.toFile, "xgboost").getPath)
+ )
+ runTest(buildIter)
+ }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala
new file mode 100644
index 000000000000..bd0b3adc609a
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala
@@ -0,0 +1,101 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.Table
+
+import ml.dmlc.xgboost4j.java.CudfColumnBatch
+import ml.dmlc.xgboost4j.scala.rapids.spark.GpuTestSuite
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+class ExternalMemorySuite extends GpuTestSuite {
+
+ private def assertColumnBatchEqual(lhs: Array[CudfColumnBatch],
+ rhs: Array[CudfColumnBatch]): Unit = {
+ def assertTwoTable(lhsTable: Table, rhsTable: Table): Unit = {
+ assert(lhsTable.getNumberOfColumns === rhsTable.getNumberOfColumns)
+ for (i <- 0 until lhsTable.getNumberOfColumns) {
+ val lColumn = lhsTable.getColumn(i)
+ val rColumn = rhsTable.getColumn(i)
+
+ val lHost = lColumn.copyToHost()
+ val rHost = rColumn.copyToHost()
+
+ assert(lHost.getRowCount === rHost.getRowCount)
+ for (j <- 0 until lHost.getRowCount.toInt) {
+ assert(lHost.getFloat(j) === rHost.getFloat(j))
+ }
+ }
+ }
+
+ assert(lhs.length === rhs.length)
+ for ((l, r) <- lhs.zip(rhs)) {
+ assertTwoTable(l.getFeatureTable, r.getFeatureTable)
+ assertTwoTable(l.getLabelTable, r.getLabelTable)
+ }
+ }
+
+ def runExternalMemoryTest(buildExternalMemory: (Iterator[Table], ColumnIndices) =>
+ ExternalMemoryIterator): Unit = {
+
+ withResource(new Table.TestBuilder()
+ .column(1.0f, 2.0f, 3.0f.asInstanceOf[java.lang.Float])
+ .column(4.0f, 5.0f, 6.0f.asInstanceOf[java.lang.Float])
+ .column(7.0f, 8.0f, 9.0f.asInstanceOf[java.lang.Float])
+ .build) { table1 =>
+
+ withResource(new Table.TestBuilder()
+ .column(11.0f, 12.0f, 13.0f.asInstanceOf[java.lang.Float])
+ .column(14.0f, 15.0f, 16.0f.asInstanceOf[java.lang.Float])
+ .column(17.0f, 18.0f, 19.0f.asInstanceOf[java.lang.Float])
+ .build) { table2 =>
+
+ val tables = Seq(table1, table2)
+
+ val indices = ColumnIndices(labelId = 0, featureIds = Some(Seq(1, 2)), featureId = None,
+ weightId = None, marginId = None, groupId = None)
+ val extMemIter = buildExternalMemory(tables.toIterator, indices)
+ val expectTables = ArrayBuffer.empty[CudfColumnBatch]
+ while (extMemIter.hasNext) {
+ val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+ expectTables.append(table)
+ }
+ // hasNext has swapped the iterator internally, so we can still get
+ // values in the next round of iteration.
+
+ val targetTables = ArrayBuffer.empty[CudfColumnBatch]
+ while (extMemIter.hasNext) {
+ val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+ targetTables.append(table)
+ }
+
+ assertColumnBatchEqual(expectTables.toArray, targetTables.toArray)
+ }
+ }
+ }
+
+ test("DiskExternalMemory") {
+ val buildIterator = (input: Iterator[Table], indices: ColumnIndices) => {
+ val iter = new ExternalMemoryIterator(input, indices, Some("/tmp/"))
+ assert(iter.externalMemory.isInstanceOf[DiskExternalMemoryIterator])
+ iter
+ }
+ runExternalMemoryTest(buildIterator)
+ }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
index d4a24f7745c5..395c03bdfd27 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
@@ -357,25 +357,101 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
}
}
- Seq("binary:logistic", "multi:softprob").foreach { case objective =>
- test(s"$objective: XGBoost-Spark should match xgboost4j") {
- withGpuSparkSession() { spark =>
- import spark.implicits._
+ Seq(false, true).foreach { useExtMem =>
+ Seq("binary:logistic", "multi:softprob").foreach { objective =>
+ test(s"$objective: XGBoost-Spark should match xgboost4j with useExtMem: ${useExtMem}") {
+ withGpuSparkSession() { spark =>
+ import spark.implicits._
+
+ val numRound = 100
+ var xgboostParams: Map[String, Any] = Map(
+ "objective" -> objective,
+ "device" -> "cuda"
+ )
- val numRound = 100
- var xgboostParams: Map[String, Any] = Map(
- "objective" -> objective,
- "device" -> "cuda"
- )
+ val (trainPath, testPath) = if (objective == "binary:logistic") {
+ (writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")),
+ writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3")))
+ } else {
+ xgboostParams = xgboostParams ++ Map("num_class" -> 6)
+ (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")),
+ writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3")))
+ }
+
+ val df = spark.read.parquet(trainPath)
+ val testdf = spark.read.parquet(testPath)
+
+ val features = Array("c1", "c2", "c3")
+ val featuresIndices = features.map(df.schema.fieldIndex)
+ val label = "label"
+
+ val classifier = new XGBoostClassifier(xgboostParams)
+ .setFeaturesCol(features)
+ .setLabelCol(label)
+ .setNumRound(numRound)
+ .setLeafPredictionCol("leaf")
+ .setContribPredictionCol("contrib")
+ .setDevice("cuda")
+ .setUseExternalMemory(useExtMem)
+
+ val xgb4jModel = withResource(new GpuColumnBatch(
+ Table.readParquet(new File(trainPath)))) { batch =>
+ val cb = new CudfColumnBatch(batch.select(featuresIndices),
+ batch.select(df.schema.fieldIndex(label)), null, null, null
+ )
+ val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing,
+ classifier.getMaxBins, classifier.getNthread)
+ ScalaXGBoost.train(qdm, xgboostParams, numRound)
+ }
+
+ val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch(
+ Table.readParquet(new File(testPath)))) { batch =>
+ val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
+ )
+ val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread)
+ (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
+ xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true))
+ }
+
+ val rows = classifier.fit(df).transform(testdf).collect()
+
+ // Check Leaf
+ val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
+ checkEqual(xgb4jLeaf, xgbSparkLeaf)
+
+ // Check contrib
+ val xgbSparkContrib = rows.map(row =>
+ row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
+ checkEqual(xgb4jContrib, xgbSparkContrib)
+
+ // Check probability
+ var xgbSparkProb = rows.map(row =>
+ row.getAs[DenseVector]("probability").toArray.map(_.toFloat))
+ if (objective == "binary:logistic") {
+ xgbSparkProb = xgbSparkProb.map(v => Array(v(1)))
+ }
+ checkEqual(xgb4jProb, xgbSparkProb)
+
+ // Check raw
+ var xgbSparkRaw = rows.map(row =>
+ row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat))
+ if (objective == "binary:logistic") {
+ xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1)))
+ }
+ checkEqual(xgb4jRaw, xgbSparkRaw)
- val (trainPath, testPath) = if (objective == "binary:logistic") {
- (writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")),
- writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3")))
- } else {
- xgboostParams = xgboostParams ++ Map("num_class" -> 6)
- (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")),
- writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3")))
}
+ }
+ }
+ }
+
+ Seq(false, true).foreach { useExtMem =>
+ test(s"Regression: XGBoost-Spark should match xgboost4j with useExtMem: ${useExtMem}") {
+ withGpuSparkSession() { spark =>
+ import spark.implicits._
+
+ val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3"))
+ val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3"))
val df = spark.read.parquet(trainPath)
val testdf = spark.read.parquet(testPath)
@@ -384,34 +460,40 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
val featuresIndices = features.map(df.schema.fieldIndex)
val label = "label"
- val classifier = new XGBoostClassifier(xgboostParams)
+ val numRound = 100
+ val xgboostParams: Map[String, Any] = Map(
+ "device" -> "cuda"
+ )
+
+ val regressor = new XGBoostRegressor(xgboostParams)
.setFeaturesCol(features)
.setLabelCol(label)
.setNumRound(numRound)
.setLeafPredictionCol("leaf")
.setContribPredictionCol("contrib")
.setDevice("cuda")
+ .setUseExternalMemory(useExtMem)
val xgb4jModel = withResource(new GpuColumnBatch(
Table.readParquet(new File(trainPath)))) { batch =>
val cb = new CudfColumnBatch(batch.select(featuresIndices),
batch.select(df.schema.fieldIndex(label)), null, null, null
)
- val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing,
- classifier.getMaxBins, classifier.getNthread)
+ val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing,
+ regressor.getMaxBins, regressor.getNthread)
ScalaXGBoost.train(qdm, xgboostParams, numRound)
}
- val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch(
+ val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch(
Table.readParquet(new File(testPath)))) { batch =>
val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
)
- val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread)
+ val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread)
(xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
- xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true))
+ xgb4jModel.predict(qdm))
}
- val rows = classifier.fit(df).transform(testdf).collect()
+ val rows = regressor.fit(df).transform(testdf).collect()
// Check Leaf
val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
@@ -422,90 +504,14 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
checkEqual(xgb4jContrib, xgbSparkContrib)
- // Check probability
- var xgbSparkProb = rows.map(row =>
- row.getAs[DenseVector]("probability").toArray.map(_.toFloat))
- if (objective == "binary:logistic") {
- xgbSparkProb = xgbSparkProb.map(v => Array(v(1)))
- }
- checkEqual(xgb4jProb, xgbSparkProb)
-
- // Check raw
- var xgbSparkRaw = rows.map(row =>
- row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat))
- if (objective == "binary:logistic") {
- xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1)))
- }
- checkEqual(xgb4jRaw, xgbSparkRaw)
-
+ // Check prediction
+ val xgbSparkPred = rows.map(row =>
+ Array(row.getAs[Double]("prediction").toFloat))
+ checkEqual(xgb4jPred, xgbSparkPred)
}
}
}
- test(s"Regression: XGBoost-Spark should match xgboost4j") {
- withGpuSparkSession() { spark =>
- import spark.implicits._
-
- val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3"))
- val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3"))
-
- val df = spark.read.parquet(trainPath)
- val testdf = spark.read.parquet(testPath)
-
- val features = Array("c1", "c2", "c3")
- val featuresIndices = features.map(df.schema.fieldIndex)
- val label = "label"
-
- val numRound = 100
- val xgboostParams: Map[String, Any] = Map(
- "device" -> "cuda"
- )
-
- val regressor = new XGBoostRegressor(xgboostParams)
- .setFeaturesCol(features)
- .setLabelCol(label)
- .setNumRound(numRound)
- .setLeafPredictionCol("leaf")
- .setContribPredictionCol("contrib")
- .setDevice("cuda")
-
- val xgb4jModel = withResource(new GpuColumnBatch(
- Table.readParquet(new File(trainPath)))) { batch =>
- val cb = new CudfColumnBatch(batch.select(featuresIndices),
- batch.select(df.schema.fieldIndex(label)), null, null, null
- )
- val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing,
- regressor.getMaxBins, regressor.getNthread)
- ScalaXGBoost.train(qdm, xgboostParams, numRound)
- }
-
- val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch(
- Table.readParquet(new File(testPath)))) { batch =>
- val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
- )
- val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread)
- (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
- xgb4jModel.predict(qdm))
- }
-
- val rows = regressor.fit(df).transform(testdf).collect()
-
- // Check Leaf
- val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
- checkEqual(xgb4jLeaf, xgbSparkLeaf)
-
- // Check contrib
- val xgbSparkContrib = rows.map(row =>
- row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
- checkEqual(xgb4jContrib, xgbSparkContrib)
-
- // Check prediction
- val xgbSparkPred = rows.map(row =>
- Array(row.getAs[Double]("prediction").toFloat))
- checkEqual(xgb4jPred, xgbSparkPred)
- }
- }
-
test("The group col should be sorted in each partition") {
withGpuSparkSession() { spark =>
import spark.implicits._
@@ -592,71 +598,74 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
}
}
- test("Ranker: XGBoost-Spark should match xgboost4j") {
- withGpuSparkSession() { spark =>
- import spark.implicits._
+ Seq(false, true).foreach { useExtMem =>
+ test(s"Ranker: XGBoost-Spark should match xgboost4j with useExtMem=$useExtMem") {
+ withGpuSparkSession() { spark =>
+ import spark.implicits._
- val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3"))
- val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3"))
+ val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3"))
+ val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3"))
- val df = spark.read.parquet(trainPath)
- val testdf = spark.read.parquet(testPath)
+ val df = spark.read.parquet(trainPath)
+ val testdf = spark.read.parquet(testPath)
- val features = Array("c1", "c2", "c3")
- val featuresIndices = features.map(df.schema.fieldIndex)
- val label = "label"
- val group = "group"
+ val features = Array("c1", "c2", "c3")
+ val featuresIndices = features.map(df.schema.fieldIndex)
+ val label = "label"
+ val group = "group"
- val numRound = 100
- val xgboostParams: Map[String, Any] = Map(
- "device" -> "cuda",
- "objective" -> "rank:ndcg"
- )
+ val numRound = 100
+ val xgboostParams: Map[String, Any] = Map(
+ "device" -> "cuda",
+ "objective" -> "rank:ndcg"
+ )
- val ranker = new XGBoostRanker(xgboostParams)
- .setFeaturesCol(features)
- .setLabelCol(label)
- .setNumRound(numRound)
- .setLeafPredictionCol("leaf")
- .setContribPredictionCol("contrib")
- .setGroupCol(group)
- .setDevice("cuda")
+ val ranker = new XGBoostRanker(xgboostParams)
+ .setFeaturesCol(features)
+ .setLabelCol(label)
+ .setNumRound(numRound)
+ .setLeafPredictionCol("leaf")
+ .setContribPredictionCol("contrib")
+ .setGroupCol(group)
+ .setDevice("cuda")
+ .setUseExternalMemory(useExtMem)
- val xgb4jModel = withResource(new GpuColumnBatch(
- Table.readParquet(new File(trainPath)
- ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch =>
- val cb = new CudfColumnBatch(batch.select(featuresIndices),
- batch.select(df.schema.fieldIndex(label)), null, null,
- batch.select(df.schema.fieldIndex(group)))
- val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing,
- ranker.getMaxBins, ranker.getNthread)
- ScalaXGBoost.train(qdm, xgboostParams, numRound)
- }
+ val xgb4jModel = withResource(new GpuColumnBatch(
+ Table.readParquet(new File(trainPath)
+ ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch =>
+ val cb = new CudfColumnBatch(batch.select(featuresIndices),
+ batch.select(df.schema.fieldIndex(label)), null, null,
+ batch.select(df.schema.fieldIndex(group)))
+ val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing,
+ ranker.getMaxBins, ranker.getNthread)
+ ScalaXGBoost.train(qdm, xgboostParams, numRound)
+ }
- val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch(
- Table.readParquet(new File(testPath)))) { batch =>
- val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
- )
- val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread)
- (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
- xgb4jModel.predict(qdm))
- }
+ val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch(
+ Table.readParquet(new File(testPath)))) { batch =>
+ val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null
+ )
+ val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread)
+ (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm),
+ xgb4jModel.predict(qdm))
+ }
- val rows = ranker.fit(df).transform(testdf).collect()
+ val rows = ranker.fit(df).transform(testdf).collect()
- // Check Leaf
- val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
- checkEqual(xgb4jLeaf, xgbSparkLeaf)
+ // Check Leaf
+ val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat))
+ checkEqual(xgb4jLeaf, xgbSparkLeaf)
- // Check contrib
- val xgbSparkContrib = rows.map(row =>
- row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
- checkEqual(xgb4jContrib, xgbSparkContrib)
+ // Check contrib
+ val xgbSparkContrib = rows.map(row =>
+ row.getAs[DenseVector]("contrib").toArray.map(_.toFloat))
+ checkEqual(xgb4jContrib, xgbSparkContrib)
- // Check prediction
- val xgbSparkPred = rows.map(row =>
- Array(row.getAs[Double]("prediction").toFloat))
- checkEqual(xgb4jPred, xgbSparkPred)
+ // Check prediction
+ val xgbSparkPred = rows.map(row =>
+ Array(row.getAs[Double]("prediction").toFloat))
+ checkEqual(xgb4jPred, xgbSparkPred)
+ }
}
}
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
index 9f9fc22755fd..19f6f324497e 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
@@ -44,7 +44,7 @@ import ml.dmlc.xgboost4j.scala.spark.params._
/**
* Hold the column index
*/
-private[spark] case class ColumnIndices(
+private[scala] case class ColumnIndices(
labelId: Int,
featureId: Option[Int], // the feature type is VectorUDT or Array
featureIds: Option[Seq[Int]], // the feature type is columnar
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
index d7a83eb1b76a..bf27c51f8a72 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
@@ -179,10 +179,35 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe
final def getFeatureTypes: Array[String] = $(featureTypes)
+ final val useExternalMemory = new BooleanParam(this, "useExternalMemory", "Whether to use " +
+ "external memory when building the QuantileDMatrix. Note that useExternalMemory takes " +
+ "effect only when `device` is set to `cuda` or `gpu`. When it is enabled, the directory " +
+ "specified by spark.local.dir is used to cache the temporary files; if spark.local.dir " +
+ "is not set, the /tmp directory is used instead.")
+
+ final def getUseExternalMemory: Boolean = $(useExternalMemory)
+
+ final val maxNumDevicePages = new IntParam(this, "maxNumDevicePages", "Maximum number of " +
+ "pages cached on the device")
+
+ final def getMaxNumDevicePages: Int = $(maxNumDevicePages)
+
+ final val maxQuantileBatches = new IntParam(this, "maxQuantileBatches", "Maximum number of " +
+ "quantile batches")
+
+ final def getMaxQuantileBatches: Int = $(maxQuantileBatches)
+
+ final val minCachePageBytes = new IntParam(this, "minCachePageBytes", "Minimum number of " +
+ "bytes for each ellpack page in cache. Only used when the cache is kept on the host")
+
+ final def getMinCachePageBytes: Int = $(minCachePageBytes)
+
setDefault(numRound -> 100, numWorkers -> 1, inferBatchSize -> (32 << 10),
numEarlyStoppingRounds -> 0, forceRepartition -> false, missing -> Float.NaN,
featuresCols -> Array.empty, customObj -> null, customEval -> null,
- featureNames -> Array.empty, featureTypes -> Array.empty)
+ featureNames -> Array.empty, featureTypes -> Array.empty, useExternalMemory -> false,
+ maxNumDevicePages -> -1, maxQuantileBatches -> -1, minCachePageBytes -> -1)
addNonXGBoostParam(numWorkers, numRound, numEarlyStoppingRounds, inferBatchSize, featuresCol,
labelCol, baseMarginCol, weightCol, predictionCol, leafPredictionCol, contribPredictionCol,
@@ -224,6 +249,14 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe
def setFeatureTypes(value: Array[String]): T = set(featureTypes, value).asInstanceOf[T]
+ def setUseExternalMemory(value: Boolean): T = set(useExternalMemory, value).asInstanceOf[T]
+
+ def setMaxNumDevicePages(value: Int): T = set(maxNumDevicePages, value).asInstanceOf[T]
+
+ def setMaxQuantileBatches(value: Int): T = set(maxQuantileBatches, value).asInstanceOf[T]
+
+ def setMinCachePageBytes(value: Int): T = set(minCachePageBytes, value).asInstanceOf[T]
+
protected[spark] def featureIsArrayType(schema: StructType): Boolean =
schema(getFeaturesCol).dataType.isInstanceOf[ArrayType]
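The three new knobs all default to -1, which defers to the native library's defaults, and they only matter on the GPU external-memory path. A sketch with purely illustrative values (not recommendations):

```scala
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

val regressor = new XGBoostRegressor(Map("device" -> "cuda"))
  .setUseExternalMemory(true)
  .setMaxNumDevicePages(2)        // cap ellpack pages resident on the device
  .setMaxQuantileBatches(32)      // cap batches per quantile-sketching pass
  .setMinCachePageBytes(64 << 20) // ~64 MiB per cached page on the host
```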
diff --git a/jvm-packages/xgboost4j/pom.xml b/jvm-packages/xgboost4j/pom.xml
index dcce62d09094..217cdcbe6a8a 100644
--- a/jvm-packages/xgboost4j/pom.xml
+++ b/jvm-packages/xgboost4j/pom.xml
@@ -100,6 +100,8 @@
${use.cuda}
--use-openmp
${use.openmp}
+ <argument>--use-debug</argument>
+ <argument>${use.debug}</argument>
${user.dir}
${skip.native.build}
diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java
index fa2f18be7ded..45b49a3b40f7 100644
--- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java
+++ b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/XGBoostJNI.java
@@ -172,7 +172,10 @@ public final static native int XGDMatrixSetInfoFromInterface(
long handle, String field, String json);
public final static native int XGQuantileDMatrixCreateFromCallback(
- java.util.Iterator<ColumnBatch> iter, long[] ref, String config, long[] out);
+ java.util.Iterator<ColumnBatch> iter, long[] ref, String config, long[] out);
+
+ public final static native int XGExtMemQuantileDMatrixCreateFromCallback(
+ java.util.Iterator<ColumnBatch> iter, long[] ref, String config, long[] out);
public final static native int XGDMatrixCreateFromArrayInterfaceColumns(
String featureJson, float missing, int nthread, long[] out);
diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp
index 00bf5095cd03..40babba73082 100644
--- a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp
+++ b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cpp
@@ -9,9 +9,7 @@
#include "../../../../src/common/common.h"
namespace xgboost::jni {
-XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass jcls, jobject jdata_iter,
- jlongArray jref, char const *config,
- jlongArray jout) {
+int QdmFromCallback(JNIEnv *, jobject, jlongArray, char const *, bool, jlongArray) {
API_BEGIN();
common::AssertGPUSupport();
API_END();
diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
index 6885624bd302..6adffb39447a 100644
--- a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
+++ b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
@@ -4,6 +4,7 @@
#include
#include
+#include "../../../../src/common/common.h"
#include "../../../../src/common/cuda_pinned_allocator.h"
#include "../../../../src/common/device_vector.cuh" // for device_vector
#include "../../../../src/data/array_interface.h"
@@ -58,7 +59,7 @@ void CopyColumnMask(xgboost::ArrayInterface<1> const &interface, std::vector
@@ -124,7 +125,6 @@ struct Symbols {
static constexpr StringView kBaseMargin{"baseMargin"};
static constexpr StringView kQid{"qid"};
};
-} // namespace
class JvmIter {
JNIEnv *jenv_;
@@ -201,7 +201,17 @@ class DMatrixProxy {
}
};
-class DataIteratorProxy {
+template <typename Map>
+Json GetLabel(Map const &jmap) {
+ auto it = jmap.find(Symbols::kLabel);
+ StringView msg{"Must have a label field."};
+ CHECK(it != jmap.cend()) << msg;
+ Json label = it->second;
+ CHECK(!IsA<Null>(label)) << msg;
+ return label;
+}
+
+class HostMemProxy {
DMatrixProxy proxy_;
JvmIter jiter_;
@@ -237,33 +247,28 @@ class DataIteratorProxy {
cudaStream_t copy_stream_;
public:
- explicit DataIteratorProxy(jobject jiter) : jiter_{jiter} {
+ explicit HostMemProxy(jobject jiter) : jiter_{jiter} {
this->Reset();
dh::safe_cuda(cudaStreamCreateWithFlags(&copy_stream_, cudaStreamNonBlocking));
}
- ~DataIteratorProxy() { dh::safe_cuda(cudaStreamDestroy(copy_stream_)); }
+ ~HostMemProxy() { dh::safe_cuda(cudaStreamDestroy(copy_stream_)); }
DMatrixHandle GetDMatrixHandle() const { return proxy_.GetDMatrixHandle(); }
// Helper function for staging meta info.
- void StageMetaInfo(Json json_interface) {
- CHECK(!IsA<Null>(json_interface));
- auto json_map = get