Skip to content

Commit

Permalink
fix(hdfs): Use available port for HdfsMiniCluster (facebookincubator#…
Browse files Browse the repository at this point in the history
…11996)

Summary:
Fixes: facebookincubator#11857

Pull Request resolved: facebookincubator#11996

Reviewed By: xiaoxmeng

Differential Revision: D67765034

Pulled By: kevinwilfong

fbshipit-source-id: 779e62ebfd3ae6d852d551e7995187cd95b96d1a
  • Loading branch information
majetideepak authored and athmaja-n committed Jan 10, 2025
1 parent cda8240 commit 2b4ba7f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include <gmock/gmock-matchers.h>
#include <atomic>
#include <random>
#include "HdfsMiniCluster.h"
#include "gtest/gtest.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/external/hdfs/ArrowHdfsInternal.h"
Expand All @@ -36,14 +36,9 @@ using filesystems::arrow::io::internal::LibHdfsShim;

constexpr int kOneMB = 1 << 20;
static const std::string destinationPath = "/test_file.txt";
static const std::string hdfsPort = "7878";
static const std::string localhost = "localhost";
static const std::string fullDestinationPath =
"hdfs://" + localhost + ":" + hdfsPort + destinationPath;
static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
static const std::string viewfsDestinationPath = "viewfs://" + destinationPath;
static const std::unordered_map<std::string, std::string> configurationValues(
{{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});
std::unordered_map<std::string, std::string> configurationValues;

class HdfsFileSystemTest : public testing::Test {
public:
Expand All @@ -55,6 +50,12 @@ class HdfsFileSystemTest : public testing::Test {
auto tempFile = createFile();
miniCluster->addFile(tempFile->getPath(), destinationPath);
}
configurationValues.insert(
{"hive.hdfs.host", std::string(miniCluster->host())});
configurationValues.insert(
{"hive.hdfs.port", std::string(miniCluster->nameNodePort())});
fullDestinationPath_ =
fmt::format("{}{}", miniCluster->url(), destinationPath);
}

void SetUp() override {
Expand All @@ -67,8 +68,18 @@ class HdfsFileSystemTest : public testing::Test {
static void TearDownTestSuite() {
miniCluster->stop();
}

static std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFilePath = fmt::format("{}{}", miniCluster->url(), path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

static std::atomic<bool> startThreads;
static std::shared_ptr<filesystems::test::HdfsMiniCluster> miniCluster;
static std::string fullDestinationPath_;

private:
static std::shared_ptr<::exec::test::TempFilePath> createFile() {
Expand All @@ -84,6 +95,7 @@ class HdfsFileSystemTest : public testing::Test {
std::shared_ptr<filesystems::test::HdfsMiniCluster>
HdfsFileSystemTest::miniCluster = nullptr;
std::atomic<bool> HdfsFileSystemTest::startThreads = false;
std::string HdfsFileSystemTest::fullDestinationPath_;

void readData(ReadFile* readFile) {
ASSERT_EQ(readFile->size(), 15 + kOneMB);
Expand All @@ -107,15 +119,6 @@ void readData(ReadFile* readFile) {
ASSERT_EQ(warfFromBuf, "abbbbbcc");
}

std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
std::string hdfsFilePath =
"hdfs://" + localhost + ":" + hdfsPort + std::string(path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

void checkReadErrorMessages(
ReadFile* readFile,
std::string errorMessage,
Expand Down Expand Up @@ -184,54 +187,61 @@ void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) {
}

hdfsFS connectHdfsDriver(
filesystems::arrow::io::internal::LibHdfsShim** driver) {
filesystems::arrow::io::internal::LibHdfsShim** driver,
const std::string host,
const std::string port) {
filesystems::arrow::io::internal::LibHdfsShim* libhdfs_shim;
auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&libhdfs_shim);
if (!status.ok()) {
LOG(ERROR) << "ConnectLibHdfs failed ";
}
VELOX_CHECK(status.ok(), "ConnectLibHdfs failed.");

// Connect to HDFS with the builder object
hdfsBuilder* builder = libhdfs_shim->NewBuilder();
libhdfs_shim->BuilderSetNameNode(builder, localhost.c_str());
libhdfs_shim->BuilderSetNameNodePort(builder, 7878);
libhdfs_shim->BuilderSetNameNode(builder, host.c_str());
libhdfs_shim->BuilderSetNameNodePort(builder, std::stoi(port));
libhdfs_shim->BuilderSetForceNewInstance(builder);

auto hdfs = libhdfs_shim->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfs,
"Unable to connect to HDFS: {}, got error",
std::string(localhost.c_str()) + ":7878");
"Unable to connect to HDFS at {}:{}, got error",
host.c_str(),
port);
*driver = libhdfs_shim;
return hdfs;
}

TEST_F(HdfsFileSystemTest, read) {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
HdfsReadFile readFile(driver, hdfs, destinationPath);
readData(&readFile);
}

TEST_F(HdfsFileSystemTest, viaFileSystem) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
}

TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
// Without host/port configured.
auto config = std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());

// Wrong endpoint info specified in hdfs file path.
const std::string wrongFullDestinationPath =
"hdfs://not_exist_host:" + hdfsPort + destinationPath;
"hdfs://not_exist_host:" + std::string(miniCluster->nameNodePort()) +
destinationPath;
VELOX_ASSERT_THROW(
filesystems::getFileSystem(wrongFullDestinationPath, config),
"Unable to connect to HDFS");
Expand All @@ -240,23 +250,25 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
TEST_F(HdfsFileSystemTest, fallbackToUseConfig) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
}

TEST_F(HdfsFileSystemTest, oneFsInstanceForOneEndpoint) {
auto hdfsFileSystem1 =
filesystems::getFileSystem(fullDestinationPath, nullptr);
filesystems::getFileSystem(fullDestinationPath_, nullptr);
auto hdfsFileSystem2 =
filesystems::getFileSystem(fullDestinationPath, nullptr);
filesystems::getFileSystem(fullDestinationPath_, nullptr);
ASSERT_TRUE(hdfsFileSystem1 == hdfsFileSystem2);
}

TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

VELOX_ASSERT_RUNTIME_THROW_CODE(
hdfsFileSystem->openFileForRead(
Expand All @@ -268,7 +280,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
TEST_F(HdfsFileSystemTest, missingHost) {
try {
std::unordered_map<std::string, std::string> missingHostConfiguration(
{{"hive.hdfs.port", hdfsPort}});
{{"hive.hdfs.port", std::string(miniCluster->nameNodePort())}});
auto config = std::make_shared<const config::ConfigBase>(
std::move(missingHostConfiguration));
filesystems::HdfsFileSystem hdfsFileSystem(
Expand All @@ -287,7 +299,7 @@ TEST_F(HdfsFileSystemTest, missingHost) {
TEST_F(HdfsFileSystemTest, missingPort) {
try {
std::unordered_map<std::string, std::string> missingPortConfiguration(
{{"hive.hdfs.host", localhost}});
{{"hive.hdfs.host", std::string(miniCluster->host())}});
auto config = std::make_shared<const config::ConfigBase>(
std::move(missingPortConfiguration));
filesystems::HdfsFileSystem hdfsFileSystem(
Expand All @@ -306,7 +318,10 @@ TEST_F(HdfsFileSystemTest, missingPort) {
TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
try {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist");
FAIL() << "expected VeloxException";
} catch (VeloxException const& error) {
Expand All @@ -329,8 +344,8 @@ TEST_F(HdfsFileSystemTest, schemeMatching) {
"No registered file system matched with file path 'file://'"));
}
auto fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(fullDestinationPath, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));
filesystems::getFileSystem(fullDestinationPath_, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath_));

fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(viewfsDestinationPath, nullptr));
Expand All @@ -342,7 +357,7 @@ TEST_F(HdfsFileSystemTest, writeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->openFileForWrite("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Write to HDFS is unsupported");
Expand All @@ -354,7 +369,7 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->remove("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Does not support removing files from hdfs");
Expand All @@ -365,8 +380,10 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
startThreads = false;

filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);

auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
std::vector<int> sleepTimesInMicroseconds = {0, 500, 50000};
Expand Down Expand Up @@ -396,7 +413,8 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
startThreads = false;
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand All @@ -414,7 +432,7 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
});
threads.emplace_back(std::move(thread));
Expand Down Expand Up @@ -478,6 +496,9 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) {

TEST_F(HdfsFileSystemTest, readFailures) {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
verifyFailures(driver, hdfs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

#include "HdfsMiniCluster.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"

#include "velox/exec/tests/utils/PortUtil.h"

namespace facebook::velox::filesystems::test {
void HdfsMiniCluster::start() {
Expand All @@ -28,16 +30,16 @@ void HdfsMiniCluster::start() {
noMapReduceOption,
formatNameNodeOption,
httpPortOption,
httpPort,
httpPort_,
nameNodePortOption,
nameNodePort,
nameNodePort_,
configurationOption,
turnOffPermissions);
serverProcess_->wait_for(std::chrono::duration<int, std::milli>(60000));
VELOX_CHECK_EQ(
serverProcess_->exit_code(),
383,
"Minicluster process exited, code: ",
"Minicluster process exited, code: {}",
serverProcess_->exit_code());
} catch (const std::exception& e) {
VELOX_FAIL("Failed to launch Minicluster server: {}", e.what());
Expand Down Expand Up @@ -71,6 +73,11 @@ HdfsMiniCluster::HdfsMiniCluster() {
VELOX_FAIL(
"Failed to find minicluster executable {}'", miniClusterExecutableName);
}
constexpr auto kHostAddressTemplate = "hdfs://{}:{}";
auto ports = facebook::velox::exec::test::getFreePorts(2);
nameNodePort_ = fmt::format("{}", ports[0]);
httpPort_ = fmt::format("{}", ports[1]);
filesystemUrl_ = fmt::format(kHostAddressTemplate, host(), nameNodePort_);
boost::filesystem::path hadoopHomeDirectory = exePath_;
hadoopHomeDirectory.remove_leaf().remove_leaf();
setupEnvironment(hadoopHomeDirectory.string());
Expand All @@ -82,12 +89,12 @@ void HdfsMiniCluster::addFile(std::string source, std::string destination) {
exePath_,
filesystemCommand,
filesystemUrlOption,
filesystemUrl,
filesystemUrl_,
filePutOption,
source,
destination);
bool isExited =
filePutProcess->wait_for(std::chrono::duration<int, std::milli>(5000));
filePutProcess->wait_for(std::chrono::duration<int, std::milli>(15000));
if (!isExited) {
VELOX_FAIL(
"Failed to add file to hdfs, exit code: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ static const std::string miniclusterCommand{"minicluster"};
static const std::string noMapReduceOption{"-nomr"};
static const std::string formatNameNodeOption{"-format"};
static const std::string httpPortOption{"-nnhttpport"};
static const std::string httpPort{"7676"};
static const std::string nameNodePortOption{"-nnport"};
static const std::string nameNodePort{"7878"};
static const std::string configurationOption{"-D"};
static const std::string turnOffPermissions{"dfs.permissions=false"};
static const std::string filesystemCommand{"fs"};
static const std::string filesystemUrlOption{"-fs"};
static const std::string filesystemUrl{"hdfs://localhost:" + nameNodePort};
static const std::string filePutOption{"-put"};

class HdfsMiniCluster {
Expand All @@ -54,11 +51,27 @@ class HdfsMiniCluster {
void addFile(std::string source, std::string destination);
virtual ~HdfsMiniCluster();

std::string_view nameNodePort() const {
return nameNodePort_;
}

std::string_view url() const {
return filesystemUrl_;
}

std::string_view host() const {
static const std::string_view kLocalhost = "localhost";
return kLocalhost;
}

private:
void setupEnvironment(const std::string& homeDirectory);

std::unique_ptr<::boost::process::child> serverProcess_;
boost::filesystem::path exePath_;
boost::process::environment env_;
std::string nameNodePort_;
std::string httpPort_;
std::string filesystemUrl_;
};
} // namespace facebook::velox::filesystems::test
Loading

0 comments on commit 2b4ba7f

Please sign in to comment.