diff --git a/velox/common/caching/CachedFactory.h b/velox/common/caching/CachedFactory.h index 22d266ec6235..8d11f78e7dcc 100644 --- a/velox/common/caching/CachedFactory.h +++ b/velox/common/caching/CachedFactory.h @@ -146,6 +146,7 @@ template < typename Value, typename Generator, typename Properties = void, + typename Stats = void, typename Sizer = DefaultSizer, typename Comparator = std::equal_to, typename Hash = std::hash> @@ -178,7 +179,8 @@ class CachedFactory { /// will probably mess with your memory model, so really try to avoid it. CachedPtr generate( const Key& key, - const Properties* properties = nullptr); + const Properties* properties = nullptr, + Stats* ioStats = nullptr); /// Looks up the cache entry of the given key if it exists, otherwise returns /// null. @@ -358,17 +360,25 @@ template < typename Value, typename Generator, typename Properties, + typename Stats, typename Sizer, typename Comparator, typename Hash> -CachedPtr -CachedFactory:: - generate(const Key& key, const Properties* properties) { +CachedPtr CachedFactory< + Key, + Value, + Generator, + Properties, + Stats, + Sizer, + Comparator, + Hash>:: + generate(const Key& key, const Properties* properties, Stats* stats) { process::TraceContext trace("CachedFactory::generate"); if (cache_ == nullptr) { return CachedPtr{ /*fromCache=*/false, - (*generator_)(key, properties).release(), + (*generator_)(key, properties, stats).release(), nullptr, std::make_unique(key)}; } @@ -408,7 +418,7 @@ CachedFactory:: pendingCv_.notify_all(); }; - std::unique_ptr generatedValue = (*generator_)(key, properties); + std::unique_ptr generatedValue = (*generator_)(key, properties, stats); const uint64_t valueSize = Sizer()(*generatedValue); Value* rawValue = generatedValue.release(); const bool inserted = addCache(key, rawValue, valueSize); @@ -433,12 +443,19 @@ template < typename Value, typename Generator, typename Properties, + typename Stats, typename Sizer, typename Comparator, typename Hash> -CachedPtr -CachedFactory::get( - const Key& key) { +CachedPtr CachedFactory< + Key, + Value, + Generator, + Properties, + Stats, + Sizer, + Comparator, + Hash>::get(const Key& key) { if (cache_ == nullptr) { return {}; } @@ -460,10 +477,19 @@ template < typename Value, typename Generator, typename Properties, + typename Stats, typename Sizer, typename Comparator, typename Hash> -void CachedFactory:: +void CachedFactory< + Key, + Value, + Generator, + Properties, + Stats, + Sizer, + Comparator, + Hash>:: retrieveCached( const std::vector& keys, std::vector>>& diff --git a/velox/common/caching/tests/CachedFactoryTest.cpp b/velox/common/caching/tests/CachedFactoryTest.cpp index e8161a979256..7ddf22a27052 100644 --- a/velox/common/caching/tests/CachedFactoryTest.cpp +++ b/velox/common/caching/tests/CachedFactoryTest.cpp @@ -30,7 +30,8 @@ namespace { struct DoublerGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { ++generated; return std::make_unique(value * 2); } @@ -40,7 +41,8 @@ struct DoublerGenerator { struct IdentityGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { return std::make_unique(value); } }; @@ -113,7 +115,8 @@ TEST(CachedFactoryTest, basicGeneration) { struct DoublerWithExceptionsGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { if (value == 3) { VELOX_FAIL("3 is bad"); } diff --git a/velox/common/file/FileSystems.cpp b/velox/common/file/FileSystems.cpp index aa738b6a43ba..f4e84e587b16 100644 --- a/velox/common/file/FileSystems.cpp +++ b/velox/common/file/FileSystems.cpp @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options) override { + const FileOptions& options, + io::IoStatistics* ioStats) override { return std::make_unique( extractPath(path), executor_.get(), options.bufferIo); } diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h index 1829215b625b..a01695588488 100644 --- a/velox/common/file/FileSystems.h +++ b/velox/common/file/FileSystems.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/base/Exceptions.h" +#include "velox/common/io/IoStatistics.h" #include "velox/common/memory/MemoryPool.h" #include @@ -103,7 +104,8 @@ class FileSystem { /// Returns a ReadFile handle for a given file path virtual std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) = 0; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) = 0; /// Returns a WriteFile handle for a given file path virtual std::unique_ptr openFileForWrite( diff --git a/velox/common/file/tests/FaultyFileSystem.cpp b/velox/common/file/tests/FaultyFileSystem.cpp index 339e267c0971..dff950a2a8c4 100644 --- a/velox/common/file/tests/FaultyFileSystem.cpp +++ b/velox/common/file/tests/FaultyFileSystem.cpp @@ -55,7 +55,8 @@ fileSystemGenerator() { std::unique_ptr FaultyFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + io::IoStatistics* ioStats) { const std::string delegatedPath = std::string(extractPath(path)); auto delegatedFile = getFileSystem(delegatedPath, config_) ->openFileForRead(delegatedPath, options); diff --git a/velox/common/file/tests/FaultyFileSystem.h b/velox/common/file/tests/FaultyFileSystem.h index f85314c75e8f..d7ac86e044c2 100644 --- a/velox/common/file/tests/FaultyFileSystem.h +++ b/velox/common/file/tests/FaultyFileSystem.h @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/common/io/IoStatistics.cpp b/velox/common/io/IoStatistics.cpp index 7dfddc6dc483..ba318af9808f 100644 --- a/velox/common/io/IoStatistics.cpp +++ b/velox/common/io/IoStatistics.cpp @@ -108,6 +108,24 @@ IoStatistics::operationStats() const { return operationStats_; } +std::unordered_map IoStatistics::storageStats() + const { + std::lock_guard lock{storageStatsMutex_}; + return storageStats_; +} + +void IoStatistics::addStorageStats( + const std::string& name, + const RuntimeCounter& counter) { + std::lock_guard lock{storageStatsMutex_}; + if (storageStats_.count(name) == 0) { + storageStats_.emplace(name, RuntimeMetric(counter.unit)); + } else { + VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit); + } + storageStats_.at(name).addValue(counter.value); +} + void IoStatistics::merge(const IoStatistics& other) { rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; @@ -123,6 +141,11 @@ void IoStatistics::merge(const IoStatistics& other) { for (auto& item : other.operationStats_) { operationStats_[item.first].merge(item.second); } + + std::lock_guard l2(storageStatsMutex_); + for (auto& item : other.storageStats_) { + storageStats_[item.first].merge(item.second); + } } void OperationCounters::merge(const OperationCounters& other) { diff --git a/velox/common/io/IoStatistics.h b/velox/common/io/IoStatistics.h index 2111a8877b47..6d31e0dfef14 100644 --- a/velox/common/io/IoStatistics.h +++ b/velox/common/io/IoStatistics.h @@ -23,6 +23,8 @@ #include #include +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/RuntimeMetrics.h" namespace facebook::velox::io { @@ -140,6 +142,9 @@ class IoStatistics { const uint64_t partialThrottleCount = 0); std::unordered_map operationStats() const; + std::unordered_map storageStats() const; + + void addStorageStats(const std::string& name, const RuntimeCounter& counter); void merge(const IoStatistics& other); @@ -172,7 +177,9 @@ class IoStatistics { IoCounter queryThreadIoLatency_; std::unordered_map operationStats_; + std::unordered_map storageStats_; mutable std::mutex operationStatsMutex_; + mutable std::mutex storageStatsMutex_; }; } // namespace facebook::velox::io diff --git a/velox/connectors/hive/FileHandle.cpp b/velox/connectors/hive/FileHandle.cpp index 7678fb7a6c35..a05d2a3e9b0d 100644 --- a/velox/connectors/hive/FileHandle.cpp +++ b/velox/connectors/hive/FileHandle.cpp @@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) { std::unique_ptr FileHandleGenerator::operator()( const std::string& filename, - const FileProperties* properties) { + const FileProperties* properties, + io::IoStatistics* ioStats) { // We have seen cases where drivers are stuck when creating file handles. // Adding a trace here to spot this more easily in future. process::TraceContext trace("FileHandleGenerator::operator()"); @@ -55,7 +56,7 @@ std::unique_ptr FileHandleGenerator::operator()( options.fileSize = properties->fileSize; } fileHandle->file = filesystems::getFileSystem(filename, properties_) - ->openFileForRead(filename, options); + ->openFileForRead(filename, options, ioStats); fileHandle->uuid = StringIdLease(fileIds(), filename); fileHandle->groupId = StringIdLease(fileIds(), groupName(filename)); VLOG(1) << "Generating file handle for: " << filename diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 5db30b1d7f4c..4f69535bc563 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -29,6 +29,7 @@ #include "velox/common/caching/FileIds.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" +#include "velox/common/io/IoStatistics.h" #include "velox/connectors/hive/FileProperties.h" namespace facebook::velox { @@ -69,7 +70,8 @@ class FileHandleGenerator { : properties_(std::move(properties)) {} std::unique_ptr operator()( const std::string& filename, - const FileProperties* properties); + const FileProperties* properties, + io::IoStatistics* ioStats); private: const std::shared_ptr properties_; @@ -80,6 +82,7 @@ using FileHandleFactory = CachedFactory< FileHandle, FileHandleGenerator, FileProperties, + io::IoStatistics, FileHandleSizer>; using FileHandleCachedPtr = CachedPtr; diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 0edfb3a0ea7c..dbac54e81889 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -513,6 +513,11 @@ std::unordered_map HiveDataSource::runtimeStats() { if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)}); } + for (const auto& storageStats : ioStats_->storageStats()) { + res.emplace( + storageStats.first, + RuntimeCounter(storageStats.second.sum, storageStats.second.unit)); + } return res; } diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index ebbb543e0e54..d5d32b5d7873 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() { try { fileHandleCachePtr = fileHandleFactory_->generate( hiveSplit_->filePath, - hiveSplit_->properties.has_value() ? &*hiveSplit_->properties - : nullptr); + hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr, + ioStats_.get()); VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get()); } catch (const VeloxRuntimeError& e) { if (e.errorCode() == error_code::kFileNotFound && diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 7e63c2df1438..74c7129ff612 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const { std::unique_ptr AbfsFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + io::IoStatistics* ioStats) { auto abfsfile = std::make_unique(path, *config_); abfsfile->initialize(options); return abfsfile; diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h index c0d3d60ccdee..a5339e0bb15e 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h @@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp index f5d0830d3de3..11100b477781 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp @@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() { std::unique_ptr GcsFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + io::IoStatistics* ioStats) { const auto gcspath = gcsPath(path); auto gcsfile = std::make_unique(gcspath, impl_->getClient()); gcsfile->initialize(options); diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h index 34daff8d6c64..a7a5ef7d44b4 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h @@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem { /// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]]. std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) override; /// Initialize a WriteFile /// First the method google::cloud::storage::Client::GetObjectMetadata diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index 856f2b2526de..ca60a8c71ed6 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const { std::unique_ptr HdfsFileSystem::openFileForRead( std::string_view path, - const FileOptions& /*unused*/) { + const FileOptions& /*unused*/, + io::IoStatistics* /*unused*/) { // Only remove the schema for hdfs path. if (path.find(kScheme) == 0) { path.remove_prefix(kScheme.length()); diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index b541ec629baf..e320d634d12d 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index cd57ce79e846..ecbf994c64ab 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const { std::unique_ptr S3FileSystem::openFileForRead( std::string_view s3Path, - const FileOptions& options) { + const FileOptions& options, + io::IoStatistics* ioStats) { const auto path = getPath(s3Path); auto s3file = std::make_unique(path, impl_->s3Client()); s3file->initialize(options); diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index c1e73198d48f..a66d783b664f 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view s3Path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + io::IoStatistics* ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view s3Path, diff --git a/velox/dwio/common/Throttler.cpp b/velox/dwio/common/Throttler.cpp index 2e1431bdc5c8..87a7eabef796 100644 --- a/velox/dwio/common/Throttler.cpp +++ b/velox/dwio/common/Throttler.cpp @@ -270,6 +270,7 @@ uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache( std::unique_ptr Throttler::ThrottleSignalGenerator::operator()( const std::string& /*unused*/, + const void* /*unused*/, const void* /*unused*/) { return std::unique_ptr(new ThrottleSignal{1}); } diff --git a/velox/dwio/common/Throttler.h b/velox/dwio/common/Throttler.h index 0ebf1e088205..5d76785d43ca 100644 --- a/velox/dwio/common/Throttler.h +++ b/velox/dwio/common/Throttler.h @@ -175,6 +175,7 @@ class Throttler { std::unique_ptr operator()( const std::string& /*unused*/, + const void* /*unused*/, const void* /*unused*/); }; diff --git a/velox/experimental/wave/common/KernelCache.cpp b/velox/experimental/wave/common/KernelCache.cpp index 0604512b69df..674aa197163b 100644 --- a/velox/experimental/wave/common/KernelCache.cpp +++ b/velox/experimental/wave/common/KernelCache.cpp @@ -94,9 +94,8 @@ class AsyncCompiledKernel : public CompiledKernel { class KernelGenerator { public: - std::unique_ptr operator()( - const std::string, - const KernelGenFunc* gen) { + std::unique_ptr + operator()(const std::string, const KernelGenFunc* gen, const void* stats) { using ModulePromise = folly::Promise; struct PromiseHolder { ModulePromise promise;