Skip to content

Commit

Permalink
Add a new CTE Page Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Nov 6, 2024
1 parent 13c18db commit 37acf0d
Show file tree
Hide file tree
Showing 13 changed files with 366 additions and 4 deletions.
2 changes: 1 addition & 1 deletion velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace facebook::velox::common {

FileInputStream::FileInputStream(
std::unique_ptr<ReadFile>&& file,
const std::shared_ptr<ReadFile>& file,
uint64_t bufferSize,
memory::MemoryPool* pool)
: file_(std::move(file)),
Expand Down
4 changes: 2 additions & 2 deletions velox/common/file/FileInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace facebook::velox::common {
class FileInputStream : public ByteInputStream {
public:
FileInputStream(
std::unique_ptr<ReadFile>&& file,
const std::shared_ptr<ReadFile>& file,
uint64_t bufferSize,
memory::MemoryPool* pool);

Expand Down Expand Up @@ -108,7 +108,7 @@ class FileInputStream : public ByteInputStream {

void updateStats(uint64_t readBytes, uint64_t readTimeNs);

const std::unique_ptr<ReadFile> file_;
const std::shared_ptr<ReadFile> file_;
const uint64_t fileSize_;
const uint64_t bufferSize_;
memory::MemoryPool* const pool_;
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ velox_link_libraries(

add_subdirectory(common)
add_subdirectory(catalog)
add_subdirectory(cte)
add_subdirectory(dwrf)
add_subdirectory(orc)
add_subdirectory(parquet)
4 changes: 4 additions & 0 deletions velox/dwio/common/Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ FileFormat toFileFormat(std::string_view s) {
return FileFormat::NIMBLE;
} else if (s == "orc") {
return FileFormat::ORC;
} else if (s == "pagefile") {
return FileFormat::PAGEFILE;
}
return FileFormat::UNKNOWN;
}
Expand All @@ -61,6 +63,8 @@ std::string_view toString(FileFormat fmt) {
return "nimble";
case FileFormat::ORC:
return "orc";
case FileFormat::PAGEFILE:
return "pagefile";
default:
return "unknown";
}
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum class FileFormat {
PARQUET = 7,
NIMBLE = 8,
ORC = 9,
PAGEFILE = 10,
};

FileFormat toFileFormat(std::string_view s);
Expand Down
19 changes: 19 additions & 0 deletions velox/dwio/cte/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_subdirectory(reader)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
25 changes: 25 additions & 0 deletions velox/dwio/cte/RegisterCtePageReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace facebook::velox::cte {

void registerCtePageReaderFactory();

void unregisterCtePageReaderFactory();

} // namespace facebook::velox::cte
16 changes: 16 additions & 0 deletions velox/dwio/cte/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_dwio_cte_page_reader CtePageReader.cpp)
velox_link_libraries(velox_dwio_cte_page_reader velox_dwio_common fmt::fmt)
130 changes: 130 additions & 0 deletions velox/dwio/cte/reader/CtePageReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/cte/reader/CtePageReader.h"

namespace facebook::velox::cte {
using namespace facebook::velox::common;
using namespace dwio::common;

/// Metadata and options for reading Parquet.
class ReaderBase {
public:
ReaderBase(
std::unique_ptr<dwio::common::BufferedInput>,
const dwio::common::ReaderOptions& options);

virtual ~ReaderBase() = default;

memory::MemoryPool& getMemoryPool() const {
return pool_;
}

FileInputStream* fileInputStream() const {
return input_.get();
}

const dwio::common::ReaderOptions& getReaderOptions() const {
return options_;
}

private:
memory::MemoryPool& pool_;
// Copy of options. Must be owned by 'this'.
const dwio::common::ReaderOptions options_;
std::unique_ptr<FileInputStream> input_;
};

ReaderBase::ReaderBase(
std::unique_ptr<dwio::common::BufferedInput> input,
const dwio::common::ReaderOptions& options)
: pool_{options.memoryPool()}, options_{options} {
input_ = std::make_unique<facebook::velox::common::FileInputStream>(
input->getReadFile(), 1 << 20, &(options.memoryPool()));
}

CtePageReader::CtePageReader(
const dwio::common::ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: readerBase_(std::make_shared<ReaderBase>(std::move(input), options)){};

std::optional<uint64_t> CtePageReader::numberOfRows() const {
return readerBase_->fileInputStream()->size();
}

std::unique_ptr<RowReader> CtePageReader::createRowReader(
const RowReaderOptions& opts) const {
return std::make_unique<CtePageRowReader>(readerBase_, opts);
}

std::unique_ptr<CtePageReader> CtePageReader::create(
std::unique_ptr<dwio::common::BufferedInput> input,
const dwio::common::ReaderOptions& options) {
return std::make_unique<CtePageReader>(options, std::move(input));
}

CtePageRowReader::CtePageRowReader(
const std::shared_ptr<ReaderBase>& readerBase,
const dwio::common::RowReaderOptions& options)
: readerBase_{readerBase},
options_{options},
schema_{readerBase_->getReaderOptions().fileSchema()} {};

uint64_t CtePageRowReader::next(
uint64_t size,
velox::VectorPtr& result,
const dwio::common::Mutation*) {
if (readerBase_->fileInputStream()->atEnd()) {
return 0;
}

RowVectorPtr rowVectorPtr = vectorPtrToRowVector(size, result);
VectorStreamGroup::read(
readerBase_->fileInputStream(),
&(readerBase_->getMemoryPool()),
schema_,
&(rowVectorPtr),
&readOptions_);
return size;
}

const RowVectorPtr CtePageRowReader::vectorPtrToRowVector(
uint64_t size,
velox::VectorPtr result) {
std::vector<velox::VectorPtr> vectors;
vectors.emplace_back(result);

return std::make_shared<RowVector>(
&(readerBase_->getMemoryPool()),
schema_,
BufferPtr(nullptr),
size,
vectors);
}
} // namespace facebook::velox::cte

namespace {
void registerCtePageReaderFactory() {
facebook::velox::dwio::common::registerReaderFactory(
std::make_shared<facebook::velox::cte::CtePageReaderFactory>());
}

void unregisterCtePageReaderFactory() {
facebook::velox::dwio::common::unregisterReaderFactory(
facebook::velox::dwio::common::FileFormat::PAGEFILE);
}

} // namespace
135 changes: 135 additions & 0 deletions velox/dwio/cte/reader/CtePageReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include "velox/common/file/FileInputStream.h"
#include "velox/dwio/common/Reader.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::cte {

class ReaderBase;
class CtePageReader : public dwio::common::Reader {
public:
/**
* Constructor that lets the user specify reader options and input stream.
*/
CtePageReader(
const dwio::common::ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input);

~CtePageReader() override = default;

std::optional<uint64_t> numberOfRows() const override;

/**
* Get statistics for a specified column.
* @param index column index
* @return column statisctics
*/
std::unique_ptr<dwio::common::ColumnStatistics> columnStatistics(
uint32_t index) const override;

/**
* Get the file schema.
* @return file schema
*/
const velox::RowTypePtr& rowType() const override;

/**
* Get the file schema attributed with type and column ids.
* @return file schema
*/
const std::shared_ptr<const dwio::common::TypeWithId>& typeWithId()
const override;

/**
* Create row reader object to fetch the data.
* @param options Row reader options describing the data to fetch
* @return Row reader
*/
std::unique_ptr<dwio::common::RowReader> createRowReader(
const dwio::common::RowReaderOptions& options = {}) const override;

// /**
// * Create a reader to the for the pagefile.
// * @param input the stream to read
// * @param options the options for reading the file
// */
static std::unique_ptr<CtePageReader> create(
std::unique_ptr<dwio::common::BufferedInput> input,
const dwio::common::ReaderOptions& options);

private:
std::shared_ptr<ReaderBase> readerBase_;
};

/// Implements the RowReader interface for TEMP.
class CtePageRowReader : public dwio::common::RowReader {
public:
CtePageRowReader(
const std::shared_ptr<ReaderBase>& readerBase,
const dwio::common::RowReaderOptions& options);
~CtePageRowReader() override = default;

int64_t nextRowNumber() override;

int64_t nextReadSize(uint64_t size) override;

uint64_t next(
uint64_t size,
velox::VectorPtr& result,
const dwio::common::Mutation* = nullptr) override;

void updateRuntimeStats(
dwio::common::RuntimeStatistics& stats) const override;

void resetFilterCaches() override;

std::optional<size_t> estimatedRowSize() const override;

bool allPrefetchIssued() const override {
// Allow opening the next split while this is reading.
return true;
}

protected:
std::shared_ptr<ReaderBase> readerBase_;
dwio::common::RowReaderOptions options_;
// memory::MemoryPool& pool_;
RowTypePtr schema_;
const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_;

private:
const RowVectorPtr vectorPtrToRowVector(
uint64_t size,
velox::VectorPtr vector);
};

class CtePageReaderFactory : public dwio::common::ReaderFactory {
public:
CtePageReaderFactory() : ReaderFactory(dwio::common::FileFormat::PAGEFILE) {}

std::unique_ptr<dwio::common::Reader> createReader(
std::unique_ptr<dwio::common::BufferedInput> input,
const dwio::common::ReaderOptions& options) override {
return CtePageReader::create(std::move(input), options);
}
};

} // namespace facebook::velox::cte
Loading

0 comments on commit 37acf0d

Please sign in to comment.