Skip to content

Commit

Permalink
KvikIO as an alternative GDS backend (#10593)
Browse files Browse the repository at this point in the history
This PR is a new take on #10468 that is less intrusive. It keeps the existing GDS backend and adds a new option `LIBCUDF_CUFILE_POLICY=KVIKIO` that makes cudf use KvikIO.  

 The default policy is still `LIBCUDF_CUFILE_POLICY=GDS` 

cc. @vuule, @devavret, @GregoryKimball

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Robert Maynard (https://github.com/robertmaynard)
  - Devavret Makkar (https://github.com/devavret)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #10593
  • Loading branch information
madsbk authored Apr 20, 2022
1 parent c8c7271 commit 01d08af
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 126 deletions.
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ include(cmake/thirdparty/get_gtest.cmake)
include(cmake/Modules/JitifyPreprocessKernels.cmake)
# find cuFile
include(cmake/Modules/FindcuFile.cmake)
# find KvikIO
include(cmake/thirdparty/get_kvikio.cmake)

# Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved
if(NOT BUILD_SHARED_LIBS)
Expand Down Expand Up @@ -586,7 +588,7 @@ add_dependencies(cudf jitify_preprocess_run)
target_link_libraries(
cudf
PUBLIC ${ARROW_LIBRARIES} libcudacxx::libcudacxx cudf::Thrust rmm::rmm
PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp kvikio::kvikio
)

# Add Conda library, and include paths if specified
Expand Down
31 changes: 31 additions & 0 deletions cpp/cmake/thirdparty/get_kvikio.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# =============================================================================
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# Locate KvikIO via rapids_cpm_find and expose the `kvikio::kvikio` target globally.
#
# kvikio_version: version string requested from rapids_cpm_find; it is also used to build the
# `branch-<version>` git tag passed in CPM_ARGS.
function(find_and_configure_kvikio kvikio_version)

  rapids_cpm_find(
    KvikIO ${kvikio_version}
    GLOBAL_TARGETS kvikio::kvikio
    CPM_ARGS
    GIT_REPOSITORY https://github.com/rapidsai/kvikio.git
    GIT_TAG branch-${kvikio_version}
    GIT_SHALLOW TRUE
    # Only the C++ part of the repository is configured.
    SOURCE_SUBDIR cpp
    OPTIONS "KvikIO_BUILD_EXAMPLES OFF"
  )

endfunction()

# Request the KvikIO version that matches cudf's own "<major>.<minor>" version.
set(KVIKIO_MIN_VERSION_cudf "${CUDF_VERSION_MAJOR}.${CUDF_VERSION_MINOR}")
find_and_configure_kvikio(${KVIKIO_MIN_VERSION_cudf})
5 changes: 4 additions & 1 deletion cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
/**
* @brief Defines which cuFile usage to enable.
*/
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS };
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO };

/**
* @brief Get the current usage policy.
Expand All @@ -46,6 +46,7 @@ usage_policy get_env_policy()
if (env_val == "OFF") return usage_policy::OFF;
if (env_val == "GDS") return usage_policy::GDS;
if (env_val == "ALWAYS") return usage_policy::ALWAYS;
if (env_val == "KVIKIO") return usage_policy::KVIKIO;
CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val);
}
} // namespace
Expand All @@ -54,6 +55,8 @@ bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; }

/**
 * @brief Returns true if the cuFile policy is `ALWAYS` or `GDS`.
 */
bool is_gds_enabled()
{
  // `ALWAYS` implies GDS usage, so check it first.
  if (is_always_enabled()) { return true; }
  return get_env_policy() == usage_policy::GDS;
}

/**
 * @brief Returns true if the cuFile policy selects KvikIO.
 */
bool is_kvikio_enabled()
{
  auto const policy = get_env_policy();
  return policy == usage_policy::KVIKIO;
}

} // namespace cufile_integration

namespace nvcomp_integration {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/utilities/config_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ bool is_always_enabled();
*/
bool is_gds_enabled();

/**
* @brief Returns true if KvikIO is enabled.
*/
bool is_kvikio_enabled();

} // namespace cufile_integration

namespace nvcomp_integration {
Expand Down
44 changes: 31 additions & 13 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,9 @@
#include "file_io_utilities.hpp"
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand All @@ -30,10 +32,15 @@ namespace io {
class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
: _cufile_out(detail::make_cufile_output(filepath))
{
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file");

if (detail::cufile_integration::is_kvikio_enabled()) {
_kvikio_file = kvikio::FileHandle(filepath, "w");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
}

virtual ~file_sink() { flush(); }
Expand All @@ -49,19 +56,15 @@ class file_sink : public data_sink {

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
[[nodiscard]] bool supports_device_write() const override
{
return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size);
return !_kvikio_file.closed() || _cufile_out != nullptr;
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

_cufile_out->write(gpu_data, _bytes_written, size);
_bytes_written += size;
return !_kvikio_file.closed() ||
(_cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size));
}

std::future<void> device_write_async(void const* gpu_data,
Expand All @@ -70,15 +73,30 @@ class file_sink : public data_sink {
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
size_t offset = _bytes_written;
_bytes_written += size;
return result;

if (!_kvikio_file.closed()) {
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}
return _cufile_out->write_async(gpu_data, offset, size);
}

/**
 * @brief Writes `size` bytes from device memory to the file and waits for completion.
 *
 * Thin blocking wrapper around `device_write_async()`, which picks the write offset and
 * advances `_bytes_written` itself.
 *
 * @param gpu_data Pointer to the device data to write
 * @param size Number of bytes to write
 * @param stream CUDA stream forwarded to `device_write_async()`
 *
 * @throws cudf::logic_error if device writes are not supported for this file
 */
void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
  if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");
  // Forward the caller's `size`. Passing `_bytes_written` here (the running file offset) would
  // write 0 bytes on the first call and a wrong byte count on every later call.
  device_write_async(gpu_data, size, stream).get();
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _kvikio_file;
};

/**
Expand Down
52 changes: 31 additions & 21 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,9 @@
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/device_buffer.hpp>

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
Expand All @@ -33,50 +36,56 @@ namespace {
*/
class file_source : public datasource {
public:
explicit file_source(const char* filepath)
: _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath))
explicit file_source(const char* filepath) : _file(filepath, O_RDONLY)
{
if (detail::cufile_integration::is_kvikio_enabled()) {
_kvikio_file = kvikio::FileHandle(filepath);
} else {
_cufile_in = detail::make_cufile_input(filepath);
}
}

virtual ~file_source() = default;

[[nodiscard]] bool supports_device_read() const override { return _cufile_in != nullptr; }
/**
 * @brief Returns true if either the KvikIO handle is open or a cuFile input exists.
 */
[[nodiscard]] bool supports_device_read() const override
{
  if (!_kvikio_file.closed()) { return true; }
  return _cufile_in != nullptr;
}

[[nodiscard]] bool is_device_read_preferred(size_t size) const override
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
return !_kvikio_file.closed() ||
(_cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size));
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, stream);
if (!_kvikio_file.closed()) { return _kvikio_file.pread(dst, read_size, offset); }
return _cufile_in->read_async(offset, read_size, dst, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, dst, stream);
return device_read_async(offset, size, dst, stream).get();
}

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read_async(offset, read_size, dst, stream);
rmm::device_buffer out_data(size, stream);
size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read, stream);
return datasource::buffer::create(std::move(out_data));
}

[[nodiscard]] size_t size() const override { return _file.size(); }
Expand All @@ -86,6 +95,7 @@ class file_source : public datasource {

private:
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
kvikio::FileHandle _kvikio_file;
};

/**
Expand Down
24 changes: 0 additions & 24 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,6 @@ cufile_input_impl::cufile_input_impl(std::string const& filepath)
pool.sleep_duration = 10;
}

std::unique_ptr<datasource::buffer> cufile_input_impl::read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
rmm::device_buffer out_data(size, stream);
auto read_size = read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read_size, stream);
return datasource::buffer::create(std::move(out_data));
}

namespace {

template <typename DataT,
Expand Down Expand Up @@ -234,27 +224,13 @@ std::future<size_t> cufile_input_impl::read_async(size_t offset,
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}

size_t cufile_input_impl::read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
auto result = read_async(offset, size, dst, stream);
return result.get();
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664),
pool(getenv_or("LIBCUDF_CUFILE_THREAD_COUNT", 16))
{
}

void cufile_output_impl::write(void const* data, size_t offset, size_t size)
{
write_async(data, offset, size).wait();
}

std::future<void> cufile_output_impl::write_async(void const* data, size_t offset, size_t size)
{
int device;
Expand Down
Loading

0 comments on commit 01d08af

Please sign in to comment.