From 91d66ebc709ec3517af45832f965fced2ffbefe3 Mon Sep 17 00:00:00 2001 From: Rohan Chitale Date: Mon, 17 Feb 2025 08:31:42 -0800 Subject: [PATCH] [Remote Vector Index Build] Initial implementation of Index Build Service (#2532) * fix broken build flag, move build to one directory (#2442) * move build to one directory, fix broken flag Signed-off-by: Samuel Herman * fix make path Signed-off-by: Samuel Herman * changelog update Signed-off-by: Samuel Herman * add fix for classpath change and for cmake discovery on macos Signed-off-by: Samuel Herman * fix make discovery for gradle Signed-off-by: Samuel Herman * fix cmake path for macOS Signed-off-by: Samuel Herman --------- Signed-off-by: Samuel Herman * Update package name to fix compilation issue (#2513) * Update package name to fix compilation issue Core renamed this package in https://github.com/opensearch-project/OpenSearch/pull/17272 This commit renames package accordingly Signed-off-by: Balasubramanian * Update build.gradle and build.sh to separate x64 linux nmslib build with different gcc versions (#2506) (#2508) * Update build gradle to separate nmslib / faiss generation Signed-off-by: Peter Zhu * Update scripts/build.sh to separate enable gcc switch Signed-off-by: Peter Zhu * Remove test comments Signed-off-by: Peter Zhu * Remove test comments Signed-off-by: Peter Zhu * Remove test comments Signed-off-by: Peter Zhu * Updating restart and rolling upgrade bwc test bundle.gradle Signed-off-by: Peter Zhu * Enforce gcc10 for nmslib to compile and avx512_spr have no-op Signed-off-by: Peter Zhu --------- Signed-off-by: Peter Zhu (cherry picked from commit 107c4f1b485e37061f526856dd8a52cbf5103993) Co-authored-by: Peter Zhu --------- Signed-off-by: Balasubramanian Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Peter Zhu * Adds debug logs for KNNQuery and KNNWeight (#2466) (#2470) * Adds debug logs for KNNQuery and KNNWeight Signed-off-by: Tejas Shah * Adds check to see if log is enabled to start and stop StopWatch Signed-off-by: Tejas Shah * Addressing comments on the PR Signed-off-by: Tejas Shah * Adds shard and segment info in the logs Signed-off-by: Tejas Shah * Removes unnecessary segment name param from exact search Signed-off-by: Tejas Shah * Fixes the build Signed-off-by: Tejas Shah --------- Signed-off-by: Tejas Shah (cherry picked from commit f322e27538f5d3eb8bd330f4a2c1d6bc28026774) Co-authored-by: Tejas Shah * Clean JNI artifacts with ./gradlew clean (#2516) * clean JNI artifacts with ./gradlew clean Signed-off-by: Samuel Herman * nest release under build directory Signed-off-by: Samuel Herman * adjust all references to the old release path Signed-off-by: Samuel Herman * remove irrelevant paths from gitignore, add jni/build Signed-off-by: Samuel Herman * fix logging on linux Signed-off-by: Samuel Herman --------- Signed-off-by: Samuel Herman * Backport to main (#2520) * Remove skip building graph check for quantization use case (#2430) For quantization indices, we don't have to apply building graph check since it is already faster, this is now only applied for fp32/16 indices and where threshold is configured. 
Signed-off-by: Vijayan Balasubramanian * Update default to 0 to always build graph as default behavior (#2452) Signed-off-by: Balasubramanian * Update changelog Signed-off-by: Balasubramanian --------- Signed-off-by: Vijayan Balasubramanian Signed-off-by: Balasubramanian * [Backport main] Add release notes for 2.19.0 (#2503) * Add release notes for 2.19.0 Signed-off-by: Kunal Kotwani * Fix links for release notes Co-authored-by: John Mazanec Signed-off-by: Kunal Kotwani --------- Signed-off-by: Kunal Kotwani Co-authored-by: John Mazanec * Fix main knnlib dir in build script based on #2442 (#2526) Signed-off-by: Peter Zhu * Minor performance improvments in KNNQueryBuilder (#2528) Signed-off-by: Tejas Shah * Initial implementation of control flow Signed-off-by: Rohan Chitale --------- Signed-off-by: Samuel Herman Signed-off-by: Balasubramanian Signed-off-by: Vijayan Balasubramanian Signed-off-by: Kunal Kotwani Signed-off-by: Peter Zhu Signed-off-by: Tejas Shah Signed-off-by: Rohan Chitale Co-authored-by: sam-herman <97131656+sam-herman@users.noreply.github.com> Co-authored-by: Vijayan Balasubramanian Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Peter Zhu Co-authored-by: Tejas Shah Co-authored-by: Kunal Kotwani Co-authored-by: John Mazanec --- .github/workflows/CI.yml | 5 + .github/workflows/test_security.yml | 4 + .gitignore | 17 +- CHANGELOG.md | 39 +---- build.gradle | 106 +++++++++++-- jni/cmake/init-faiss.cmake | 6 +- jni/cmake/macros.cmake | 4 +- jni/external/faiss | 2 +- jni/external/nmslib | 2 +- qa/restart-upgrade/build.gradle | 7 +- qa/rolling-upgrade/build.gradle | 15 +- .../opensearch-knn.release-notes-2.19.0.0.md | 53 +++++++ remote-index-build-service/Dockerfile | 25 +++ remote-index-build-service/README.md | 7 + remote-index-build-service/app/__init__.py | 0 .../app/api/__init__.py | 2 + .../app/api/routes/__init__.py | 2 + .../app/api/routes/build.py | 24 +++ .../app/api/routes/cancel.py | 18 +++ .../app/api/routes/status.py | 26 +++ .../app/builder/__init__.py | 2 + .../app/core/__init__.py | 2 + remote-index-build-service/app/core/config.py | 29 ++++ .../app/core/exceptions.py | 21 +++ .../app/core/resources.py | 40 +++++ .../app/executors/__init__.py | 2 + .../app/executors/workflow_executor.py | 98 ++++++++++++ remote-index-build-service/app/main.py | 60 +++++++ .../app/metric/__init__.py | 0 .../app/models/__init__.py | 2 + remote-index-build-service/app/models/job.py | 21 +++ .../app/models/request.py | 15 ++ .../app/models/workflow.py | 10 ++ .../app/object_store/__init__.py | 0 .../app/schemas/__init__.py | 2 + remote-index-build-service/app/schemas/api.py | 49 ++++++ .../app/services/__init__.py | 2 + .../app/services/index_builder.py | 72 +++++++++ .../app/services/job_service.py | 148 ++++++++++++++++++ .../app/storage/__init__.py | 2 + .../app/storage/base.py | 32 ++++ .../app/storage/factory.py | 16 ++ .../app/storage/memory.py | 74 +++++++++ .../app/storage/types.py | 6 + .../app/utils/__init__.py | 2 + remote-index-build-service/app/utils/hash.py | 9 ++ .../app/utils/logging_config.py | 11 ++ .../app/utils/memory.py | 38 +++++ .../app/utils/request.py | 11 ++ remote-index-build-service/requirements.txt | 3 + scripts/build.sh | 12 +- .../knn/index/KNNCircuitBreaker.java | 2 +- .../org/opensearch/knn/index/KNNSettings.java | 4 +- .../NativeEngines990KnnVectorsWriter.java | 10 +- .../opensearch/knn/index/query/KNNQuery.java | 23 ++- .../knn/index/query/KNNQueryBuilder.java | 78 ++++----- 
.../knn/index/query/KNNQueryFactory.java | 6 +- .../opensearch/knn/index/query/KNNWeight.java | 39 ++++- .../org/opensearch/knn/indices/ModelDao.java | 2 +- .../org/opensearch/knn/plugin/KNNPlugin.java | 2 +- .../plugin/rest/RestClearCacheHandler.java | 2 +- .../plugin/rest/RestDeleteModelHandler.java | 2 +- .../knn/plugin/rest/RestGetModelHandler.java | 2 +- .../knn/plugin/rest/RestKNNStatsHandler.java | 2 +- .../knn/plugin/rest/RestKNNWarmupHandler.java | 2 +- .../plugin/rest/RestSearchModelHandler.java | 2 +- .../plugin/rest/RestTrainModelHandler.java | 2 +- .../TrainingJobRouterTransportAction.java | 2 +- .../opensearch/knn/training/VectorReader.java | 2 +- ...eEngines990KnnVectorsWriterFlushTests.java | 11 +- ...TrainingJobRouterTransportActionTests.java | 2 +- .../QuantizationStateCacheTests.java | 2 +- 72 files changed, 1196 insertions(+), 158 deletions(-) create mode 100644 release-notes/opensearch-knn.release-notes-2.19.0.0.md create mode 100644 remote-index-build-service/Dockerfile create mode 100644 remote-index-build-service/README.md create mode 100644 remote-index-build-service/app/__init__.py create mode 100644 remote-index-build-service/app/api/__init__.py create mode 100644 remote-index-build-service/app/api/routes/__init__.py create mode 100644 remote-index-build-service/app/api/routes/build.py create mode 100644 remote-index-build-service/app/api/routes/cancel.py create mode 100644 remote-index-build-service/app/api/routes/status.py create mode 100644 remote-index-build-service/app/builder/__init__.py create mode 100644 remote-index-build-service/app/core/__init__.py create mode 100644 remote-index-build-service/app/core/config.py create mode 100644 remote-index-build-service/app/core/exceptions.py create mode 100644 remote-index-build-service/app/core/resources.py create mode 100644 remote-index-build-service/app/executors/__init__.py create mode 100644 remote-index-build-service/app/executors/workflow_executor.py create mode 100644 remote-index-build-service/app/main.py create mode 100644 remote-index-build-service/app/metric/__init__.py create mode 100644 remote-index-build-service/app/models/__init__.py create mode 100644 remote-index-build-service/app/models/job.py create mode 100644 remote-index-build-service/app/models/request.py create mode 100644 remote-index-build-service/app/models/workflow.py create mode 100644 remote-index-build-service/app/object_store/__init__.py create mode 100644 remote-index-build-service/app/schemas/__init__.py create mode 100644 remote-index-build-service/app/schemas/api.py create mode 100644 remote-index-build-service/app/services/__init__.py create mode 100644 remote-index-build-service/app/services/index_builder.py create mode 100644 remote-index-build-service/app/services/job_service.py create mode 100644 remote-index-build-service/app/storage/__init__.py create mode 100644 remote-index-build-service/app/storage/base.py create mode 100644 remote-index-build-service/app/storage/factory.py create mode 100644 remote-index-build-service/app/storage/memory.py create mode 100644 remote-index-build-service/app/storage/types.py create mode 100644 remote-index-build-service/app/utils/__init__.py create mode 100644 remote-index-build-service/app/utils/hash.py create mode 100644 remote-index-build-service/app/utils/logging_config.py create mode 100644 remote-index-build-service/app/utils/memory.py create mode 100644 remote-index-build-service/app/utils/request.py create mode 100644 remote-index-build-service/requirements.txt diff --git 
a/.github/workflows/CI.yml b/.github/workflows/CI.yml index f844161d0..b886b25a5 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -40,6 +40,11 @@ jobs: matrix: java: [21, 23] + env: + CC: gcc10-gcc + CXX: gcc10-g++ + FC: gcc10-gfortran + name: Build and Test k-NN Plugin on Linux runs-on: ubuntu-latest needs: Get-CI-Image-Tag diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index 4cfbbf47c..0845c792a 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -39,6 +39,10 @@ jobs: strategy: matrix: java: [21] + env: + CC: gcc10-gcc + CXX: gcc10-g++ + FC: gcc10-gfortran name: Run Integration Tests on Linux runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 34f7ff154..016891ded 100644 --- a/.gitignore +++ b/.gitignore @@ -14,26 +14,11 @@ out/ oss/* *.iml -jni/CMakeCache.txt -jni/CMakeFiles -jni/Makefile -jni/cmake_install.cmake -jni/release -jni/packages -jni/CTestTestfile.cmake -jni/KNNPlugin_JNI.cbp jni/Testing/ -jni/_deps/ jni/bin/ -jni/lib/ -jni/jni_test* -jni/googletest* +jni/build/ jni/cmake/*.cmake-e -jni/.cmake jni/.idea -jni/build.ninja -jni/.ninja_deps -jni/.ninja_log benchmarks/perf-tool/okpt/output benchmarks/perf-tool/okpt/dev diff --git a/CHANGELOG.md b/CHANGELOG.md index 44986af0c..97e814267 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,50 +13,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Upgrade min JDK compatibility to JDK 21 [#2422](https://github.com/opensearch-project/k-NN/pull/2422) ### Documentation ### Maintenance +* Update package name to fix compilation issue [#2513](https://github.com/opensearch-project/k-NN/pull/2513) ### Refactoring -## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.18...2.x) +## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.19...2.x) ### Features -- Add Support for Multi Values in innerHit for Nested k-NN Fields in Lucene and FAISS (#2283)[https://github.com/opensearch-project/k-NN/pull/2283] -- Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292] -- Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331] -- Add a new build mode, `FAISS_OPT_LEVEL=avx512_spr`, which enables the use of advanced AVX-512 instructions introduced with Intel(R) Sapphire Rapids (#2404)[https://github.com/opensearch-project/k-NN/pull/2404] -- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376] -- Add derived source feature for vector fields (#2449)[https://github.com/opensearch-project/k-NN/pull/2449] ### Enhancements -- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241] -- Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290] -- Optimizes lucene query execution to prevent unnecessary rewrites (#2305)[https://github.com/opensearch-project/k-NN/pull/2305] -- Add check to directly use ANN Search when filters match all docs. 
(#2320)[https://github.com/opensearch-project/k-NN/pull/2320] -- Use one formula to calculate cosine similarity (#2357)[https://github.com/opensearch-project/k-NN/pull/2357] -- Add WithFieldName implementation to KNNQueryBuilder (#2398)[https://github.com/opensearch-project/k-NN/pull/2398] -- Make the build work for M series MacOS without manual code changes and local JAVA_HOME config (#2397)[https://github.com/opensearch-project/k-NN/pull/2397] -- Enabled concurrent graph creation for Lucene engine with index thread qty settings(#2480)[https://github.com/opensearch-project/k-NN/pull/2480] -- Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter (#2408)[https://github.com/opensearch-project/k-NN/pull/2408] ### Bug Fixes -* Fixing the bug when a segment has no vector field present for disk based vector search (#2282)[https://github.com/opensearch-project/k-NN/pull/2282] -* Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field (#2314)[https://github.com/opensearch-project/k-NN/pull/2314] -* Fix for NPE while merging segments after all the vector fields docs are deleted (#2365)[https://github.com/opensearch-project/k-NN/pull/2365] -* Allow validation for non knn index only after 2.17.0 (#2315)[https://github.com/opensearch-project/k-NN/pull/2315] -* Fixing the bug to prevent updating the index.knn setting after index creation(#2348)[https://github.com/opensearch-project/k-NN/pull/2348] -* Release query vector memory after execution (#2346)[https://github.com/opensearch-project/k-NN/pull/2346] -* Fix shard level rescoring disabled setting flag (#2352)[https://github.com/opensearch-project/k-NN/pull/2352] -* Fix filter rewrite logic which was resulting in getting inconsistent / incorrect results for cases where filter was getting rewritten for shards (#2359)[https://github.com/opensearch-project/k-NN/pull/2359] -* Fixing it to retrieve space_type from index setting when both method and top level don't have the value. 
[#2374](https://github.com/opensearch-project/k-NN/pull/2374) -* Fixing the bug where setting rescore as false for on_disk knn_vector query is a no-op (#2399)[https://github.com/opensearch-project/k-NN/pull/2399] -* Fixing bug where mapping accepts both dimension and model-id (#2410)[https://github.com/opensearch-project/k-NN/pull/2410] -* Add version check for full field name validation (#2477)[https://github.com/opensearch-project/k-NN/pull/2477] ### Infrastructure -* Updated C++ version in JNI from c++11 to c++17 [#2259](https://github.com/opensearch-project/k-NN/pull/2259) -* Upgrade bytebuddy and objenesis version to match OpenSearch core and, update github ci runner for macos [#2279](https://github.com/opensearch-project/k-NN/pull/2279) ### Documentation ### Maintenance -* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236) -* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2308](https://github.com/opensearch-project/k-NN/pull/2308) -* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278] -* Added Lucene BWC tests (#2313)[https://github.com/opensearch-project/k-NN/pull/2313] -* Upgrade jsonpath from 2.8.0 to 2.9.0[2325](https://github.com/opensearch-project/k-NN/pull/2325) -* Bump Faiss commit from 1f42e81 to 0cbc2a8 to accelerate hamming distance calculation using _mm512_popcnt_epi64 intrinsic and also add avx512-fp16 instructions to boost performance [#2381](https://github.com/opensearch-project/k-NN/pull/2381) * Enabled indices.breaker.total.use_real_memory setting via build.gradle for integTest Cluster to catch heap CB in local ITs and github CI actions [#2395](https://github.com/opensearch-project/k-NN/pull/2395/) * Fixing Lucene912Codec Issue with BWC for Lucene 10.0.1 upgrade[#2429](https://github.com/opensearch-project/k-NN/pull/2429) +* Enabled idempotency of local builds when using `./gradlew clean` and nest `jni/release` directory under `jni/build` for easier cleanup [#2516](https://github.com/opensearch-project/k-NN/pull/2516) ### Refactoring diff --git a/build.gradle b/build.gradle index 710a1483b..b5f715847 100644 --- a/build.gradle +++ b/build.gradle @@ -329,11 +329,52 @@ task windowsPatches(type:Exec) { commandLine 'cmd', '/c', "Powershell -File $rootDir\\scripts\\windowsScript.ps1" } -task cmakeJniLib(type:Exec) { - workingDir 'jni' +def findExecutable(String executableName, List additionalPaths = []) { + // Print the task's environment before setting it + // Print PATH specifically + logger.lifecycle("\nSystem PATH:") + logger.lifecycle(System.getenv("PATH")) + + def commonBasePaths = [ + "/opt/homebrew/bin", + "/usr/local/bin", + "/usr/bin" + ] + + // Start with just the executable name (will use system PATH) + def execPath = executableName + + if (Os.isFamily(Os.FAMILY_MAC)) { + def searchPaths = [] + // Add common paths + commonBasePaths.each { basePath -> + searchPaths.add("${basePath}/${executableName}") + } + // Add any additional specific paths + searchPaths.addAll(additionalPaths) + + for (path in searchPaths) { + if (new File(path).exists()) { + logger.lifecycle("Found ${executableName} at: ${path}") + execPath = path + break + } + } + } + + return execPath +} + +tasks.register('cmakeJniLib', Exec) { + def cmakePath = findExecutable("cmake") + logger.lifecycle("Using cmake at: ${cmakePath}") + // Inherit the current environment + 
environment System.getenv() + def args = [] - args.add("cmake") - args.add(".") + args.add(cmakePath) + args.add("-S jni") // CMakelists.txt directory + args.add("-B jni/build") // Build directory args.add("-DKNN_PLUGIN_VERSION=${opensearch_version}") args.add("-DAVX2_ENABLED=${avx2_enabled}") args.add("-DAVX512_ENABLED=${avx512_enabled}") @@ -343,7 +384,7 @@ task cmakeJniLib(type:Exec) { def javaHome = Jvm.current().getJavaHome() logger.lifecycle("Java home directory used by gradle: $javaHome") if (Os.isFamily(Os.FAMILY_MAC)) { - environment('JAVA_HOME',javaHome) + environment('JAVA_HOME', javaHome) } if (Os.isFamily(Os.FAMILY_WINDOWS)) { dependsOn windowsPatches @@ -353,35 +394,75 @@ task cmakeJniLib(type:Exec) { args.add("-DLAPACK_LIBRARIES=$rootDir\\src\\main\\resources\\windowsDependencies\\libopenblas.dll") } + // Print the task's environment after setting it + logger.lifecycle("\nTask Environment PATH:") + logger.lifecycle(environment.get('PATH')) + + // Print the command that will be executed + logger.lifecycle("CMake command: ${args.join(' ')}") + def outputStream = new ByteArrayOutputStream() commandLine args + standardOutput = outputStream +} + +// Makes sure that `./gradlew clean` removes all JNI build artifacts +tasks.clean.doFirst { + // Delete JNI build directory + delete "${projectDir}/jni/build" +} + +task buildNmslib(type:Exec) { + dependsOn cmakeJniLib + def cmakePath = findExecutable("cmake") + logger.lifecycle("Using cmake at: ${cmakePath}") + + if (cmakePath.isEmpty()) { + throw new GradleException("CMake not found in PATH. Please install CMake.") + } + + commandLine cmakePath, + '--build', 'jni/build', + '--target', 'opensearchknn_nmslib', + '--parallel', "${nproc_count}" } task buildJniLib(type:Exec) { dependsOn cmakeJniLib - workingDir 'jni' - commandLine 'make', 'opensearchknn_nmslib', 'opensearchknn_faiss', 'opensearchknn_common', '-j', "${nproc_count}" + def cmakePath = findExecutable("cmake") + logger.lifecycle("Using cmake at: ${cmakePath}") + + if (cmakePath.isEmpty()) { + throw new GradleException("CMake not found in PATH. 
Please install CMake.") + } + + commandLine cmakePath, + '--build', 'jni/build', + '--target', 'opensearchknn_faiss', 'opensearchknn_common', + '--parallel', "${nproc_count}" } test { + dependsOn buildNmslib dependsOn buildJniLib systemProperty 'tests.security.manager', 'false' - systemProperty "java.library.path", "$rootDir/jni/release" + systemProperty "java.library.path", "$rootDir/jni/build/release" //this change enables mockito-inline that supports mocking of static classes/calls systemProperty "jdk.attach.allowAttachSelf", true if (Os.isFamily(Os.FAMILY_WINDOWS)) { // Add the paths of built JNI libraries and its dependent libraries to PATH variable in System variables - environment('PATH', System.getenv('PATH') + ";$rootDir/jni/release" + ";$rootDir/src/main/resources/windowsDependencies") + environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies") } } def _numNodes = findProperty('numNodes') as Integer ?: 1 integTest { if (integTestDependOnJniLib) { + dependsOn buildNmslib dependsOn buildJniLib } systemProperty 'tests.security.manager', 'false' systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath - systemProperty "java.library.path", "$rootDir/jni/release" + systemProperty "java.library.path", "$rootDir/jni/build/release" // allows integration test classes to access test resource from project root path systemProperty('project.root', project.rootDir.absolutePath) @@ -433,7 +514,7 @@ testClusters.integTest { plugin(project.tasks.bundlePlugin.archiveFile) if (Os.isFamily(Os.FAMILY_WINDOWS)) { // Add the paths of built JNI libraries and its dependent libraries to PATH variable in System variables - environment('PATH', System.getenv('PATH') + ";$rootDir/jni/release" + ";$rootDir/src/main/resources/windowsDependencies") + environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies") } // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 @@ -448,7 +529,7 @@ testClusters.integTest { debugPort += 1 } } - systemProperty("java.library.path", "$rootDir/jni/release") + systemProperty("java.library.path", "$rootDir/jni/build/release") systemProperty propertyKeys.breaker.useRealMemory, getBreakerSetting() } @@ -475,6 +556,7 @@ task integTestRemote(type: RestIntegTestTask) { run { useCluster project.testClusters.integTest + dependsOn buildNmslib dependsOn buildJniLib doFirst { // There seems to be an issue when running multi node run or integ tasks with unicast_hosts diff --git a/jni/cmake/init-faiss.cmake b/jni/cmake/init-faiss.cmake index 3cb90b767..523b3c17d 100644 --- a/jni/cmake/init-faiss.cmake +++ b/jni/cmake/init-faiss.cmake @@ -111,10 +111,10 @@ endif() if(NOT DEFINED AVX512_SPR_ENABLED) # Check if the system is Intel(R) Sapphire Rapids or a newer-generation processor execute_process(COMMAND bash -c "lscpu | grep -q 'GenuineIntel' && lscpu | grep -i 'avx512_fp16' | grep -i 'avx512_bf16' | grep -i 'avx512_vpopcntdq'" OUTPUT_VARIABLE SPR_FLAGS OUTPUT_STRIP_TRAILING_WHITESPACE) - if (AND NOT "${SPR_FLAGS}" STREQUAL "") - set(AVX512_SPR_ENABLED true) + if (NOT "${SPR_FLAGS}" STREQUAL "") + set(AVX512_SPR_ENABLED true) else() - set(AVX512_SPR_ENABLED false) + set(AVX512_SPR_ENABLED false) endif() endif() diff --git a/jni/cmake/macros.cmake b/jni/cmake/macros.cmake index 773033b7e..801b35c56 100644 --- a/jni/cmake/macros.cmake +++ b/jni/cmake/macros.cmake @@ -9,8 +9,8 @@ macro(opensearch_set_common_properties 
TARGET) if (NOT "${WIN32}" STREQUAL "") # Use RUNTIME_OUTPUT_DIRECTORY, to build the target library in the specified directory at runtime. - set_target_properties(${TARGET} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/release) + set_target_properties(${TARGET} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/release) else() - set_target_properties(${TARGET} PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/release) + set_target_properties(${TARGET} PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/release) endif() endmacro() diff --git a/jni/external/faiss b/jni/external/faiss index 0cbc2a885..c162b33e5 160000 --- a/jni/external/faiss +++ b/jni/external/faiss @@ -1 +1 @@ -Subproject commit 0cbc2a885cde923d80c4bf9c9d6f4d81665f3f64 +Subproject commit c162b33e5a6884508ebf8072c483846fba0b9571 diff --git a/jni/external/nmslib b/jni/external/nmslib index a2d6624e1..b54e1aeca 160000 --- a/jni/external/nmslib +++ b/jni/external/nmslib @@ -1 +1 @@ -Subproject commit a2d6624e1315402662025debfdd614b505d9c3ef +Subproject commit b54e1aecaef3edb81a1a466cd496bdf024827df4 diff --git a/qa/restart-upgrade/build.gradle b/qa/restart-upgrade/build.gradle index 3dbf151a7..64f4dd4f5 100644 --- a/qa/restart-upgrade/build.gradle +++ b/qa/restart-upgrade/build.gradle @@ -26,7 +26,7 @@ testClusters { environment "LD_LIBRARY_PATH", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib;${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" if (Os.isFamily(Os.FAMILY_WINDOWS)) { // While running on Windows OS, setting the PATH environment variable to include the paths to dlls of JNI libraries and windows dependencies - environment('PATH', System.getenv('PATH') + ";$rootDir/jni/release" + ";$rootDir/src/main/resources/windowsDependencies") + environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies") systemProperty "java.library.path", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib;${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" } else { systemProperty "java.library.path", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib:${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" @@ -135,12 +135,13 @@ testClusters { // All nodes are upgraded to latest version and run the tests task testRestartUpgrade(type: StandaloneRestIntegTestTask) { dependsOn "testAgainstOldCluster" + dependsOn rootProject.tasks.buildNmslib dependsOn rootProject.tasks.buildJniLib dependsOn rootProject.tasks.assemble useCluster testClusters."${baseName}" doFirst { - testClusters."${baseName}".environment("LD_LIBRARY_PATH", "$rootDir/jni/release") - testClusters."${baseName}".systemProperty("java.library.path", "$rootDir/jni/release") + testClusters."${baseName}".environment("LD_LIBRARY_PATH", "$rootDir/jni/build/release") + testClusters."${baseName}".systemProperty("java.library.path", "$rootDir/jni/build/release") testClusters."${baseName}".upgradeAllNodesAndPluginsToNextVersion([rootProject.tasks.bundlePlugin.archiveFile]) } systemProperty 'tests.rest.bwcsuite_cluster', 'upgraded_cluster' diff --git a/qa/rolling-upgrade/build.gradle 
b/qa/rolling-upgrade/build.gradle index 9c7940617..33413d086 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -26,7 +26,7 @@ testClusters { environment "LD_LIBRARY_PATH", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib;${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" if (Os.isFamily(Os.FAMILY_WINDOWS)) { // While running on Windows OS, setting the PATH environment variable to include the paths to dlls of JNI libraries and windows dependencies - environment('PATH', System.getenv('PATH') + ";$rootDir/jni/release" + ";$rootDir/src/main/resources/windowsDependencies") + environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies") systemProperty "java.library.path", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib;${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" } else { systemProperty "java.library.path", "${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/knnlib:${buildDir}/testclusters/${baseName}-0/distro/${knn_bwc_version_no_qualifier}-ARCHIVE/plugins/opensearch-knn/lib" @@ -52,12 +52,13 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { // This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node. task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { useCluster testClusters."${baseName}" + dependsOn rootProject.tasks.buildNmslib dependsOn rootProject.tasks.buildJniLib dependsOn rootProject.tasks.assemble dependsOn "testAgainstOldCluster" doFirst { - testClusters."${baseName}".getNodes().getAt("${baseName}" + "-0").environment("LD_LIBRARY_PATH", "$rootDir/jni/release") - testClusters."${baseName}".getNodes().getAt("${baseName}" + "-0").systemProperty("java.library.path", "$rootDir/jni/release") + testClusters."${baseName}".getNodes().getAt("${baseName}" + "-0").environment("LD_LIBRARY_PATH", "$rootDir/jni/build/release") + testClusters."${baseName}".getNodes().getAt("${baseName}" + "-0").systemProperty("java.library.path", "$rootDir/jni/build/release") testClusters."${baseName}".upgradeNodeAndPluginToNextVersion([rootProject.tasks.bundlePlugin.archiveFile]) } systemProperty 'tests.rest.bwcsuite_cluster', 'mixed_cluster' @@ -75,8 +76,8 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { dependsOn "testAgainstOneThirdUpgradedCluster" useCluster testClusters."${baseName}" doFirst { - testClusters."${baseName}".getNodes().getAt("${baseName}" + "-1").environment("LD_LIBRARY_PATH", "$rootDir/jni/release") - testClusters."${baseName}".getNodes().getAt("${baseName}" + "-1").systemProperty("java.library.path", "$rootDir/jni/release") + testClusters."${baseName}".getNodes().getAt("${baseName}" + "-1").environment("LD_LIBRARY_PATH", "$rootDir/jni/build/release") + testClusters."${baseName}".getNodes().getAt("${baseName}" + "-1").systemProperty("java.library.path", "$rootDir/jni/build/release") testClusters."${baseName}".upgradeNodeAndPluginToNextVersion([rootProject.tasks.bundlePlugin.archiveFile]) } systemProperty 'tests.rest.bwcsuite_cluster', 'mixed_cluster' @@ -94,8 +95,8 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { dependsOn "testAgainstTwoThirdsUpgradedCluster" 
useCluster testClusters."${baseName}"
doFirst {
- testClusters."${baseName}".getNodes().getAt("${baseName}" + "-2").environment("LD_LIBRARY_PATH", "$rootDir/jni/release")
- testClusters."${baseName}".getNodes().getAt("${baseName}" + "-2").systemProperty("java.library.path", "$rootDir/jni/release")
+ testClusters."${baseName}".getNodes().getAt("${baseName}" + "-2").environment("LD_LIBRARY_PATH", "$rootDir/jni/build/release")
+ testClusters."${baseName}".getNodes().getAt("${baseName}" + "-2").systemProperty("java.library.path", "$rootDir/jni/build/release")
testClusters."${baseName}".upgradeNodeAndPluginToNextVersion([rootProject.tasks.bundlePlugin.archiveFile])
}
mustRunAfter "testAgainstOneThirdUpgradedCluster"
diff --git a/release-notes/opensearch-knn.release-notes-2.19.0.0.md b/release-notes/opensearch-knn.release-notes-2.19.0.0.md
new file mode 100644
index 000000000..8b72a9a04
--- /dev/null
+++ b/release-notes/opensearch-knn.release-notes-2.19.0.0.md
@@ -0,0 +1,53 @@
+## Version 2.19.0.0 Release Notes
+
+Compatible with OpenSearch 2.19.0
+
+### Features
+- Add Support for Multi Values in innerHit for Nested k-NN Fields in Lucene and FAISS [#2283](https://github.com/opensearch-project/k-NN/pull/2283)
+- Add binary index support for Lucene engine. [#2292](https://github.com/opensearch-project/k-NN/pull/2292)
+- Add expand_nested_docs Parameter support to NMSLIB engine [#2331](https://github.com/opensearch-project/k-NN/pull/2331)
+- Add a new build mode, `FAISS_OPT_LEVEL=avx512_spr`, which enables the use of advanced AVX-512 instructions introduced with Intel(R) Sapphire Rapids [#2404](https://github.com/opensearch-project/k-NN/pull/2404)
+- Add cosine similarity support for faiss engine [#2376](https://github.com/opensearch-project/k-NN/pull/2376)
+- Add concurrency optimizations with native memory graph loading and force eviction [#2345](https://github.com/opensearch-project/k-NN/pull/2345)
+- Add derived source feature for vector fields [#2449](https://github.com/opensearch-project/k-NN/pull/2449)
+### Enhancements
+- Introduced a writing layer in native engines which relies on the writing interface to process IO. [#2241](https://github.com/opensearch-project/k-NN/pull/2241)
+- Allow method parameter override for training based indices [#2290](https://github.com/opensearch-project/k-NN/pull/2290)
+- Optimizes lucene query execution to prevent unnecessary rewrites [#2305](https://github.com/opensearch-project/k-NN/pull/2305)
+- Added more detailed error messages for KNN model training [#2378](https://github.com/opensearch-project/k-NN/pull/2378)
+- Add check to directly use ANN Search when filters match all docs. [#2320](https://github.com/opensearch-project/k-NN/pull/2320)
+- Use one formula to calculate cosine similarity [#2357](https://github.com/opensearch-project/k-NN/pull/2357)
+- Make the build work for M series MacOS without manual code changes and local JAVA_HOME config [#2397](https://github.com/opensearch-project/k-NN/pull/2397)
+- Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter [#2408](https://github.com/opensearch-project/k-NN/pull/2408)
+- Remove skip building graph check for quantization use case [#2430](https://github.com/opensearch-project/k-NN/pull/2430)
+- Removing redundant type conversions for script scoring for hamming space with binary vectors [#2351](https://github.com/opensearch-project/k-NN/pull/2351)
+- Update default to 0 to always build graph as default behavior [#2452](https://github.com/opensearch-project/k-NN/pull/2452)
+- Enabled concurrent graph creation for Lucene engine with index thread qty settings [#2480](https://github.com/opensearch-project/k-NN/pull/2480)
+### Bug Fixes
+* Fixing the bug when a segment has no vector field present for disk based vector search [#2282](https://github.com/opensearch-project/k-NN/pull/2282)
+* Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field [#2314](https://github.com/opensearch-project/k-NN/pull/2314)
+* Fix for NPE while merging segments after all the vector fields docs are deleted [#2365](https://github.com/opensearch-project/k-NN/pull/2365)
+* Allow validation for non knn index only after 2.17.0 [#2315](https://github.com/opensearch-project/k-NN/pull/2315)
+* Fixing the bug to prevent updating the index.knn setting after index creation [#2348](https://github.com/opensearch-project/k-NN/pull/2348)
+* Release query vector memory after execution [#2346](https://github.com/opensearch-project/k-NN/pull/2346)
+* Fix shard level rescoring disabled setting flag [#2352](https://github.com/opensearch-project/k-NN/pull/2352)
+* Fix filter rewrite logic which was resulting in getting inconsistent / incorrect results for cases where filter was getting rewritten for shards [#2359](https://github.com/opensearch-project/k-NN/pull/2359)
+* Fixing it to retrieve space_type from index setting when both method and top level don't have the value. [#2374](https://github.com/opensearch-project/k-NN/pull/2374)
+* Fixing the bug where setting rescore as false for on_disk knn_vector query is a no-op [#2399](https://github.com/opensearch-project/k-NN/pull/2399)
+* Fixing the bug to prevent index.knn setting from being modified or removed on restore snapshot [#2445](https://github.com/opensearch-project/k-NN/pull/2445)
+* Fix Faiss byte vector efficient filter bug [#2448](https://github.com/opensearch-project/k-NN/pull/2448)
+* Fixing bug where mapping accepts both dimension and model-id [#2410](https://github.com/opensearch-project/k-NN/pull/2410)
+* Add version check for full field name validation [#2477](https://github.com/opensearch-project/k-NN/pull/2477)
+### Infrastructure
+* Updated C++ version in JNI from c++11 to c++17 [#2259](https://github.com/opensearch-project/k-NN/pull/2259)
+* Upgrade bytebuddy and objenesis version to match OpenSearch core and update github ci runner for macos [#2279](https://github.com/opensearch-project/k-NN/pull/2279)
+### Documentation
+### Maintenance
+* Select index settings based on cluster version [#2236](https://github.com/opensearch-project/k-NN/pull/2236)
+* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2308](https://github.com/opensearch-project/k-NN/pull/2308)
+* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field [#2278](https://github.com/opensearch-project/k-NN/pull/2278)
+* Added Lucene BWC tests [#2313](https://github.com/opensearch-project/k-NN/pull/2313)
+* Upgrade jsonpath from 2.8.0 to 2.9.0 [#2325](https://github.com/opensearch-project/k-NN/pull/2325)
+* Bump Faiss commit from 1f42e81 to 0cbc2a8 to accelerate hamming distance calculation using _mm512_popcnt_epi64 intrinsic and also add avx512-fp16 instructions to boost performance [#2381](https://github.com/opensearch-project/k-NN/pull/2381)
+* Deprecate nmslib engine [#2427](https://github.com/opensearch-project/k-NN/pull/2427)
+* Add spotless mirror repo for fixing builds [#2453](https://github.com/opensearch-project/k-NN/pull/2453)
diff --git a/remote-index-build-service/Dockerfile b/remote-index-build-service/Dockerfile
new file mode 100644
index 000000000..3420f6050
--- /dev/null
+++ b/remote-index-build-service/Dockerfile
@@ -0,0 +1,25 @@
+FROM python:3.12
+
+WORKDIR /code
+
+COPY ./requirements.txt /code/requirements.txt
+
+RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
+
+COPY ./app /code/app
+ENV PYTHONPATH=/code/app
+
+ENV REQUEST_STORE_MAX_SIZE=1000
+ENV REQUEST_STORE_TTL_SECONDS=900
+ENV REQUEST_STORE_TYPE='memory'
+
+ENV GPU_MEMORY_LIMIT=3.0
+ENV CPU_MEMORY_LIMIT=32.0
+
+ENV MAX_WORKERS=5
+
+ENV SERVICE_NAME='remote-vector-index-builder'
+
+ENV LOG_LEVEL="INFO"
+
+CMD ["fastapi", "run", "app/main.py", "--port", "80"]
\ No newline at end of file
diff --git a/remote-index-build-service/README.md b/remote-index-build-service/README.md
new file mode 100644
index 000000000..47e9df07a
--- /dev/null
+++ b/remote-index-build-service/README.md
@@ -0,0 +1,7 @@
+## Build the container
+
+```docker build -t worker .```
+
+## Run the container
+
+```docker run -p 80:80 worker```
\ No newline at end of file
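The service exposes three endpoints, defined under `app/api/routes/` below: `POST /_build`, `GET /_status/{job_id}`, and `POST /_cancel/{job_id}`. A minimal client sketch against a container published on port 80 as above; the payload values are illustrative, and the `requests` dependency is an assumption, not something this patch installs:

```python
import requests  # assumed HTTP client; not part of this patch

BASE = "http://localhost:80"

# Fields mirror CreateJobRequest in app/schemas/api.py; values are made up.
payload = {
    "repository_type": "s3",
    "repository_name": "my-vector-repo",
    "object_path": "vectors/segment-42",
    "tenant_id": "tenant-a",
    "dimension": 128,
    "doc_count": 1_000_000,
    "data_type": "fp32",
}

job_id = requests.post(f"{BASE}/_build", json=payload).json()["job_id"]

# Poll the job; task_status is one of RUNNING_INDEX_BUILD,
# COMPLETED_INDEX_BUILD, or FAILED_INDEX_BUILD (see app/models/job.py).
status = requests.get(f"{BASE}/_status/{job_id}").json()["task_status"]

# A job can also be cancelled while it is queued or running.
requests.post(f"{BASE}/_cancel/{job_id}")
```

diff --git a/remote-index-build-service/app/__init__.py b/remote-index-build-service/app/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/remote-index-build-service/app/api/__init__.py b/remote-index-build-service/app/api/__init__.py
new file mode 100644
index 000000000..cfa39dcf2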
--- /dev/null
+++ b/remote-index-build-service/app/api/__init__.py
@@ -0,0 +1,2 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
diff --git a/remote-index-build-service/app/api/routes/__init__.py b/remote-index-build-service/app/api/routes/__init__.py
new file mode 100644
index 000000000..cfa39dcf2
--- /dev/null
+++ b/remote-index-build-service/app/api/routes/__init__.py
@@ -0,0 +1,2 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
diff --git a/remote-index-build-service/app/api/routes/build.py b/remote-index-build-service/app/api/routes/build.py
new file mode 100644
index 000000000..9b467f0a7
--- /dev/null
+++ b/remote-index-build-service/app/api/routes/build.py
@@ -0,0 +1,24 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+from core.exceptions import HashCollisionError, CapacityError
+from fastapi import APIRouter, HTTPException, Request
+from schemas.api import CreateJobRequest, CreateJobResponse
+
+import logging
+
+router = APIRouter()
+logger = logging.getLogger(__name__)
+
+@router.post("/_build")
+def create_job(create_job_request: CreateJobRequest, request: Request) -> CreateJobResponse:
+
+    logger.info(f"Received create job request: {create_job_request}")
+
+    try:
+        job_service = request.app.state.job_service
+        job_id = job_service.create_job(create_job_request)
+    except HashCollisionError as e:
+        raise HTTPException(status_code=429, detail=str(e))
+    except CapacityError as e:
+        raise HTTPException(status_code=507, detail=str(e))
+    return CreateJobResponse(job_id=job_id)
\ No newline at end of file
diff --git a/remote-index-build-service/app/api/routes/cancel.py b/remote-index-build-service/app/api/routes/cancel.py
new file mode 100644
index 000000000..36dabdd0d
--- /dev/null
+++ b/remote-index-build-service/app/api/routes/cancel.py
@@ -0,0 +1,18 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+from fastapi import APIRouter, HTTPException, Request
+from schemas.api import CancelJobResponse
+
+router = APIRouter()
+
+@router.post("/_cancel/{job_id}")
+def cancel_job(job_id: str, request: Request):
+
+    job_service = request.app.state.job_service
+    job = job_service.get_job(job_id)
+    if not job:
+        raise HTTPException(status_code=404, detail="Job not found")
+    if job_service.cancel_job(job_id):
+        return CancelJobResponse(status="success")
+    else:
+        return CancelJobResponse(status="fail")
diff --git a/remote-index-build-service/app/api/routes/status.py b/remote-index-build-service/app/api/routes/status.py
new file mode 100644
index 000000000..fd20c3a95
--- /dev/null
+++ b/remote-index-build-service/app/api/routes/status.py
@@ -0,0 +1,26 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+from fastapi import APIRouter, HTTPException, Request
+from schemas.api import GetStatusResponse
+
+router = APIRouter()
+
+@router.get("/_status/{job_id}")
+def get_status(job_id: str, request: Request) -> GetStatusResponse:
+
+    job_service = request.app.state.job_service
+    job = job_service.get_job(job_id)
+    if not job:
+        raise HTTPException(status_code=404, detail="Job not found")
+
+    response_data = {
+        "task_status": job.status
+    }
+
+    if hasattr(job, 'knn_index_path') and job.knn_index_path is not None:
+        response_data["knn_index_path"] = job.knn_index_path
+
+    if hasattr(job, 'msg') and job.msg is not None:
+        response_data["msg"] = job.msg
+
+    return GetStatusResponse(**response_data)
\ No newline at end of file
diff --git
a/remote-index-build-service/app/builder/__init__.py b/remote-index-build-service/app/builder/__init__.py new file mode 100644 index 000000000..cfa39dcf2 --- /dev/null +++ b/remote-index-build-service/app/builder/__init__.py @@ -0,0 +1,2 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 diff --git a/remote-index-build-service/app/core/__init__.py b/remote-index-build-service/app/core/__init__.py new file mode 100644 index 000000000..cfa39dcf2 --- /dev/null +++ b/remote-index-build-service/app/core/__init__.py @@ -0,0 +1,2 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 diff --git a/remote-index-build-service/app/core/config.py b/remote-index-build-service/app/core/config.py new file mode 100644 index 000000000..a31baa750 --- /dev/null +++ b/remote-index-build-service/app/core/config.py @@ -0,0 +1,29 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from pydantic_settings import BaseSettings +from storage.types import RequestStoreType + +class Settings(BaseSettings): + + """ + Settings class for the application. Pulls the settings + from the Docker container environment variables + """ + + # Request Store settings + request_store_type: RequestStoreType + request_store_max_size: int + request_store_ttl_seconds: int + + # Resource Manager settings + gpu_memory_limit: float + cpu_memory_limit: float + + # Workflow Executor settings + max_workers: int + + # Service settings + service_name: str + log_level: str + diff --git a/remote-index-build-service/app/core/exceptions.py b/remote-index-build-service/app/core/exceptions.py new file mode 100644 index 000000000..f74e6c756 --- /dev/null +++ b/remote-index-build-service/app/core/exceptions.py @@ -0,0 +1,21 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +class BuildServiceError(Exception): + """Base exception for build service errors""" + pass + +class BuildError(BuildServiceError): + """Raised when there's an error during index building""" + pass + +class ObjectStoreError(BuildServiceError): + """Raised when there's an error with object store operations""" + pass + +class HashCollisionError(BuildServiceError): + """Raised when there's a hash collision in the Request Store""" + pass + +class CapacityError(BuildServiceError): + """Raised when the worker does not have enough capacity to fulfill the request""" + pass \ No newline at end of file diff --git a/remote-index-build-service/app/core/resources.py b/remote-index-build-service/app/core/resources.py new file mode 100644 index 000000000..6f19809ad --- /dev/null +++ b/remote-index-build-service/app/core/resources.py @@ -0,0 +1,40 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +import threading +from typing import Optional + +class ResourceManager: + def __init__(self, total_gpu_memory: float, total_cpu_memory: float): + self._total_gpu_memory = total_gpu_memory + self._total_cpu_memory = total_cpu_memory + self._available_gpu_memory = total_gpu_memory + self._available_cpu_memory = total_cpu_memory + self._lock = threading.Lock() + + def can_allocate(self, gpu_memory: float, cpu_memory: float) -> bool: + with self._lock: + return (self._available_gpu_memory >= gpu_memory and + self._available_cpu_memory >= cpu_memory) + + def allocate(self, gpu_memory: float, cpu_memory: float) -> bool: + if not self.can_allocate(gpu_memory, cpu_memory): + return False + with self._lock: + self._available_gpu_memory -= gpu_memory + self._available_cpu_memory 
-= cpu_memory
+            # Note: can_allocate and the deduction above use separate lock
+            # acquisitions, so callers should treat allocate() as advisory.
+            return True
+
+    def release(self, gpu_memory: float, cpu_memory: float) -> None:
+        with self._lock:
+            self._available_gpu_memory += gpu_memory
+            self._available_cpu_memory += cpu_memory
+
+    def get_available_gpu_memory(self) -> float:
+        with self._lock:
+            return self._available_gpu_memory
+
+    def get_available_cpu_memory(self) -> float:
+        with self._lock:
+            return self._available_cpu_memory
\ No newline at end of file
diff --git a/remote-index-build-service/app/executors/__init__.py b/remote-index-build-service/app/executors/__init__.py
new file mode 100644
index 000000000..cfa39dcf2
--- /dev/null
+++ b/remote-index-build-service/app/executors/__init__.py
@@ -0,0 +1,2 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
diff --git a/remote-index-build-service/app/executors/workflow_executor.py b/remote-index-build-service/app/executors/workflow_executor.py
new file mode 100644
index 000000000..3f61f5979
--- /dev/null
+++ b/remote-index-build-service/app/executors/workflow_executor.py
@@ -0,0 +1,98 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+
+from concurrent.futures import ThreadPoolExecutor
+import logging
+from typing import Optional, Dict, Callable
+from models.workflow import BuildWorkflow
+from core.resources import ResourceManager
+from core.exceptions import BuildError, ObjectStoreError
+from storage.base import RequestStore
+from models.job import JobStatus
+
+logger = logging.getLogger(__name__)
+
+class WorkflowExecutor:
+    def __init__(
+        self,
+        max_workers: int,
+        request_store: RequestStore,
+        resource_manager: ResourceManager,
+        build_index_fn: Callable[[BuildWorkflow], tuple[bool, Optional[str], Optional[str]]]
+    ):
+        self._executor = ThreadPoolExecutor(max_workers=max_workers)
+        self._request_store = request_store
+        self._resource_manager = resource_manager
+        self._build_index_fn = build_index_fn
+
+    def submit_workflow(self, workflow: BuildWorkflow) -> bool:
+        """
+        Submit a workflow for execution.
+        Returns True if submission was successful.
+        """
+
+        # Submit the workflow to thread pool
+        self._executor.submit(
+            self._execute_workflow,
+            workflow
+        )
+
+        return True
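One subtlety in `submit_workflow`: the `Future` returned by `ThreadPoolExecutor.submit` is discarded, so an exception that escapes `_execute_workflow` would be retained silently inside the pool. A hedged sketch of a more defensive submit, not part of this patch, that surfaces such failures through a done-callback:

```python
# Sketch only: keep the Future and log anything that escapes the worker.
future = self._executor.submit(self._execute_workflow, workflow)
future.add_done_callback(
    lambda f: logger.error(f"Workflow raised unexpectedly: {f.exception()}")
    if f.exception() is not None else None
)
```

+    def _execute_workflow(self, workflow: BuildWorkflow) -> None:
+        """
+        Execute the workflow and handle results.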
+ """ + try: + logger.info(f"Starting execution of job {workflow.job_id}") + + if not self._request_store.get(workflow.job_id): + logger.info(f"Job {workflow.job_id} was cancelled during submission") + return + + # Execute the build + success, index_path, msg = self._build_index_fn(workflow) + + # Check if job still exists before updating status + if self._request_store.get(workflow.job_id): + status = JobStatus.COMPLETED if success else JobStatus.FAILED + self._request_store.update( + workflow.job_id, + { + "status": status, + "knn_index_path": index_path, + "msg": msg + } + ) + + logger.info( + f"Job {workflow.job_id} completed with status {status}" + ) + else: + logger.info(f"Job {workflow.job_id} was cancelled during execution") + + except (BuildError, ObjectStoreError, MemoryError, RuntimeError) as e: + logger.error( + f"Build process failed for job {workflow.job_id}: {str(e)}" + ) + self._request_store.update( + workflow.job.id, + { + "status": status, + "knn_index_path": index_path, + "msg": str(e) + } + ) + finally: + # Release resources + self._resource_manager.release( + workflow.gpu_memory_required, + workflow.cpu_memory_required + ) + + + def shutdown(self) -> None: + """ + Shutdown the executor + """ + self._executor.shutdown(wait=True) \ No newline at end of file diff --git a/remote-index-build-service/app/main.py b/remote-index-build-service/app/main.py new file mode 100644 index 000000000..aab341429 --- /dev/null +++ b/remote-index-build-service/app/main.py @@ -0,0 +1,60 @@ +from api.routes import build, status, cancel +from fastapi import FastAPI +from core.config import Settings +from core.resources import ResourceManager +from executors.workflow_executor import WorkflowExecutor +from models.workflow import BuildWorkflow +from services.index_builder import IndexBuilder +from services.job_service import JobService +from storage.factory import RequestStoreFactory +from utils.logging_config import configure_logging + +import logging + +settings = Settings() + +configure_logging(settings.log_level) + +logger = logging.getLogger(__name__) + +request_store = RequestStoreFactory.create( + store_type=settings.request_store_type, + settings=settings +) + +resource_manager = ResourceManager( + total_gpu_memory=settings.gpu_memory_limit, + total_cpu_memory=settings.cpu_memory_limit +) + +index_builder = IndexBuilder(settings) + +workflow_executor = WorkflowExecutor( + max_workers=settings.max_workers, + request_store=request_store, + resource_manager=resource_manager, + build_index_fn=index_builder.build_index +) + +job_service = JobService( + request_store=request_store, + resource_manager=resource_manager, + workflow_executor=workflow_executor, + total_gpu_memory=settings.gpu_memory_limit, + total_cpu_memory=settings.cpu_memory_limit +) + +app = FastAPI( + title=settings.service_name +) + +app.state.job_service = job_service + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("Shutting down application ...") + workflow_executor.shutdown() + +app.include_router(build.router) +app.include_router(status.router) +app.include_router(cancel.router) \ No newline at end of file diff --git a/remote-index-build-service/app/metric/__init__.py b/remote-index-build-service/app/metric/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/remote-index-build-service/app/models/__init__.py b/remote-index-build-service/app/models/__init__.py new file mode 100644 index 000000000..139597f9c --- /dev/null +++ b/remote-index-build-service/app/models/__init__.py @@ 
-0,0 +1,2 @@ + + diff --git a/remote-index-build-service/app/models/job.py b/remote-index-build-service/app/models/job.py new file mode 100644 index 000000000..c96ed8ef2 --- /dev/null +++ b/remote-index-build-service/app/models/job.py @@ -0,0 +1,21 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from enum import Enum +from pydantic import BaseModel +from models.request import RequestParameters +from typing import Optional + +class JobStatus(str, Enum): + RUNNING = "RUNNING_INDEX_BUILD" + FAILED = "FAILED_INDEX_BUILD" + COMPLETED = "COMPLETED_INDEX_BUILD" + +class Job(BaseModel): + id: str + status: JobStatus + request_parameters: RequestParameters + knn_index_path: Optional[str] = None + msg: Optional[str] = None + + def compare_request_parameters(self, other: RequestParameters) -> bool: + return self.request_parameters == other \ No newline at end of file diff --git a/remote-index-build-service/app/models/request.py b/remote-index-build-service/app/models/request.py new file mode 100644 index 000000000..dc944cd72 --- /dev/null +++ b/remote-index-build-service/app/models/request.py @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from pydantic import BaseModel + +class RequestParameters(BaseModel): + object_path: str + tenant_id: str + + def __str__(self): + return f"{self.object_path}-{self.tenant_id}" + + def __eq__(self, other): + if not isinstance(other, RequestParameters): + return False + return str(self) == str(other) \ No newline at end of file diff --git a/remote-index-build-service/app/models/workflow.py b/remote-index-build-service/app/models/workflow.py new file mode 100644 index 000000000..75d17de5c --- /dev/null +++ b/remote-index-build-service/app/models/workflow.py @@ -0,0 +1,10 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from pydantic import BaseModel +from schemas.api import CreateJobRequest + +class BuildWorkflow(BaseModel): + job_id: str + gpu_memory_required: float + cpu_memory_required: float + create_job_request: CreateJobRequest diff --git a/remote-index-build-service/app/object_store/__init__.py b/remote-index-build-service/app/object_store/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/remote-index-build-service/app/schemas/__init__.py b/remote-index-build-service/app/schemas/__init__.py new file mode 100644 index 000000000..cfa39dcf2 --- /dev/null +++ b/remote-index-build-service/app/schemas/__init__.py @@ -0,0 +1,2 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 diff --git a/remote-index-build-service/app/schemas/api.py b/remote-index-build-service/app/schemas/api.py new file mode 100644 index 000000000..54f4eae3b --- /dev/null +++ b/remote-index-build-service/app/schemas/api.py @@ -0,0 +1,49 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import BaseModel, Field +from typing import Optional +from enum import Enum + +class DataType(str, Enum): + FLOAT32 = 'fp32' + FLOAT16 = 'fp16' + BYTE = 'byte' + +class AlgorithmParameters(BaseModel): + ef_construction: int = 128 + m: int = 16 + +class IndexParameters(BaseModel): + engine: str = "faiss" + name: str = "hnsw" + space_type: str = "l2" + algorithm_parameters: AlgorithmParameters = Field( + default_factory=AlgorithmParameters + ) + +class CreateJobRequest(BaseModel): + repository_type: str + repository_name: str + object_path: str + tenant_id: str + dimension: int + doc_count: int + data_type: 
DataType = DataType.FLOAT32
+    index_parameters: IndexParameters = Field(
+        default_factory=IndexParameters
+    )
+
+    class Config:
+        extra = "forbid"
+
+class CreateJobResponse(BaseModel):
+    job_id: str
+
+class GetStatusResponse(BaseModel):
+    task_status: str
+    knn_index_path: Optional[str] = None
+    msg: Optional[str] = None
+
+class CancelJobResponse(BaseModel):
+    status: str
\ No newline at end of file
diff --git a/remote-index-build-service/app/services/__init__.py b/remote-index-build-service/app/services/__init__.py
new file mode 100644
index 000000000..cfa39dcf2
--- /dev/null
+++ b/remote-index-build-service/app/services/__init__.py
@@ -0,0 +1,2 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
diff --git a/remote-index-build-service/app/services/index_builder.py b/remote-index-build-service/app/services/index_builder.py
new file mode 100644
index 000000000..6622d221c
--- /dev/null
+++ b/remote-index-build-service/app/services/index_builder.py
@@ -0,0 +1,72 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from typing import Optional, Tuple
+import tempfile
+import time
+from models.workflow import BuildWorkflow
+from schemas.api import CreateJobRequest
+
+logger = logging.getLogger(__name__)
+
+# TODO: Implement object store, GPU builder clients
+class IndexBuilder:
+    def __init__(self, settings):
+        self.settings = settings
+
+    def build_index(self, workflow: BuildWorkflow) -> Tuple[bool, Optional[str], Optional[str]]:
+        """
+        Builds the index for the given workflow.
+        Returns (success, index_path, message).
+        """
+        # Create temporary directory for processing
+        with tempfile.TemporaryDirectory() as temp_dir:
+            # Download vectors
+            vector_path = self._download_vectors(
+                workflow.create_job_request,
+                temp_dir
+            )
+
+            # Build index
+            index_path = self._build_gpu_index(
+                vector_path,
+                workflow.create_job_request,
+                temp_dir
+            )
+
+            # Upload index
+            final_path = self._upload_index(
+                index_path,
+                workflow.create_job_request,
+                temp_dir
+            )
+
+            return True, final_path, "success!"
+
+    def _download_vectors(self, create_job_request: CreateJobRequest, temp_dir: str) -> str:
+        """
+        Download vectors from object store to temporary directory.
+        Returns local path to vectors file.
+        TODO: use object store client from object_store package
+        """
+        time.sleep(5)
+        return "done"
+
+    def _build_gpu_index(self, vector_path: str, create_job_request: CreateJobRequest, temp_dir: str) -> str:
+        """
+        Build GPU index
+        Returns path to built index.
+        TODO: use builder client from builder package
+        """
+        time.sleep(5)
+        return "done"
+
+    def _upload_index(self, index_path: str, create_job_request: CreateJobRequest, temp_dir: str) -> str:
+        """
+        Upload built index to object store.
+        Returns final object store path.
+ TODO: use object store client from object_store package + """ + time.sleep(5) + return "done" diff --git a/remote-index-build-service/app/services/job_service.py b/remote-index-build-service/app/services/job_service.py new file mode 100644 index 000000000..3961ab768 --- /dev/null +++ b/remote-index-build-service/app/services/job_service.py @@ -0,0 +1,150 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from typing import Optional +from core.exceptions import HashCollisionError, CapacityError +from core.resources import ResourceManager +from executors.workflow_executor import WorkflowExecutor +from models.job import Job, JobStatus +from models.request import RequestParameters +from models.workflow import BuildWorkflow +from utils.hash import generate_job_id +from utils.memory import calculate_memory_requirements +from utils.request import create_request_parameters +from storage.base import RequestStore +from schemas.api import CreateJobRequest + +import logging + +logger = logging.getLogger(__name__) + + +class JobService: + def __init__( + self, + request_store: RequestStore, + workflow_executor: WorkflowExecutor, + resource_manager: ResourceManager, + total_gpu_memory: float, + total_cpu_memory: float + ): + self.request_store = request_store + self.workflow_executor = workflow_executor + self.total_gpu_memory = total_gpu_memory + self.total_cpu_memory = total_cpu_memory + self.resource_manager = resource_manager + + def _validate_job_existence(self, job_id: str, request_parameters: RequestParameters) -> bool: + job = self.request_store.get(job_id) + if job: + if job.compare_request_parameters(request_parameters): + return True + raise HashCollisionError(f"Hash collision detected for job_id: {job_id}") + return False + + def _get_required_resources(self, create_job_request: CreateJobRequest) -> tuple[float, float]: + gpu_mem, cpu_mem = calculate_memory_requirements( + create_job_request.dimension, + create_job_request.doc_count, + create_job_request.data_type, + create_job_request.index_parameters.algorithm_parameters.m + ) + + logger.info(f"Job resource requirements: GPU memory: {gpu_mem}, CPU memory: {cpu_mem}") + if not self.resource_manager.can_allocate(gpu_mem, cpu_mem): + raise CapacityError("Insufficient available GPU and CPU resources to process job") + + return gpu_mem, cpu_mem + + def _add_to_request_store(self, job_id: str, request_parameters: RequestParameters) -> None: + result = self.request_store.add( + job_id, + Job( + id=job_id, + status=JobStatus.RUNNING, + request_parameters=request_parameters + ) + ) + + if not result: + raise CapacityError("Could not add item to request store") + + def _create_workflow(self, job_id: str, gpu_mem: float, cpu_mem: float, create_job_request: CreateJobRequest) -> BuildWorkflow: + workflow = BuildWorkflow( + job_id=job_id, + gpu_memory_required=gpu_mem, + cpu_memory_required=cpu_mem, + create_job_request=create_job_request + ) + + # Allocate resources + allocation_success = self.resource_manager.allocate( + workflow.gpu_memory_required, + workflow.cpu_memory_required + ) + + if not allocation_success: + self.request_store.delete(job_id) + raise CapacityError( + f"Insufficient available resources to process workflow {workflow.job_id}" + ) + + return workflow + + def create_job(self, create_job_request: CreateJobRequest) -> str: + """ + Creates a new job based on the provided request. 
+ + Args: + create_job_request: The job creation request containing necessary parameters + + Returns: + str: The ID of the created job + + Raises: + HashCollisionError: If the same job id already exists for a different request + CapacityError: If the worker does not have enough memory for the request + """ + # Create parameters and validate job + request_parameters = create_request_parameters(create_job_request) + job_id = generate_job_id(request_parameters) + job_exists = self._validate_job_existence(job_id, request_parameters) + if job_exists: + logger.info(f"Job with id {job_id} already exists") + return job_id + + gpu_mem, cpu_mem = self._get_required_resources(create_job_request) + + self._add_to_request_store(job_id, request_parameters) + logger.info(f"Added job to request store with job id: {job_id}") + + # used to determine if cleanup is necessary + workflow = None + submit_success = False + + try: + workflow = self._create_workflow(job_id, gpu_mem, cpu_mem, create_job_request) + logger.info( + f"Worker resource status - GPU: {self.resource_manager.get_available_gpu_memory():,} units, " + f"CPU: {self.resource_manager.get_available_cpu_memory():,} units" + ) + submit_success = self.workflow_executor.submit_workflow(workflow) + logger.info(f"Successfully created workflow with job id: {job_id}") + + finally: + if not submit_success and workflow: + # submitting to the thread pool executor failed, + # so we need to clean up the resources and request store + self.resource_manager.release( + workflow.gpu_memory_required, + workflow.cpu_memory_required + ) + self.request_store.delete(workflow.job_id) + + return job_id + + def get_job(self, job_id: str) -> Optional[Job]: + return self.request_store.get(job_id) + + def cancel_job(self, job_id: str) -> bool: + return self.request_store.delete(job_id) + diff --git a/remote-index-build-service/app/storage/__init__.py b/remote-index-build-service/app/storage/__init__.py new file mode 100644 index 000000000..cfa39dcf2 --- /dev/null +++ b/remote-index-build-service/app/storage/__init__.py @@ -0,0 +1,2 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 diff --git a/remote-index-build-service/app/storage/base.py b/remote-index-build-service/app/storage/base.py new file mode 100644 index 000000000..dc7bf08ad --- /dev/null +++ b/remote-index-build-service/app/storage/base.py @@ -0,0 +1,32 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any +from models.job import Job + +class RequestStore(ABC): + @abstractmethod + def add(self, job_id: str, job: Job) -> bool: + """Add a job to the store""" + pass + + @abstractmethod + def get(self, job_id: str) -> Optional[Job]: + """Retrieve a job from the store""" + pass + + @abstractmethod + def update(self, job_id: str, data: Dict[str, Any]) -> bool: + """Update a job in the store""" + pass + + @abstractmethod + def delete(self, job_id: str) -> bool: + """Delete a job from the store""" + pass + + @abstractmethod + def cleanup_expired(self) -> None: + """Clean up expired entries""" + pass \ No newline at end of file diff --git a/remote-index-build-service/app/storage/factory.py b/remote-index-build-service/app/storage/factory.py new file mode 100644 index 000000000..f9158c4ad --- /dev/null +++ b/remote-index-build-service/app/storage/factory.py @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from core.config import Settings +from 
storage.base import RequestStore +from storage.memory import InMemoryRequestStore +from storage.types import RequestStoreType + +class RequestStoreFactory: + @staticmethod + def create(store_type: RequestStoreType, settings: Settings) -> RequestStore: + if store_type == RequestStoreType.MEMORY: + return InMemoryRequestStore(settings) + else: + raise ValueError(f"Unsupported request store type: {store_type}") \ No newline at end of file diff --git a/remote-index-build-service/app/storage/memory.py b/remote-index-build-service/app/storage/memory.py new file mode 100644 index 000000000..c06942395 --- /dev/null +++ b/remote-index-build-service/app/storage/memory.py @@ -0,0 +1,74 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from typing import Dict, Optional, Any +from datetime import datetime, timedelta, timezone +import threading +import time + +from models.job import Job +from storage.base import RequestStore +from core.config import Settings + +class InMemoryRequestStore(RequestStore): + def __init__(self, settings: Settings): + self._store: Dict[str, tuple[Job, datetime]] = {} + self._lock = threading.Lock() + self._max_size = settings.request_store_max_size + self._ttl_seconds = settings.request_store_ttl_seconds + + # Start cleanup thread + self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) + self._cleanup_thread.start() + + def add(self, job_id: str, job: Job) -> bool: + with self._lock: + if len(self._store) >= self._max_size: + return False + + self._store[job_id] = (job, datetime.now(timezone.utc)) + return True + + def get(self, job_id: str) -> Optional[Job]: + with self._lock: + if job_id in self._store: + job, timestamp = self._store[job_id] + if datetime.now(timezone.utc) - timestamp < timedelta(seconds=self._ttl_seconds): + return job + else: + del self._store[job_id] + return None + + def update(self, job_id: str, data: Dict[str, Any]) -> bool: + with self._lock: + if job_id not in self._store: + return False + + job, timestamp = self._store[job_id] + for key, value in data.items(): + setattr(job, key, value) + self._store[job_id] = (job, timestamp) + return True + + def delete(self, job_id: str) -> bool: + with self._lock: + if job_id in self._store: + del self._store[job_id] + return True + return False + + def cleanup_expired(self) -> None: + with self._lock: + current_time = datetime.now(timezone.utc) + expiration_threshold = current_time - timedelta(seconds=self._ttl_seconds) + + self._store = { + job_id: data + for job_id, data in self._store.items() + if data[1] > expiration_threshold + } + + def _cleanup_loop(self) -> None: + while True: + time.sleep(5) # Run cleanup every 5 seconds + self.cleanup_expired() \ No newline at end of file diff --git a/remote-index-build-service/app/storage/types.py b/remote-index-build-service/app/storage/types.py new file mode 100644 index 000000000..189020caa --- /dev/null +++ b/remote-index-build-service/app/storage/types.py @@ -0,0 +1,6 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from enum import Enum + +class RequestStoreType(str, Enum): + MEMORY = "memory" \ No newline at end of file diff --git a/remote-index-build-service/app/utils/__init__.py b/remote-index-build-service/app/utils/__init__.py new file mode 100644 index 000000000..cfa39dcf2 --- /dev/null +++ b/remote-index-build-service/app/utils/__init__.py @@ -0,0 +1,2 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 diff --git 
a/remote-index-build-service/app/utils/hash.py b/remote-index-build-service/app/utils/hash.py new file mode 100644 index 000000000..98a20d99c --- /dev/null +++ b/remote-index-build-service/app/utils/hash.py @@ -0,0 +1,9 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +import hashlib +from models.request import RequestParameters + +def generate_job_id(request_parameters: RequestParameters) -> str: + combined = str(request_parameters).encode() + return hashlib.sha256(combined).hexdigest() \ No newline at end of file diff --git a/remote-index-build-service/app/utils/logging_config.py b/remote-index-build-service/app/utils/logging_config.py new file mode 100644 index 000000000..0bf729764 --- /dev/null +++ b/remote-index-build-service/app/utils/logging_config.py @@ -0,0 +1,11 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +import logging + +def configure_logging(log_level): + # Configure logging + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) \ No newline at end of file diff --git a/remote-index-build-service/app/utils/memory.py b/remote-index-build-service/app/utils/memory.py new file mode 100644 index 000000000..703f5a7dc --- /dev/null +++ b/remote-index-build-service/app/utils/memory.py @@ -0,0 +1,38 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +from schemas.api import DataType + +def calculate_memory_requirements( + vector_dimensions: int, + num_vectors: int, + data_type: DataType, + m: int +) -> tuple[float, float]: + """ + Calculate GPU and CPU memory requirements for a vector workload. + + This function estimates the memory needed for processing vector operations, + taking into account the workload size and complexity. 
+ + Returns: + tuple[float, float]: A tuple containing: + - gpu_memory_gb (float): Required GPU memory in gibibytes (GiB) + - cpu_memory_gb (float): Required CPU memory in gibibytes (GiB) + """ + + if data_type == DataType.FLOAT32: + entry_size = 4 + elif data_type == DataType.FLOAT16: + entry_size = 2 + elif data_type == DataType.BYTE: + entry_size = 1 + + # Vector memory (same for both GPU and CPU): entry_size bytes per element, converted to GiB + vector_memory = (vector_dimensions * num_vectors * entry_size) / (2 ** 30) + + # index memory estimate: (dimension * entry_size + m * 8) bytes per vector with ~10% overhead; the GPU build reserves ~1.5x headroom + index_gpu_memory = (((vector_dimensions * entry_size + m*8) * 1.1 * num_vectors) / (2 ** 30)) * 1.5 + + index_cpu_memory = ((vector_dimensions * entry_size + m*8) * 1.1 * num_vectors) / (2 ** 30) + + return (index_gpu_memory + vector_memory), (index_cpu_memory + vector_memory) \ No newline at end of file diff --git a/remote-index-build-service/app/utils/request.py b/remote-index-build-service/app/utils/request.py new file mode 100644 index 000000000..57ccbb96f --- /dev/null +++ b/remote-index-build-service/app/utils/request.py @@ -0,0 +1,11 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +from schemas.api import CreateJobRequest +from models.request import RequestParameters + +def create_request_parameters(create_job_request: CreateJobRequest) -> RequestParameters: + return RequestParameters( + object_path=create_job_request.object_path, + tenant_id=create_job_request.tenant_id + ) \ No newline at end of file diff --git a/remote-index-build-service/requirements.txt b/remote-index-build-service/requirements.txt new file mode 100644 index 000000000..172d5af88 --- /dev/null +++ b/remote-index-build-service/requirements.txt @@ -0,0 +1,3 @@ +fastapi[standard]>=0.113.0,<0.114.0 +pydantic>=2.7.0,<3.0.0 +pydantic-settings>=2.0.0,<3.0.0 \ No newline at end of file diff --git a/scripts/build.sh b/scripts/build.sh index a31daad94..0d1b3ee8c 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -133,9 +133,14 @@ cd $work_dir ./gradlew :buildJniLib -Davx512.enabled=false -Davx512_spr.enabled=false -Davx2.enabled=false -Dbuild.lib.commit_patches=false -Dnproc.count=${NPROC_COUNT:-1} if [ "$PLATFORM" != "windows" ] && [ "$ARCHITECTURE" = "x64" ]; then + echo "Building k-NN library nmslib with gcc 10 on non-windows x64" + rm -rf jni/CMakeCache.txt jni/CMakeFiles + env CC=gcc10-gcc CXX=gcc10-g++ FC=gcc10-gfortran ./gradlew :buildNmslib -Dbuild.lib.commit_patches=false -Dbuild.lib.apply_patches=false + echo "Building k-NN library after enabling AVX2" # Skip applying patches as patches were applied already from previous :buildJniLib task # If we apply patches again, it fails with conflict + rm -rf jni/CMakeCache.txt jni/CMakeFiles ./gradlew :buildJniLib -Davx2.enabled=true -Davx512.enabled=false -Davx512_spr.enabled=false -Dbuild.lib.commit_patches=false -Dbuild.lib.apply_patches=false echo "Building k-NN library after enabling AVX512" @@ -143,6 +148,9 @@ if [ "$PLATFORM" != "windows" ] && [ "$ARCHITECTURE" = "x64" ]; then echo "Building k-NN library after enabling AVX512_SPR" ./gradlew :buildJniLib -Davx512_spr.enabled=true -Dbuild.lib.commit_patches=false -Dbuild.lib.apply_patches=false + +else + ./gradlew :buildNmslib -Dbuild.lib.commit_patches=false -Dbuild.lib.apply_patches=false fi ./gradlew publishPluginZipPublicationToZipStagingRepository -Dopensearch.version=$VERSION -Dbuild.snapshot=$SNAPSHOT -Dbuild.version_qualifier=$QUALIFIER @@ -151,7 +159,7 @@ fi # Add lib to zip zipPath=$(find "$(pwd)/build/distributions" -path \*.zip) 
distributions="$(dirname "${zipPath}")" -mkdir $distributions/lib +mkdir -p $distributions/lib libPrefix="libopensearchknn" if [ "$PLATFORM" = "windows" ]; then libPrefix="opensearchknn" @@ -166,7 +174,7 @@ else ompPath=$(ldconfig -p | grep libgomp | cut -d ' ' -f 4) cp -v $ompPath $distributions/lib fi -cp -v ./jni/release/${libPrefix}* $distributions/lib +cp -v ./jni/build/release/${libPrefix}* $distributions/lib ls -l $distributions/lib # Add lib directory to the k-NN plugin zip diff --git a/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java b/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java index f5d49e1a3..fbb025c97 100644 --- a/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java +++ b/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java @@ -13,7 +13,7 @@ import org.opensearch.knn.plugin.transport.KNNStatsResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index 035bddd81..ebb55ea2b 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -11,7 +11,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Booleans; @@ -103,7 +103,7 @@ public class KNNSettings { public static final boolean KNN_DEFAULT_FAISS_AVX512_DISABLED_VALUE = false; public static final boolean KNN_DEFAULT_FAISS_AVX512_SPR_DISABLED_VALUE = false; public static final String INDEX_KNN_DEFAULT_SPACE_TYPE = "l2"; - public static final Integer INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD_DEFAULT_VALUE = 15_000; + public static final Integer INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD_DEFAULT_VALUE = 0; public static final Integer INDEX_KNN_BUILD_VECTOR_DATA_STRUCTURE_THRESHOLD_MIN = -1; public static final Integer INDEX_KNN_BUILD_VECTOR_DATA_STRUCTURE_THRESHOLD_MAX = Integer.MAX_VALUE - 2; public static final String INDEX_KNN_DEFAULT_SPACE_TYPE_FOR_BINARY = "hamming"; diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java index 3966a2c95..fb93bfc07 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java @@ -104,9 +104,8 @@ public void flush(int maxDoc, final Sorter.DocMap sortMap) throws IOException { field.getVectors() ); final QuantizationState quantizationState = train(field.getFieldInfo(), knnVectorValuesSupplier, totalLiveDocs); - // Check only after quantization state writer finish writing its state, since it is required - // even if there are no graph files in segment, which will be later used by exact search - if 
(shouldSkipBuildingVectorDataStructure(totalLiveDocs)) { + // should skip graph building only for non quantization use case and if threshold is met + if (quantizationState == null && shouldSkipBuildingVectorDataStructure(totalLiveDocs)) { log.info( "Skip building vector data structure for field: {}, as liveDoc: {} is less than the threshold {} during flush", fieldInfo.name, @@ -144,9 +143,8 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState } final QuantizationState quantizationState = train(fieldInfo, knnVectorValuesSupplier, totalLiveDocs); - // Check only after quantization state writer finish writing its state, since it is required - // even if there are no graph files in segment, which will be later used by exact search - if (shouldSkipBuildingVectorDataStructure(totalLiveDocs)) { + // should skip graph building only for non quantization use case and if threshold is met + if (quantizationState == null && shouldSkipBuildingVectorDataStructure(totalLiveDocs)) { log.info( "Skip building vector data structure for field: {}, as liveDoc: {} is less than the threshold {} during merge", fieldInfo.name, diff --git a/src/main/java/org/opensearch/knn/index/query/KNNQuery.java b/src/main/java/org/opensearch/knn/index/query/KNNQuery.java index 1a03f4b99..3a4201bff 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNQuery.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNQuery.java @@ -10,6 +10,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; +import lombok.extern.log4j.Log4j2; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.FieldExistsQuery; @@ -19,6 +20,7 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.search.join.BitSetProducer; +import org.opensearch.common.StopWatch; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.query.rescore.RescoreContext; @@ -32,6 +34,7 @@ * Custom KNN query. Query is used for KNNEngine's that create their own custom segment files. These files need to be * loaded and queried in a custom manner throughout the query path. */ +@Log4j2 @Getter @Builder @AllArgsConstructor @@ -45,7 +48,6 @@ public class KNNQuery extends Query { private final String indexName; private final VectorDataType vectorDataType; private final RescoreContext rescoreContext; - @Setter private Query filterQuery; @Getter @@ -53,6 +55,10 @@ public class KNNQuery extends Query { private Float radius; private Context context; + // Note: ideally query should not have to deal with shard level information. Adding it for logging purposes only + // TODO: ThreadContext does not work with logger, remove this from here once its figured out + private int shardId; + public KNNQuery( final String field, final float[] queryVector, @@ -168,7 +174,22 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo if (!KNNSettings.isKNNPluginEnabled()) { throw new IllegalStateException("KNN plugin is disabled. 
To enable update knn.plugin.enabled to true"); } + StopWatch stopWatch = null; + if (log.isDebugEnabled()) { + stopWatch = new StopWatch().start(); + } + final Weight filterWeight = getFilterWeight(searcher); + if (log.isDebugEnabled() && stopWatch != null) { + stopWatch.stop(); + log.debug( + "Creating filter weight, Shard: [{}], field: [{}] took in nanos: [{}]", + shardId, + field, + stopWatch.totalTime().nanos() + ); + } + if (filterWeight != null) { return new KNNWeight(this, boost, filterWeight); } diff --git a/src/main/java/org/opensearch/knn/index/query/KNNQueryBuilder.java b/src/main/java/org/opensearch/knn/index/query/KNNQueryBuilder.java index 7b27fd5a4..f032210aa 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNQueryBuilder.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNQueryBuilder.java @@ -9,7 +9,6 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.log4j.Log4j2; -import org.apache.commons.lang.StringUtils; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.opensearch.common.ValidationException; @@ -25,6 +24,7 @@ import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.WithFieldName; import org.opensearch.knn.index.engine.KNNMethodConfigContext; +import org.opensearch.knn.index.engine.KNNMethodContext; import org.opensearch.knn.index.engine.model.QueryContext; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; import org.opensearch.knn.index.mapper.KNNMappingConfig; @@ -48,7 +48,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import static org.opensearch.knn.common.KNNConstants.EXPAND_NESTED; import static org.opensearch.knn.common.KNNConstants.MAX_DISTANCE; @@ -395,40 +394,12 @@ protected Query doToQuery(QueryShardContext context) { } KNNVectorFieldType knnVectorFieldType = (KNNVectorFieldType) mappedFieldType; KNNMappingConfig knnMappingConfig = knnVectorFieldType.getKnnMappingConfig(); - final AtomicReference queryConfigFromMapping = new AtomicReference<>(); - int fieldDimension = knnMappingConfig.getDimension(); - knnMappingConfig.getKnnMethodContext() - .ifPresentOrElse( - knnMethodContext -> queryConfigFromMapping.set( - new QueryConfigFromMapping( - knnMethodContext.getKnnEngine(), - knnMethodContext.getMethodComponentContext(), - knnMethodContext.getSpaceType(), - knnVectorFieldType.getVectorDataType() - ) - ), - () -> knnMappingConfig.getModelId().ifPresentOrElse(modelId -> { - ModelMetadata modelMetadata = getModelMetadataForField(modelId); - queryConfigFromMapping.set( - new QueryConfigFromMapping( - modelMetadata.getKnnEngine(), - modelMetadata.getMethodComponentContext(), - modelMetadata.getSpaceType(), - modelMetadata.getVectorDataType() - ) - ); - }, - () -> { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "Field '%s' is not built for ANN search.", this.fieldName) - ); - } - ) - ); - KNNEngine knnEngine = queryConfigFromMapping.get().getKnnEngine(); - MethodComponentContext methodComponentContext = queryConfigFromMapping.get().getMethodComponentContext(); - SpaceType spaceType = queryConfigFromMapping.get().getSpaceType(); - VectorDataType vectorDataType = queryConfigFromMapping.get().getVectorDataType(); + QueryConfigFromMapping queryConfigFromMapping = getQueryConfig(knnMappingConfig, knnVectorFieldType); + + KNNEngine knnEngine = queryConfigFromMapping.getKnnEngine(); + MethodComponentContext methodComponentContext 
= queryConfigFromMapping.getMethodComponentContext(); + SpaceType spaceType = queryConfigFromMapping.getSpaceType(); + VectorDataType vectorDataType = queryConfigFromMapping.getVectorDataType(); RescoreContext processedRescoreContext = knnVectorFieldType.resolveRescoreContext(rescoreContext); knnVectorFieldType.transformQueryVector(vector); @@ -437,7 +408,7 @@ protected Query doToQuery(QueryShardContext context) { // This could be null in the case of when a model did not have serialized methodComponent information final String method = methodComponentContext != null ? methodComponentContext.getName() : null; - if (StringUtils.isNotBlank(method)) { + if (method != null && !method.isBlank()) { final KNNLibrarySearchContext engineSpecificMethodContext = knnEngine.getKNNLibrarySearchContext(method); QueryContext queryContext = new QueryContext(vectorQueryType); ValidationException validationException = validateParameters( @@ -496,9 +467,13 @@ protected Query doToQuery(QueryShardContext context) { } int vectorLength = VectorDataType.BINARY == vectorDataType ? vector.length * Byte.SIZE : vector.length; - if (fieldDimension != vectorLength) { + if (knnMappingConfig.getDimension() != vectorLength) { throw new IllegalArgumentException( - String.format("Query vector has invalid dimension: %d. Dimension should be: %d", vectorLength, fieldDimension) + String.format( + "Query vector has invalid dimension: %d. Dimension should be: %d", + vectorLength, + knnMappingConfig.getDimension() + ) ); } @@ -574,6 +549,31 @@ protected Query doToQuery(QueryShardContext context) { throw new IllegalArgumentException(String.format(Locale.ROOT, "[%s] requires k or distance or score to be set", NAME)); } + private QueryConfigFromMapping getQueryConfig(final KNNMappingConfig knnMappingConfig, final KNNVectorFieldType knnVectorFieldType) { + + if (knnMappingConfig.getKnnMethodContext().isPresent()) { + KNNMethodContext knnMethodContext = knnMappingConfig.getKnnMethodContext().get(); + return new QueryConfigFromMapping( + knnMethodContext.getKnnEngine(), + knnMethodContext.getMethodComponentContext(), + knnMethodContext.getSpaceType(), + knnVectorFieldType.getVectorDataType() + ); + } + + if (knnMappingConfig.getModelId().isPresent()) { + ModelMetadata modelMetadata = getModelMetadataForField(knnMappingConfig.getModelId().get()); + return new QueryConfigFromMapping( + modelMetadata.getKnnEngine(), + modelMetadata.getMethodComponentContext(), + modelMetadata.getSpaceType(), + modelMetadata.getVectorDataType() + ); + } + + throw new IllegalArgumentException(String.format(Locale.ROOT, "Field '%s' is not built for ANN search.", this.fieldName)); + } + private ModelMetadata getModelMetadataForField(String modelId) { ModelMetadata modelMetadata = modelDao.getMetadata(modelId); if (!ModelUtil.isModelCreated(modelMetadata)) { diff --git a/src/main/java/org/opensearch/knn/index/query/KNNQueryFactory.java b/src/main/java/org/opensearch/knn/index/query/KNNQueryFactory.java index 8e6c97f05..b6770553b 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNQueryFactory.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNQueryFactory.java @@ -52,9 +52,11 @@ public static Query create(CreateQueryRequest createQueryRequest) { final KNNEngine knnEngine = createQueryRequest.getKnnEngine(); final boolean expandNested = createQueryRequest.getExpandNested().orElse(false); BitSetProducer parentFilter = null; + int shardId = -1; if (createQueryRequest.getContext().isPresent()) { QueryShardContext context = 
createQueryRequest.getContext().get(); parentFilter = context.getParentFilter(); + shardId = context.getShardId(); } if (parentFilter == null && expandNested) { @@ -93,6 +95,7 @@ public static Query create(CreateQueryRequest createQueryRequest) { .filterQuery(validatedFilterQuery) .vectorDataType(vectorDataType) .rescoreContext(rescoreContext) + .shardId(shardId) .build(); break; default: @@ -106,6 +109,7 @@ public static Query create(CreateQueryRequest createQueryRequest) { .filterQuery(validatedFilterQuery) .vectorDataType(vectorDataType) .rescoreContext(rescoreContext) + .shardId(shardId) .build(); } @@ -117,7 +121,7 @@ public static Query create(CreateQueryRequest createQueryRequest) { requestEfSearch = (Integer) methodParameters.get(METHOD_PARAMETER_EF_SEARCH); } int luceneK = requestEfSearch == null ? k : Math.max(k, requestEfSearch); - log.debug(String.format("Creating Lucene k-NN query for index: %s \"\", field: %s \"\", k: %d", indexName, fieldName, k)); + log.debug("Creating Lucene k-NN query for index: {}, field: {}, k: {}", indexName, fieldName, k); switch (vectorDataType) { case BYTE: case BINARY: diff --git a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java index dc12fd473..c0f023c94 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java @@ -21,6 +21,8 @@ import org.apache.lucene.util.BitSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.Nullable; +import org.opensearch.common.StopWatch; import org.opensearch.common.lucene.Lucene; import org.opensearch.knn.common.FieldInfoExtractor; import org.opensearch.knn.common.KNNConstants; @@ -143,7 +145,13 @@ public long cost() { * @return A Map of docId to scores for top k results */ public PerLeafResult searchLeaf(LeafReaderContext context, int k) throws IOException { + final SegmentReader reader = Lucene.segmentReader(context.reader()); + final String segmentName = reader.getSegmentName(); + + StopWatch stopWatch = startStopWatch(); final BitSet filterBitSet = getFilteredDocsBitSet(context); + stopStopWatchAndLog(stopWatch, "FilterBitSet creation", segmentName); + final int maxDoc = context.reader().maxDoc(); int cardinality = filterBitSet.cardinality(); // We don't need to go to JNI layer if no documents are found which satisfy the filters @@ -167,7 +175,10 @@ public PerLeafResult searchLeaf(LeafReaderContext context, int k) throws IOExcep * so that it will not do a bitset look up in bottom search layer. */ final BitSet annFilter = (filterWeight != null && cardinality == maxDoc) ? null : filterBitSet; - final Map docIdsToScoreMap = doANNSearch(context, annFilter, cardinality, k); + + StopWatch annStopWatch = startStopWatch(); + final Map docIdsToScoreMap = doANNSearch(reader, context, annFilter, cardinality, k); + stopStopWatchAndLog(annStopWatch, "ANN search", segmentName); // See whether we have to perform exact search based on approx search results // This is required if there are no native engine files or if approximate search returned @@ -180,6 +191,14 @@ public PerLeafResult searchLeaf(LeafReaderContext context, int k) throws IOExcep return new PerLeafResult(filterWeight == null ? 
null : filterBitSet, docIdsToScoreMap); } + private void stopStopWatchAndLog(@Nullable final StopWatch stopWatch, final String prefixMessage, String segmentName) { + if (log.isDebugEnabled() && stopWatch != null) { + stopWatch.stop(); + final String logMessage = prefixMessage + " shard: [{}], segment: [{}], field: [{}], time in nanos:[{}] "; + log.debug(logMessage, knnQuery.getShardId(), segmentName, knnQuery.getField(), stopWatch.totalTime().nanos()); + } + } + private BitSet getFilteredDocsBitSet(final LeafReaderContext ctx) throws IOException { if (this.filterWeight == null) { return new FixedBitSet(0); @@ -236,7 +255,7 @@ private Map doExactSearch( final LeafReaderContext context, final DocIdSetIterator acceptedDocs, final long numberOfAcceptedDocs, - int k + final int k ) throws IOException { final ExactSearcherContextBuilder exactSearcherContextBuilder = ExactSearcher.ExactSearcherContext.builder() .isParentHits(true) @@ -251,13 +270,12 @@ private Map doExactSearch( } private Map doANNSearch( + final SegmentReader reader, final LeafReaderContext context, final BitSet filterIdsBitSet, final int cardinality, final int k ) throws IOException { - final SegmentReader reader = Lucene.segmentReader(context.reader()); - FieldInfo fieldInfo = FieldInfoExtractor.getFieldInfo(reader, knnQuery.getField()); if (fieldInfo == null) { @@ -416,7 +434,11 @@ public Map exactSearch( final LeafReaderContext leafReaderContext, final ExactSearcher.ExactSearcherContext exactSearcherContext ) throws IOException { - return exactSearcher.searchLeaf(leafReaderContext, exactSearcherContext); + StopWatch stopWatch = startStopWatch(); + Map exactSearchResults = exactSearcher.searchLeaf(leafReaderContext, exactSearcherContext); + final SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader()); + stopStopWatchAndLog(stopWatch, "Exact search", reader.getSegmentName()); + return exactSearchResults; } @Override @@ -523,4 +545,11 @@ private boolean isMissingNativeEngineFiles(LeafReaderContext context) { ); return engineFiles.isEmpty(); } + + private StopWatch startStopWatch() { + if (log.isDebugEnabled()) { + return new StopWatch().start(); + } + return null; + } } diff --git a/src/main/java/org/opensearch/knn/indices/ModelDao.java b/src/main/java/org/opensearch/knn/indices/ModelDao.java index 387a23587..f7e2f7777 100644 --- a/src/main/java/org/opensearch/knn/indices/ModelDao.java +++ b/src/main/java/org/opensearch/knn/indices/ModelDao.java @@ -36,7 +36,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.health.ClusterIndexHealth; import org.opensearch.cluster.metadata.IndexMetadata; diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index 7fb880f19..44c824862 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -49,7 +49,7 @@ import com.google.common.collect.ImmutableList; import org.opensearch.action.ActionRequest; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import 
org.opensearch.cluster.service.ClusterService; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestClearCacheHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestClearCacheHandler.java index 2cbc9cd76..d66c187ea 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestClearCacheHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestClearCacheHandler.java @@ -8,7 +8,7 @@ import com.google.common.collect.ImmutableList; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.common.Strings; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestDeleteModelHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestDeleteModelHandler.java index 750001e58..ea9a8e23d 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestDeleteModelHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestDeleteModelHandler.java @@ -12,7 +12,7 @@ package org.opensearch.knn.plugin.rest; import com.google.common.collect.ImmutableList; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.core.common.Strings; import org.opensearch.knn.plugin.KNNPlugin; import org.opensearch.knn.plugin.transport.DeleteModelAction; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestGetModelHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestGetModelHandler.java index 8b1f0676b..9ab221525 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestGetModelHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestGetModelHandler.java @@ -13,7 +13,7 @@ import com.google.common.collect.ImmutableList; import org.apache.commons.lang.StringUtils; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.knn.plugin.KNNPlugin; import org.opensearch.knn.plugin.transport.GetModelAction; import org.opensearch.knn.plugin.transport.GetModelRequest; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestKNNStatsHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestKNNStatsHandler.java index 9049a83db..faeb71558 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestKNNStatsHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestKNNStatsHandler.java @@ -11,7 +11,7 @@ import org.opensearch.knn.plugin.transport.KNNStatsAction; import org.opensearch.knn.plugin.transport.KNNStatsRequest; import com.google.common.collect.ImmutableList; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestActions; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestKNNWarmupHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestKNNWarmupHandler.java index 42991af13..e8ab5e675 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestKNNWarmupHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestKNNWarmupHandler.java @@ -13,7 +13,7 @@ import com.google.common.collect.ImmutableList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.client.node.NodeClient; +import 
org.opensearch.transport.client.node.NodeClient; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestSearchModelHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestSearchModelHandler.java index 675e3c1d1..07eb16650 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestSearchModelHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestSearchModelHandler.java @@ -13,7 +13,7 @@ import com.google.common.collect.ImmutableList; import org.opensearch.action.search.SearchRequest; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.knn.plugin.KNNPlugin; import org.opensearch.knn.plugin.transport.SearchModelAction; import org.opensearch.rest.BaseRestHandler; diff --git a/src/main/java/org/opensearch/knn/plugin/rest/RestTrainModelHandler.java b/src/main/java/org/opensearch/knn/plugin/rest/RestTrainModelHandler.java index 3c835af60..339cdffb3 100644 --- a/src/main/java/org/opensearch/knn/plugin/rest/RestTrainModelHandler.java +++ b/src/main/java/org/opensearch/knn/plugin/rest/RestTrainModelHandler.java @@ -12,7 +12,7 @@ package org.opensearch.knn.plugin.rest; import com.google.common.collect.ImmutableList; -import org.opensearch.client.node.NodeClient; +import org.opensearch.transport.client.node.NodeClient; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.knn.common.KNNConstants; diff --git a/src/main/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportAction.java b/src/main/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportAction.java index 4ad6227d5..3e59cc662 100644 --- a/src/main/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportAction.java +++ b/src/main/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportAction.java @@ -17,7 +17,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ValidationException; diff --git a/src/main/java/org/opensearch/knn/training/VectorReader.java b/src/main/java/org/opensearch/knn/training/VectorReader.java index 3935ee956..20045260f 100644 --- a/src/main/java/org/opensearch/knn/training/VectorReader.java +++ b/src/main/java/org/opensearch/knn/training/VectorReader.java @@ -17,7 +17,7 @@ import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequestBuilder; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ValidationException; diff --git a/src/test/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriterFlushTests.java b/src/test/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriterFlushTests.java index 6685e2b22..f87ed6bcf 100644 --- 
a/src/test/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriterFlushTests.java +++ b/src/test/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriterFlushTests.java @@ -630,8 +630,7 @@ public void testFlush_whenThresholdIsEqualToFixedValue_thenRelevantNativeIndexWr } } - public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThresholdIsNotMet_thenSkipBuildingGraph() - throws IOException { + public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThresholdIsNotMet_thenStillBuildGraph() throws IOException { // Given List> expectedVectorValues = new ArrayList<>(); final Map sizeMap = new HashMap<>(); @@ -714,7 +713,6 @@ public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThres } else { assertEquals(0, knn990QuantWriterMockedConstruction.constructed().size()); } - verifyNoInteractions(nativeIndexWriter); IntStream.range(0, vectorsPerField.size()).forEach(i -> { try { if (vectorsPerField.get(i).isEmpty()) { @@ -729,12 +727,12 @@ public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThres final Long expectedTimesGetVectorValuesIsCalled = vectorsPerField.stream().filter(Predicate.not(Map::isEmpty)).count(); knnVectorValuesFactoryMockedStatic.verify( () -> KNNVectorValuesFactory.getVectorValues(any(VectorDataType.class), any(DocsWithFieldSet.class), any()), - times(Math.toIntExact(expectedTimesGetVectorValuesIsCalled)) + times(Math.toIntExact(expectedTimesGetVectorValuesIsCalled) * 2) ); } } - public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThresholdIsNegative_thenSkipBuildingGraph() + public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThresholdIsNegative_thenStillBuildGraph() throws IOException { // Given List> expectedVectorValues = new ArrayList<>(); @@ -817,7 +815,6 @@ public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThres } else { assertEquals(0, knn990QuantWriterMockedConstruction.constructed().size()); } - verifyNoInteractions(nativeIndexWriter); IntStream.range(0, vectorsPerField.size()).forEach(i -> { try { if (vectorsPerField.get(i).isEmpty()) { @@ -832,7 +829,7 @@ public void testFlush_whenQuantizationIsProvided_whenBuildGraphDatStructureThres final Long expectedTimesGetVectorValuesIsCalled = vectorsPerField.stream().filter(Predicate.not(Map::isEmpty)).count(); knnVectorValuesFactoryMockedStatic.verify( () -> KNNVectorValuesFactory.getVectorValues(any(VectorDataType.class), any(DocsWithFieldSet.class), any()), - times(Math.toIntExact(expectedTimesGetVectorValuesIsCalled)) + times(Math.toIntExact(expectedTimesGetVectorValuesIsCalled) * 2) ); } } diff --git a/src/test/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportActionTests.java b/src/test/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportActionTests.java index 30c5d33a1..0507788f0 100644 --- a/src/test/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportActionTests.java +++ b/src/test/java/org/opensearch/knn/plugin/transport/TrainingJobRouterTransportActionTests.java @@ -16,7 +16,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; diff --git 
a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java index 3886e0c9a..87cb57cdc 100644 --- a/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java +++ b/src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java @@ -9,7 +9,7 @@ import lombok.SneakyThrows; import org.junit.After; import org.junit.Before; -import org.opensearch.client.Client; +import org.opensearch.transport.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings;