Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add blob and s3 read support #20

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ParquetBlockInputFormat : public IInputFormat
private:
Chunk generate() override;

protected:
void prepareReader();

void onCancel() override
Expand Down
6 changes: 6 additions & 0 deletions utils/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include_directories(
${JNI_INCLUDE_DIRS}
${CMAKE_CURRENT_BINARY_DIR}/proto
${ARROW_INCLUDE_DIR}
${ClickHouse_SOURCE_DIR}/contrib/arrow-cmake/cpp/src
${ClickHouse_SOURCE_DIR}/utils/local-engine
${ClickHouse_SOURCE_DIR}/src
${ClickHouse_SOURCE_DIR}/base
Expand Down Expand Up @@ -186,6 +187,11 @@ target_compile_options(_mariadbclient PRIVATE -fPIC)
target_compile_options(_hdfs3 PRIVATE -fPIC)
target_compile_options(_libxml2 PRIVATE -fPIC)
target_compile_options(_gsasl PRIVATE -fPIC)
target_compile_options(_parquet PRIVATE -fPIC)
target_compile_options(_arrow PRIVATE -fPIC)
target_compile_options(_thrift PRIVATE -fPIC)
target_compile_options(_aws_s3_checksums PRIVATE -fPIC)

target_compile_options(absl_str_format_internal PRIVATE -fPIC)
target_compile_options(absl_strings PRIVATE -fPIC)
target_compile_options(absl_raw_logging_internal PRIVATE -fPIC)
Expand Down
34 changes: 34 additions & 0 deletions utils/local-engine/Common/ChunkBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "ChunkBuffer.h"

namespace local_engine
{
void ChunkBuffer::add(DB::Chunk & columns, int start, int end)
{
if (accumulated_columns.empty())
{
auto num_cols = columns.getNumColumns();
accumulated_columns.reserve(num_cols);
for (size_t i = 0; i < num_cols; i++)
{
accumulated_columns.emplace_back(columns.getColumns()[i]->cloneEmpty());
}
}

for (size_t i = 0; i < columns.getNumColumns(); ++i)
accumulated_columns[i]->insertRangeFrom(*columns.getColumns()[i], start, end - start);
}
size_t ChunkBuffer::size() const
{
if (accumulated_columns.empty())
return 0;
return accumulated_columns.at(0)->size();
}
DB::Chunk ChunkBuffer::releaseColumns()
{
auto rows = size();
DB::Columns res(std::make_move_iterator(accumulated_columns.begin()), std::make_move_iterator(accumulated_columns.end()));
accumulated_columns.clear();
return DB::Chunk(res, rows);
}

}
17 changes: 17 additions & 0 deletions utils/local-engine/Common/ChunkBuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once
#include <Processors/Chunk.h>

namespace local_engine
{
class ChunkBuffer
{
public:
void add(DB::Chunk & columns, int start, int end);
size_t size() const;
DB::Chunk releaseColumns();

private:
DB::MutableColumns accumulated_columns;
};

}
114 changes: 114 additions & 0 deletions utils/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include "DebugUtils.h"
#include <DataTypes/DataTypeDate.h>
#include <Formats/FormatSettings.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteBufferFromString.h>

namespace debug
{

void headBlock(const DB::Block & block, size_t count)
{
std::cerr << "============Block============" << std::endl;
// print header
for (auto name : block.getNames())
{
std::cerr << name << "\t";
}
std::cerr << std::endl;
// print rows
for (size_t row = 0; row < std::min(count, block.rows()); ++row)
{
for (size_t column = 0; column < block.columns(); ++column)
{
const auto type = block.getByPosition(column).type;
auto col = block.getByPosition(column).column;
DB::WhichDataType which(type);
if (which.isUInt())
{
auto value = col->getUInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*col)->getDataAt(row).toString();
std::cerr << value << "\t";
}
else if (which.isInt())
{
auto value = col->getInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat32())
{
auto value = col->getFloat32(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat64())
{
auto value = col->getFloat64(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isDate())
{
auto * date_type = DB::checkAndGetDataType<DB::DataTypeDate>(type.get());
String date_string;
DB::WriteBufferFromString wb(date_string);
date_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*col, row, wb, {});
std::cerr << date_string.substr(0, 10) << "\t";
}
else
{
std::cerr << "N/A"
<< "\t";
}
}
std::cerr << std::endl;
}
}

void headColumn(const DB::ColumnPtr column, size_t count)
{
std::cerr << "============Column============" << std::endl;
// print header

std::cerr << column->getName() << "\t";
std::cerr << std::endl;
// print rows
for (size_t row = 0; row < std::min(count, column->size()); ++row)
{
auto type = column->getDataType();
auto col = column;
DB::WhichDataType which(type);
if (which.isUInt())
{
auto value = col->getUInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*col)->getDataAt(row).toString();
std::cerr << value << std::endl;
}
else if (which.isInt())
{
auto value = col->getInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat32())
{
auto value = col->getFloat32(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat64())
{
auto value = col->getFloat64(row);
std::cerr << std::to_string(value) << std::endl;
}
else
{
std::cerr << "N/A" << std::endl;
}
}
}
}
98 changes: 2 additions & 96 deletions utils/local-engine/Common/DebugUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,101 +6,7 @@
namespace debug
{

void headBlock(const DB::Block & block, size_t count=10)
{
std::cerr << "============Block============" << std::endl;
// print header
for (auto name : block.getNames())
{
std::cerr << name << "\t";
}
std::cerr << std::endl;
// print rows
for (size_t row = 0; row < std::min(count, block.rows()); ++row)
{
for (size_t column = 0; column < block.columns(); ++column)
{
auto type = block.getByPosition(column).type;
auto col = block.getByPosition(column).column;
DB::WhichDataType which(type);
if (which.isUInt())
{
auto value = DB::checkAndGetColumn<DB::ColumnUInt64>(*col)->getUInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*col)->getDataAt(row).toString();
std::cerr << value << "\t";
}
else if (which.isInt())
{
auto value = col->getInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat32())
{
auto value = col->getFloat32(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat64())
{
auto value = col->getFloat64(row);
std::cerr << std::to_string(value) << "\t";
}
else
{
std::cerr << "N/A"
<< "\t";
}
}
std::cerr << std::endl;
}
}
void headBlock(const DB::Block & block, size_t count=10);

void headColumn(const DB::ColumnPtr column, size_t count=10)
{
std::cerr << "============Column============" << std::endl;
// print header

std::cerr << column->getName() << "\t";
std::cerr << std::endl;
// print rows
for (size_t row = 0; row < std::min(count, column->size()); ++row)
{
auto type = column->getDataType();
auto col = column;
DB::WhichDataType which(type);
if (which.isUInt())
{
auto value = DB::checkAndGetColumn<DB::ColumnUInt64>(*col)->getUInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*col)->getDataAt(row).toString();
std::cerr << value << std::endl;
}
else if (which.isInt())
{
auto value = col->getInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat32())
{
auto value = col->getFloat32(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat64())
{
auto value = col->getFloat64(row);
std::cerr << std::to_string(value) << std::endl;
}
else
{
std::cerr << "N/A"
<< std::endl;
}
}
}
void headColumn(const DB::ColumnPtr column, size_t count=10);
}
2 changes: 1 addition & 1 deletion utils/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList& names_and_typ
}


MergeTreeTable parseMergeTreeTable(std::string & info)
MergeTreeTable parseMergeTreeTableString(std::string & info)
{
ReadBufferFromString in(info);
assertString("MergeTree;", in);
Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Common/MergeTreeTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ namespace local_engine
std::string toString() const;
};

MergeTreeTable parseMergeTreeTable(std::string & info);
MergeTreeTable parseMergeTreeTableString(std::string & info);
}
Loading