@@ -220,23 +220,25 @@ pub async fn embed_bulk_insert_unstructured_response(
220
220
let datasource_id = datasource. id . to_string ( ) ;
221
221
match embed_text_chunks_async ( list_of_text. clone ( ) , & embedding_model) . await {
222
222
Ok ( embeddings) => {
223
- let mut search_request =
224
- SearchRequest :: new ( search_type. clone ( ) , datasource. collection_name . clone ( ) . unwrap_or ( datasource_id. clone ( ) ) ) ;
223
+ let mut search_request = SearchRequest :: new (
224
+ search_type. clone ( ) ,
225
+ datasource
226
+ . collection_name
227
+ . clone ( )
228
+ . unwrap_or ( datasource_id. clone ( ) ) ,
229
+ ) ;
225
230
search_request. byo_vector_db = Some ( true ) ;
226
231
search_request. namespace = datasource. namespace . clone ( ) ;
227
232
let mut points_to_upload: Vec < Point > = vec ! [ ] ;
228
233
229
- // Construct points to upload
230
234
for ( i, document) in documents. iter ( ) . enumerate ( ) {
231
235
let mut point_metadata: HashMap < String , Value > = HashMap :: new ( ) ;
232
236
233
- // Always store the text content in page_content
234
237
point_metadata. insert (
235
238
"page_content" . to_string ( ) ,
236
239
Value :: String ( clean_text ( document. text . clone ( ) ) ) ,
237
240
) ;
238
241
239
- // Add metadata from the original document
240
242
if let Ok ( Value :: Object ( map) ) = serde_json:: to_value ( document. metadata . clone ( ) ) {
241
243
for ( key, value) in map {
242
244
point_metadata. insert ( key, value) ;
@@ -251,67 +253,52 @@ pub async fn embed_bulk_insert_unstructured_response(
251
253
let embedding_vector = embeddings. get ( i) ;
252
254
if let Some ( vector) = embedding_vector {
253
255
let point = Point :: new (
254
- point_metadata
255
- . get ( "index" )
256
- . map_or_else (
257
- || Some ( Value :: String ( Uuid :: new_v4 ( ) . to_string ( ) ) ) ,
258
- |id| match id {
259
- Value :: String ( s ) => Some ( Value :: String ( s . clone ( ) ) ) ,
260
- _ => Some ( Value :: String ( id . to_string ( ) . trim_matches ( '"' ) . to_string ( ) ) )
261
- }
262
- ) ,
256
+ point_metadata. get ( "index" ) . map_or_else (
257
+ || Some ( Value :: String ( Uuid :: new_v4 ( ) . to_string ( ) ) ) ,
258
+ |id| match id {
259
+ Value :: String ( s ) => Some ( Value :: String ( s . clone ( ) ) ) ,
260
+ _ => Some ( Value :: String (
261
+ id . to_string ( ) . trim_matches ( '"' ) . to_string ( ) ,
262
+ ) ) ,
263
+ } ,
264
+ ) ,
263
265
vector. to_vec ( ) ,
264
266
Some ( point_metadata) ,
265
267
) ;
266
268
points_to_upload. push ( point)
267
269
}
268
270
}
269
271
270
- // Only create one vector database client for all points
271
272
let vector_database_client =
272
273
check_byo_vector_database ( datasource. clone ( ) , & mongo_connection)
273
274
. await
274
275
. unwrap_or ( default_vector_db_client ( ) . await ) ;
275
276
276
- // Initialize vector database client
277
277
let vector_database = Arc :: clone ( & vector_database_client) ;
278
278
let vector_database_client = vector_database. read ( ) . await ;
279
279
280
- // Bulk insert all points at once
281
280
if let Ok ( bulk_insert_status) = vector_database_client
282
281
. bulk_insert_points ( search_request. clone ( ) , points_to_upload)
283
282
. await
284
283
{
285
284
match bulk_insert_status {
286
285
VectorDatabaseStatus :: Ok => {
287
286
log:: debug!( "points uploaded successfully!" ) ;
288
- // Increment success count
289
- increment_by_one (
290
- & mongo_connection,
291
- & datasource_id,
292
- "recordCount.success" ,
293
- )
294
- . await
295
- . unwrap ( ) ;
287
+
288
+ increment_by_one ( & mongo_connection, & datasource_id, "recordCount.success" )
289
+ . await
290
+ . unwrap ( ) ;
296
291
}
297
292
VectorDatabaseStatus :: Failure | VectorDatabaseStatus :: NotFound => {
298
- increment_by_one (
299
- & mongo_connection,
300
- & datasource_id,
301
- "recordCount.failure" ,
302
- )
303
- . await
304
- . unwrap ( ) ;
293
+ increment_by_one ( & mongo_connection, & datasource_id, "recordCount.failure" )
294
+ . await
295
+ . unwrap ( ) ;
305
296
log:: warn!( "Could not find collection :{}" , datasource_id) ;
306
297
}
307
298
VectorDatabaseStatus :: Error ( e) => {
308
- increment_by_one (
309
- & mongo_connection,
310
- & datasource_id,
311
- "recordCount.failure" ,
312
- )
313
- . await
314
- . unwrap ( ) ;
299
+ increment_by_one ( & mongo_connection, & datasource_id, "recordCount.failure" )
300
+ . await
301
+ . unwrap ( ) ;
315
302
log:: error!(
316
303
"An error occurred while attempting point insert operation. Error: {:?}" ,
317
304
e
0 commit comments