Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-929 mmap #603

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions .ccls
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
%compile_commands.json
%h -x
%h c++-header
6 changes: 6 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
BasedOnStyle: Google
ColumnLimit: 150
DerivePointerAlignment: false
PointerAlignment: Right
---
47 changes: 47 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ include(FeatureSummary)
include(ExternalProject)

option(SKIP_TESTS "Skips building all tests." OFF)
option(ENABLE_BENCHMARKING "Enables building of benchmark suites." OFF)

option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON)

Expand Down Expand Up @@ -376,6 +377,7 @@ add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-20171024)
include_directories(thirdparty/concurrentqueue)
include_directories(thirdparty/yaml-cpp-yaml-cpp-20171024/include)
include_directories(thirdparty/rapidjson-1.1.0/include)
include_directories(thirdparty/mio/include)

## Expression language extensions
option(DISABLE_EXPRESSION_LANGUAGE "Disables the scripting extensions." OFF)
Expand Down Expand Up @@ -409,6 +411,7 @@ if (WIN32 OR NOT USE_SYSTEM_ZLIB)
add_dependencies(minifi zlib-external)
endif(WIN32 OR NOT USE_SYSTEM_ZLIB)


createExtension(STANDARD-PROCESSORS "STANDARD PROCESSORS" "Provides standard processors" "extensions/standard-processors" "extensions/standard-processors/tests/")


Expand All @@ -428,6 +431,7 @@ endif()

## Add the rocks DB extension
if (NOT ROCKSDB_FOUND OR BUILD_ROCKSDB)
set(USE_RTTI "TRUE")
set(BUILD_RD "TRUE")
endif()

Expand Down Expand Up @@ -706,6 +710,49 @@ include(CPack)

if (NOT SKIP_TESTS)
include(BuildTests)

# BENCHMARKING depends on test support code
if (ENABLE_BENCHMARKING)
set(BENCHMARK_LIB_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/google-benchmark-install/lib64")
set(BENCHMARK_BYPRODUCT "${BENCHMARK_LIB_DIR}/libbenchmark.a")
set(BENCHMARK_MAIN_BYPRODUCT "${BENCHMARK_LIB_DIR}/libbenchmark.a")
# GIT_REPOSITORY "https://github.com/google/benchmark.git"
#GIT_TAG "090faecb454fbd6e6e17a75ef8146acb037118d4" # Version 1.5.0
ExternalProject_Add(
google-benchmark-external
SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/benchmark-1.5.0"
CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
"-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}/thirdparty/google-benchmark-install"
"-DBENCHMARK_ENABLE_TESTING=OFF"
BUILD_BYPRODUCTS ${BENCHMARK_BYPRODUCT} ${BENCHMARK_MAIN_BYPRODUCT}
)
add_library(benchmark STATIC IMPORTED)
set_target_properties(benchmark PROPERTIES IMPORTED_LOCATION ${BENCHMARK_BYPRODUCT})
add_dependencies(benchmark google-benchmark-external)
add_library(benchmark_main STATIC IMPORTED)
set_target_properties(benchmark_main PROPERTIES IMPORTED_LOCATION ${BENCHMARK_MAIN_BYPRODUCT})
add_dependencies(benchmark_main google-benchmark-external)
file(GLOB LIBMINIFI_BENCHMARKS "libminifi/benchmark/*.cpp")
set(ALL_BENCHMARKS "${LIBMINIFI_BENCHMARKS}")

set(BENCHMARK_COUNT 0)
foreach(benchmarkfile ${ALL_BENCHMARKS})
get_filename_component(benchmarkfilename "${benchmarkfile}" NAME_WE)
add_executable("${benchmarkfilename}" "${benchmarkfile}")
target_link_libraries("${benchmarkfilename}" benchmark benchmark_main "${CMAKE_THREAD_LIBS_INIT}")
target_link_libraries ("${benchmarkfilename}" -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive)
appendIncludes("${benchmarkfilename}")
if (DISABLE_ROCKSDB STREQUAL "OFF" OR NOT DISABLE_ROCKSDB)
target_include_directories("${benchmarkfilename}" BEFORE PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/rocksdb/include")
target_include_directories("${benchmarkfilename}" BEFORE PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/extensions/rocksdb-repos")
target_link_libraries ("${benchmarkfilename}" -Wl,--whole-archive minifi-rocksdb-repos -Wl,--no-whole-archive)
add_definitions(-DENABLE_ROCKSDB_BENCHMARKS=1)
endif()
target_include_directories("${benchmarkfilename}" BEFORE PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/benchmark-1.5.0/include")
math(EXPR BENCHMARK_COUNT "${BENCHMARK_COUNT}+1")
endforeach()
message("-- Finished building ${BENCHMARK_COUNT} benchmark file(s)...")
endif()
endif()

include(BuildDocs)
Expand Down
47 changes: 34 additions & 13 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <string>
#include "RocksDbStream.h"
#include "io/DatabaseMemoryMap.h"
#include "rocksdb/merge_operator.h"

namespace org {
Expand All @@ -38,8 +39,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
}
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
// options.use_direct_io_for_flush_and_compaction = true;
// options.use_direct_reads = true;
Copy link
Contributor Author

@ai-christianson ai-christianson Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mean to commit this part, but I will note: on my machine without this, build failed, even on master, so we probably will need to look at this separately.

options.merge_operator = std::make_shared<StringAppender>();
options.error_if_exists = false;
options.max_successive_merges = 0;
Expand All @@ -48,7 +49,7 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
logger_->log_debug("NiFi Content DB Repository database open %s success", directory_);
is_valid_ = true;
} else {
logger_->log_error("NiFi Content DB Repository database open %s fail", directory_);
logger_->log_error("NiFi Content DB Repository database open %s fail due to %s", directory_, status.ToString());
is_valid_ = false;
}
return is_valid_;
Expand All @@ -62,19 +63,40 @@ void DatabaseContentRepository::stop() {
}

std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
// the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_)
return nullptr;
// the traditional approach with these has been to return -1 from the stream;
// however, since we have the ability here we can simply return a nullptr,
// which is also valid from the API when this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_) return nullptr;
// append is already supported in all modes
return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true);
}

std::shared_ptr<io::BaseMemoryMap> DatabaseContentRepository::mmap(const std::shared_ptr<minifi::ResourceClaim> &claim, size_t map_size,
bool read_only) {
/**
* Because the underlying does not support direct mapping of the value to memory, we read the entire value in to memory, then write (iff not
* readOnly) it back to the db upon closure of the MemoryMap
*/

auto mm = std::make_shared<io::DatabaseMemoryMap>(claim, map_size, [this](const std::shared_ptr<minifi::ResourceClaim> &claim) {
remove(claim);
return write(claim);
}, read_only);

auto rs = read(claim);

if (rs != nullptr) {
rs->readData(reinterpret_cast<uint8_t *>(mm->getData()), map_size);
}

return mm;
}

std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
// the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_)
return nullptr;
// the traditional approach with these has been to return -1 from the stream;
// however, since we have the ability here we can simply return a nullptr,
// which is also valid from the API when this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_) return nullptr;
return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false);
}

Expand All @@ -92,8 +114,7 @@ bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceCla
}

bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
if (nullptr == claim || !is_valid_ || !db_)
return false;
if (nullptr == claim || !is_valid_ || !db_) return false;
rocksdb::Status status;
status = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
if (status.ok()) {
Expand Down
55 changes: 20 additions & 35 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_
#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "core/Core.h"
#include "core/Connectable.h"
#include "core/ContentRepository.h"
#include "properties/Configure.h"
#include "core/Core.h"
#include "core/logging/LoggerConfiguration.h"
#include "properties/Configure.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
namespace org {
namespace apache {
namespace nifi {
Expand All @@ -35,15 +35,15 @@ namespace repository {
class StringAppender : public rocksdb::AssociativeMergeOperator {
public:
// Constructor: specify delimiter
explicit StringAppender() {

}
explicit StringAppender() {}

virtual bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value, const rocksdb::Slice& value, std::string* new_value, rocksdb::Logger* logger) const {
virtual bool Merge(const rocksdb::Slice &key, const rocksdb::Slice *existing_value, const rocksdb::Slice &value, std::string *new_value,
rocksdb::Logger *logger) const {
// Clear the *new_value for writing.
if (nullptr == new_value) {
return false;
}

new_value->clear();

if (!existing_value) {
Expand All @@ -58,68 +58,53 @@ class StringAppender : public rocksdb::AssociativeMergeOperator {
return true;
}

virtual const char* Name() const {
return "StringAppender";
}
virtual const char *Name() const { return "StringAppender"; }

private:

};

/**
* DatabaseContentRepository is a content repository that stores data onto the local file system.
* DatabaseContentRepository is a content repository that stores data onto the
* local file system.
*/
class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
public:

DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), utils::Identifier uuid = utils::Identifier())
: core::Connectable(name, uuid),
is_valid_(false),
db_(nullptr),
logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
}
virtual ~DatabaseContentRepository() {
stop();
}
: core::Connectable(name, uuid), is_valid_(false), db_(nullptr), logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {}
virtual ~DatabaseContentRepository() { stop(); }

virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration);

virtual void stop();

virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);

virtual std::shared_ptr<io::BaseMemoryMap> mmap(const std::shared_ptr<minifi::ResourceClaim> &claim, size_t mapSize, bool readOnly);

virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);

virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
return remove(claim);
}
virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) { return remove(claim); }

virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);

virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);

virtual void yield() {

}
virtual void yield() {}

/**
* Determines if we are connected and operating
*/
virtual bool isRunning() {
return true;
}
virtual bool isRunning() { return true; }

/**
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
virtual bool isWorkAvailable() {
return true;
}
virtual bool isWorkAvailable() { return true; }

private:
bool is_valid_;
rocksdb::DB* db_;
rocksdb::DB *db_;
std::shared_ptr<logging::Logger> logger_;
};

Expand Down
Loading