diff --git a/ai-test/src/ai_help.rs b/ai-test/src/ai_help.rs
index 08456f1a..833c130a 100644
--- a/ai-test/src/ai_help.rs
+++ b/ai-test/src/ai_help.rs
@@ -96,18 +96,22 @@ pub async fn ai_help_all(
                     function_call: None,
                 })
                 .collect();
-            if let Some(req) =
-                prepare_ai_help_req(openai_client, supabase_pool, !no_subscription, messages)
-                    .await?
-            {
-                let mut res = openai_client.chat().create(req.req.clone()).await?;
-                let res = res.choices.pop().map(|res| res.message);
-                let storage = Storage { req, res };
-                println!("writing: {}", json_out.display());
-                fs::write(json_out, serde_json::to_vec_pretty(&storage)?).await?;
-                println!("writing: {}", md_out.display());
-                fs::write(md_out, storage.to_md().as_bytes()).await?;
-            }
+            let mut meta = Default::default();
+            let req = prepare_ai_help_req(
+                openai_client,
+                supabase_pool,
+                !no_subscription,
+                messages,
+                &mut meta,
+            )
+            .await?;
+            let mut res = openai_client.chat().create(req.req.clone()).await?;
+            let res = res.choices.pop().map(|res| res.message);
+            let storage = Storage { req, res };
+            println!("writing: {}", json_out.display());
+            fs::write(json_out, serde_json::to_vec_pretty(&storage)?).await?;
+            println!("writing: {}", md_out.display());
+            fs::write(md_out, storage.to_md().as_bytes()).await?;
             Ok(())
         })
         .await?;
diff --git a/migrations/2024-02-20-093804_ai_help_metadata/down.sql b/migrations/2024-02-20-093804_ai_help_metadata/down.sql
new file mode 100644
index 00000000..cb20a1de
--- /dev/null
+++ b/migrations/2024-02-20-093804_ai_help_metadata/down.sql
@@ -0,0 +1,2 @@
+DROP TABLE ai_help_message_meta;
+DROP TYPE ai_help_message_status;
diff --git a/migrations/2024-02-20-093804_ai_help_metadata/up.sql b/migrations/2024-02-20-093804_ai_help_metadata/up.sql
new file mode 100644
index 00000000..d15852b2
--- /dev/null
+++ b/migrations/2024-02-20-093804_ai_help_metadata/up.sql
@@ -0,0 +1,34 @@
+CREATE TYPE ai_help_message_status AS ENUM (
+    'success',
+    'search_error',
+    'ai_api_error',
+    'completion_error',
+    'moderation_error',
+    'no_user_prompt_error',
+    'token_limit_error',
+    'timeout',
+    'finished_too_long',
+    'finished_content_filter',
+    'finished_no_reason',
+    'user_stopped',
+    'user_timeout',
+    'unknown'
+);
+
+CREATE TABLE ai_help_message_meta (
+    id BIGSERIAL PRIMARY KEY,
+    user_id BIGSERIAL REFERENCES users (id) ON DELETE CASCADE,
+    chat_id UUID NOT NULL,
+    message_id UUID NOT NULL,
+    parent_id UUID DEFAULT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT now(),
+    search_duration BIGINT DEFAULT NULL,
+    response_duration BIGINT DEFAULT NULL,
+    query_len BIGINT DEFAULT NULL,
+    context_len BIGINT DEFAULT NULL,
+    response_len BIGINT DEFAULT NULL,
+    model TEXT NOT NULL,
+    status ai_help_message_status NOT NULL DEFAULT 'unknown',
+    sources JSONB NOT NULL DEFAULT '[]'::jsonb,
+    UNIQUE(message_id)
+);
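Reviewer note (not part of the patch): the enum labels above are the snake_case renderings of the Rust variants introduced in src/db/types.rs further down; diesel-derive-enum performs that renaming for the Postgres type, and the `serde(rename_all = "snake_case")` attribute does the same on the JSON side. A minimal standalone sketch of the correspondence, using only serde/serde_json for illustration:

```rust
// Standalone sketch, not part of the diff: shows how snake_case renaming
// turns Rust variant names into the SQL labels created above.
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
enum AiHelpMessageStatus {
    Success,
    FinishedTooLong,
    FinishedContentFilter,
}

fn main() {
    for (variant, expected) in [
        (AiHelpMessageStatus::Success, "success"),
        (AiHelpMessageStatus::FinishedTooLong, "finished_too_long"),
        (AiHelpMessageStatus::FinishedContentFilter, "finished_content_filter"),
    ] {
        // Serialized form matches the Postgres enum label exactly.
        assert_eq!(serde_json::to_value(variant).unwrap(), expected);
    }
}
```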
diff --git a/src/ai/help.rs b/src/ai/help.rs
index bdd87e89..ab8c15d7 100644
--- a/src/ai/help.rs
+++ b/src/ai/help.rs
@@ -1,3 +1,5 @@
+use std::time::{Duration, Instant};
+
 use async_openai::{
     config::OpenAIConfig,
     types::{
@@ -33,12 +35,22 @@ pub struct AIHelpRequest {
     pub refs: Vec<RefDoc>,
 }
 
+#[derive(Default)]
+pub struct AIHelpRequestMeta {
+    pub query_len: Option<usize>,
+    pub context_len: Option<usize>,
+    pub search_duration: Option<Duration>,
+    pub model: Option<&'static str>,
+    pub sources: Option<Vec<RefDoc>>,
+}
+
 pub async fn prepare_ai_help_req(
     client: &Client<OpenAIConfig>,
     pool: &SupaPool,
     is_subscriber: bool,
     messages: Vec<ChatCompletionRequestMessage>,
-) -> Result<Option<AIHelpRequest>, AIError> {
+    request_meta: &mut AIHelpRequestMeta,
+) -> Result<AIHelpRequest, AIError> {
     let config = if is_subscriber {
         AI_HELP_GPT4_FULL_DOC_NEW_PROMPT
     } else {
@@ -81,24 +93,29 @@ pub async fn prepare_ai_help_req(
         .last()
         .and_then(|msg| msg.content.as_ref())
         .ok_or(AIError::NoUserPrompt)?;
+    request_meta.query_len = Some(last_user_message.len());
+    let start = Instant::now();
     let related_docs = if config.full_doc {
         get_related_macro_docs(client, pool, last_user_message.replace('\n', " ")).await?
     } else {
         get_related_docs(client, pool, last_user_message.replace('\n', " ")).await?
     };
+    request_meta.search_duration = Some(start.elapsed());
     let mut context = vec![];
    let mut refs = vec![];
-    let mut token_len = 0;
+    let mut context_len = 0;
+    let mut context_token_len = 0;
     for doc in related_docs.into_iter() {
         debug!("url: {}", doc.url);
+        context_len += doc.content.len();
         let bpe = tiktoken_rs::r50k_base().unwrap();
         let tokens = bpe.encode_with_special_tokens(&doc.content).len();
-        token_len += tokens;
-        debug!("tokens: {}, token_len: {}", tokens, token_len);
-        if token_len >= config.context_limit {
-            token_len -= tokens;
+        context_token_len += tokens;
+        debug!("tokens: {}, context_token_len: {}", tokens, context_token_len);
+        if context_token_len >= config.context_limit {
+            context_token_len -= tokens;
             continue;
         }
         if !refs.iter().any(|r: &RefDoc| r.url == doc.url) {
@@ -109,6 +126,9 @@ pub async fn prepare_ai_help_req(
         }
         context.push(doc);
     }
+    request_meta.sources = Some(refs.clone());
+    request_meta.context_len = Some(context_len);
+
     let system_message = ChatCompletionRequestMessageArgs::default()
         .role(Role::System)
         .content(config.system_prompt)
@@ -143,8 +163,9 @@ pub async fn prepare_ai_help_req(
         .messages(messages)
         .temperature(0.0)
         .build()?;
+    request_meta.model = Some(config.model);
 
-    Ok(Some(AIHelpRequest { req, refs }))
+    Ok(AIHelpRequest { req, refs })
 }
 
 pub fn prepare_ai_help_summary_req(
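Reviewer note: prepare_ai_help_req now threads a `&mut AIHelpRequestMeta` out-parameter instead of returning the measurements, so whatever was recorded before an early `?` return still survives for the error-path meta row. A reduced, runnable sketch of that pattern; `RequestMeta` and `prepare` are illustrative names, not part of the patch:

```rust
// Sketch of the out-parameter pattern: the caller owns a Default-initialized
// meta struct and the callee fills in whatever it measured before returning
// (or erroring out partway through).
use std::time::{Duration, Instant};

#[derive(Default, Debug)]
struct RequestMeta {
    query_len: Option<usize>,
    search_duration: Option<Duration>,
}

fn prepare(query: &str, meta: &mut RequestMeta) -> Result<String, ()> {
    meta.query_len = Some(query.len());
    let start = Instant::now();
    let related = format!("docs related to {query}"); // stand-in for the vector search
    meta.search_duration = Some(start.elapsed());
    // Had a later step failed here, the fields set so far would still be
    // visible to the caller -- the point of `&mut` instead of a return value.
    Ok(related)
}

fn main() {
    let mut meta = RequestMeta::default();
    let _ = prepare("css grid", &mut meta);
    println!("{meta:?}");
}
```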
diff --git a/src/api/ai_help.rs b/src/api/ai_help.rs
index 072016d9..7fd20860 100644
--- a/src/api/ai_help.rs
+++ b/src/api/ai_help.rs
@@ -1,7 +1,9 @@
+use std::{future, time::Instant};
+
 use actix_identity::Identity;
 use actix_web::{
     web::{Data, Json, Path},
-    Either, HttpResponse, Responder,
+    HttpResponse, Responder,
 };
 use actix_web_lab::{__reexports::tokio::sync::mpsc, sse};
 use async_openai::{
@@ -14,22 +16,24 @@ use async_openai::{
     Client,
 };
 use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
-use futures_util::{stream, StreamExt, TryStreamExt};
+use futures_util::{stream, StreamExt};
 use serde::{Deserialize, Serialize};
-use serde_json::Value::Null;
+use serde_json::Value::{self, Null};
 use uuid::Uuid;
 
 use crate::{
-    ai::help::{prepare_ai_help_req, prepare_ai_help_summary_req, RefDoc},
-    api::common::{GeneratedChunk, GeneratedChunkChoice},
+    ai::help::{prepare_ai_help_req, prepare_ai_help_summary_req, AIHelpRequestMeta, RefDoc},
     db::{
         self,
         ai_help::{
-            add_help_history_message, create_or_increment_total, decrement_limit,
-            delete_full_help_history, delete_help_history, get_count, help_history,
-            help_history_get_message, list_help_history, update_help_history_label, AI_HELP_LIMIT,
+            add_help_history_message, add_help_message_meta, create_or_increment_total,
+            decrement_limit, delete_full_help_history, delete_help_history, get_count,
+            help_history, help_history_get_message, list_help_history, update_help_history_label,
+            AI_HELP_LIMIT,
+        },
+        model::{
+            AIHelpHistoryMessage, AIHelpHistoryMessageInsert, AiHelpMessageMetaInsert, Settings,
         },
-        model::{AIHelpHistoryMessage, AIHelpHistoryMessageInsert, Settings},
         settings::get_settings,
         SupaPool,
     },
@@ -40,6 +44,12 @@ use crate::{
     db::{ai_help::create_or_increment_limit, users::get_user, Pool},
 };
 
+#[derive(Debug, Clone, Copy, Default)]
+struct ResponseContext {
+    len: usize,
+    status: db::types::AiHelpMessageStatus,
+}
+
 #[derive(Deserialize, Serialize, Clone, Debug)]
 pub struct ChatRequestMessages {
     chat_id: Option<Uuid>,
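Reviewer note: ResponseContext above is the accumulator for a Some/None sentinel trick used in the ai_help rework below — every chunk is wrapped in `Some`, a single `None` is chained on, and a `scan` observes both the chunks and the end of the stream so it can flush the accumulated state exactly once. A runnable distillation of just that pattern; only the futures crate is assumed, and the names are illustrative:

```rust
// Wrap items in Some, chain one None sentinel, and let `scan` see end-of-stream.
use futures::{future, stream, StreamExt};

#[derive(Default)]
struct Context {
    len: usize,
}

fn main() {
    let chunks = stream::iter(vec!["Hello", ", ", "world"]);
    let observed = chunks
        .map(Some) // wrap real chunks
        .chain(stream::once(future::ready(None))) // sentinel marking the end
        .scan(Context::default(), |ctx, item| {
            future::ready(match item {
                Some(part) => {
                    ctx.len += part.len();
                    Some(Some(part)) // pass the chunk through
                }
                None => {
                    // The stream is done: this is where the handler would
                    // record response_len and status, exactly once.
                    println!("total len: {}", ctx.len);
                    Some(None)
                }
            })
        })
        .filter_map(future::ready); // drop the sentinel
    let collected: Vec<&str> = futures::executor::block_on(observed.collect());
    assert_eq!(collected.concat(), "Hello, world");
}
```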
@@ -324,46 +334,13 @@ fn log_errors_and_record_response(
     Ok(Some(tx))
 }
 
-pub fn sorry_response(
-    chat_id: Option<Uuid>,
-    message_id: Uuid,
-    parent_id: Option<Uuid>,
-    quota: Option<AIHelpLimit>,
-) -> Result<Vec<sse::Data>, ApiError> {
-    let parts = vec![
-        sse::Data::new_json(AIHelpMeta {
-            typ: MetaType::Metadata,
-            chat_id: chat_id.unwrap_or_else(Uuid::new_v4),
-            message_id,
-            parent_id,
-            sources: vec![],
-            quota,
-            created_at: Utc::now(),
-        })
-        .map_err(OpenAIError::JSONDeserialize)?,
-        sse::Data::new_json(GeneratedChunk::from(
-            "Sorry, I don't know how to help with that.",
-        ))
-        .map_err(OpenAIError::JSONDeserialize)?,
-        sse::Data::new_json(GeneratedChunk {
-            choices: vec![GeneratedChunkChoice {
-                finish_reason: Some("stop".to_owned()),
-                ..Default::default()
-            }],
-            ..Default::default()
-        })
-        .map_err(OpenAIError::JSONDeserialize)?,
-    ];
-    Ok(parts)
-}
-
 pub async fn ai_help(
     user_id: Identity,
     openai_client: Data<Option<Client<OpenAIConfig>>>,
     supabase_pool: Data<Option<SupaPool>>,
     diesel_pool: Data<Pool>,
     messages: Json<ChatRequestMessages>,
-) -> Result<Either<impl Responder, impl Responder>, ApiError> {
+) -> Result<impl Responder, ApiError> {
     let mut conn = diesel_pool.get()?;
     let user = get_user(&mut conn, user_id.id().unwrap())?;
     let settings = get_settings(&mut conn, &user)?;
@@ -401,7 +378,15 @@ pub async fn ai_help(
             )?;
         }
 
-        let prepare_res = prepare_ai_help_req(client, pool, user.is_subscriber(), messages).await;
+        let mut ai_help_req_meta = AIHelpRequestMeta::default();
+        let prepare_res = prepare_ai_help_req(
+            client,
+            pool,
+            user.is_subscriber(),
+            messages,
+            &mut ai_help_req_meta,
+        )
+        .await;
         // Reinstate the user quota if we fail to do the preparation step.
         // Flagged/moderation errors DO count towards the limit, otherwise
         // it is on us.
@@ -415,9 +400,10 @@ pub async fn ai_help(
             }
             _ => (),
         }
+        let user_id = user.id;
 
-        match prepare_res? {
-            Some(ai_help_req) => {
+        match prepare_res {
+            Ok(ai_help_req) => {
                 let sources = ai_help_req.refs;
                 let created_at = match record_sources(
                     &diesel_pool,
@@ -430,6 +416,7 @@ pub async fn ai_help(
                     None => Utc::now(),
                 };
 
+                let start = Instant::now();
                 let ai_help_meta = AIHelpMeta {
                     typ: MetaType::Metadata,
                     chat_id,
@@ -447,46 +434,129 @@ pub async fn ai_help(
                 )?;
                 let qa_error_triggered =
                     qa_check_for_error_trigger(&ai_help_req.req.messages).is_err();
-                let stream = client.chat().create_stream(ai_help_req.req).await.unwrap();
-                let refs = stream::once(async move {
-                    let sse_data = if qa_error_triggered {
-                        Err(OpenAIError::InvalidArgument("Artificial Error".to_owned()))
-                    } else {
-                        sse::Data::new_json(ai_help_meta).map_err(OpenAIError::JSONDeserialize)
-                    };
-                    match sse_data {
-                        Ok(sse_data) => Ok(sse::Event::Data(sse_data)),
-                        Err(err) => {
-                            // reinstate the user quota and pass on the error
-                            let _ = decrement_limit(&mut conn, &user);
-                            Err(err)
-                        }
-                    }
+                let ai_help_res_stream =
+                    client.chat().create_stream(ai_help_req.req).await.unwrap();
+                let refs_sse_data = if qa_error_triggered {
+                    Err(OpenAIError::InvalidArgument("Artificial Error".to_owned()))
+                } else {
+                    sse::Data::new_json(ai_help_meta).map_err(OpenAIError::JSONDeserialize)
+                }
+                .map(sse::Event::Data)
+                .map_err(|e| {
+                    let _ = decrement_limit(&mut conn, &user);
+                    e
                 });
-                Ok(Either::Left(sse::Sse::from_stream(refs.chain(
-                    stream.map_ok(move |res| {
-                        if let Some(ref tx) = tx {
-                            if let Err(e) = tx.send(res.clone()) {
-                                error!("{e}");
+                let refs = stream::once(async move { refs_sse_data });
+
+                let res_stream = ai_help_res_stream
+                    .map(Some) // Wrap response chunks in Some.
+                    .chain(stream::once(async move { None })) // Add a None sentinel at the end.
+                    .scan(ResponseContext::default(), move |context, res| {
+                        future::ready(match res {
+                            Some(Ok(res)) => {
+                                if let Some(ref tx) = tx {
+                                    if let Err(e) = tx.send(res.clone()) {
+                                        error!("{e}");
+                                    }
+                                }
+                                if let Some(c) = res.choices.first() {
+                                    if let Some(part) = &c.delta.content {
+                                        context.len += part.len();
+                                    }
+                                    context.status = match c.finish_reason.as_deref() {
+                                        Some("length") => {
+                                            db::types::AiHelpMessageStatus::FinishedTooLong
+                                        }
+                                        Some("stop") => db::types::AiHelpMessageStatus::Success,
+                                        Some("content_filter") => {
+                                            db::types::AiHelpMessageStatus::FinishedContentFilter
+                                        }
+                                        Some(_) => db::types::AiHelpMessageStatus::Unknown,
+                                        None => db::types::AiHelpMessageStatus::FinishedNoReason,
+                                    }
+                                }
+                                Some(Ok(sse::Event::Data(sse::Data::new_json(res).unwrap())))
+                            }
+                            res => {
+                                let response_duration = start.elapsed();
+                                let status = if let Some(Err(e)) = &res {
+                                    // reinstate the user quota and pass on the error
+                                    let _ = decrement_limit(&mut conn, &user);
+                                    e.into()
+                                } else {
+                                    context.status
+                                };
+                                let ai_help_message_meta = AiHelpMessageMetaInsert {
+                                    user_id,
+                                    chat_id,
+                                    message_id,
+                                    parent_id,
+                                    created_at: Some(created_at.naive_utc()),
+                                    search_duration: default_meta_big_int(
+                                        ai_help_req_meta.search_duration.map(|d| d.as_millis()),
+                                    ),
+                                    response_duration: default_meta_big_int(Some(
+                                        response_duration.as_millis(),
+                                    )),
+                                    query_len: default_meta_big_int(ai_help_req_meta.query_len),
+                                    context_len: default_meta_big_int(ai_help_req_meta.context_len),
+                                    response_len: default_meta_big_int(Some(context.len)),
+                                    model: ai_help_req_meta.model.unwrap_or(""),
+                                    status,
+                                    sources: ai_help_req_meta.sources.as_ref().map(|sources| {
+                                        serde_json::to_value(sources).unwrap_or(Value::Null)
+                                    }),
+                                };
+                                add_help_message_meta(&mut conn, ai_help_message_meta);
+
+                                if let Some(Err(e)) = res {
+                                    Some(Err(e))
+                                } else {
+                                    None
+                                }
                             }
-                        }
-                        sse::Event::Data(sse::Data::new_json(res).unwrap())
-                    }),
-                ))))
+                        })
+                    });
+
+                Ok(sse::Sse::from_stream(refs.chain(res_stream)))
             }
-            None => {
-                let parts = sorry_response(
-                    Some(chat_id),
+            Err(e) => {
+                let ai_help_message_meta = AiHelpMessageMetaInsert {
+                    user_id: user.id,
+                    chat_id,
                     message_id,
                     parent_id,
-                    current.map(AIHelpLimit::from_count),
-                )?;
-                let stream = futures::stream::iter(parts.into_iter());
-                let res =
-                    sse::Sse::from_stream(stream.map(|r| Ok::<_, ApiError>(sse::Event::Data(r))));
+                    search_duration: default_meta_big_int(
+                        ai_help_req_meta.search_duration.map(|d| d.as_millis()),
+                    ),
+                    query_len: default_meta_big_int(ai_help_req_meta.query_len),
+                    context_len: default_meta_big_int(ai_help_req_meta.context_len),
+                    model: ai_help_req_meta.model.unwrap_or(""),
+                    status: (&e).into(),
+                    sources: ai_help_req_meta
+                        .sources
+                        .as_ref()
+                        .map(|sources| serde_json::to_value(sources).unwrap_or(Value::Null)),
+                    ..Default::default()
+                };
+                add_help_message_meta(&mut conn, ai_help_message_meta);
+
+                // Reinstate the user quota if we fail to do the preparation step.
+                // Flagged/moderation errors DO count towards the limit, otherwise
+                // it is on us.
+                match e {
+                    crate::ai::error::AIError::OpenAIError(_)
+                    | crate::ai::error::AIError::TiktokenError(_)
+                    | crate::ai::error::AIError::TokenLimit
+                    | crate::ai::error::AIError::SqlXError(_)
+                    | crate::ai::error::AIError::NoUserPrompt => {
+                        let _ = decrement_limit(&mut conn, &user);
+                    }
+                    _ => (),
+                }
 
-                Ok(Either::Right(res))
+                Err(e.into())
             }
         }
     } else {
@@ -632,3 +702,7 @@ fn qa_check_for_error_trigger(
     }
     Ok(())
 }
+
+fn default_meta_big_int(value: Option<impl TryInto<i64>>) -> Option<i64> {
+    value.and_then(|v| v.try_into().ok())
+}
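Reviewer note: the finish_reason strings matched in the stream above ("stop", "length", "content_filter") are the values the OpenAI chat completions API reports; anything unrecognized deliberately degrades to Unknown, and a missing reason maps to FinishedNoReason. Distilled to a pure function, with the statuses reduced to strings so it runs standalone:

```rust
// Standalone distillation of the finish_reason -> status mapping above.
fn status_for(finish_reason: Option<&str>) -> &'static str {
    match finish_reason {
        Some("stop") => "success",
        Some("length") => "finished_too_long",
        Some("content_filter") => "finished_content_filter",
        Some(_) => "unknown",
        None => "finished_no_reason",
    }
}

fn main() {
    assert_eq!(status_for(Some("stop")), "success");
    assert_eq!(status_for(Some("length")), "finished_too_long");
    assert_eq!(status_for(None), "finished_no_reason");
    println!("mapping ok");
}
```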
diff --git a/src/db/ai_help.rs b/src/db/ai_help.rs
index 5820ec3c..56e0dc68 100644
--- a/src/db/ai_help.rs
+++ b/src/db/ai_help.rs
@@ -8,10 +8,10 @@ use uuid::Uuid;
 use crate::db::error::DbError;
 use crate::db::model::{
     AIHelpHistoryInsert, AIHelpHistoryMessage, AIHelpHistoryMessageInsert, AIHelpLimitInsert,
-    UserQuery,
+    AiHelpMessageMetaInsert, UserQuery,
 };
-use crate::db::schema::ai_help_limits as limits;
 use crate::db::schema::{ai_help_history, ai_help_history_messages};
+use crate::db::schema::{ai_help_limits as limits, ai_help_message_meta};
 use crate::settings::SETTINGS;
 
 pub const AI_HELP_LIMIT: i64 = 5;
@@ -302,3 +302,14 @@ pub fn update_help_history_label(
         .execute(conn)?;
     Ok(())
 }
+
+pub fn add_help_message_meta(conn: &mut PgConnection, meta: AiHelpMessageMetaInsert) {
+    if let Err(e) = insert_into(ai_help_message_meta::table)
+        .values(&meta)
+        .on_conflict(ai_help_message_meta::message_id)
+        .do_nothing()
+        .execute(conn)
+    {
+        error!("{}", e)
+    }
+}
diff --git a/src/db/model.rs b/src/db/model.rs
index a3227e0a..e05afe66 100644
--- a/src/db/model.rs
+++ b/src/db/model.rs
@@ -1,4 +1,4 @@
-use crate::db::types::{FxaEventStatus, Subscription};
+use crate::db::types::{AiHelpMessageStatus, FxaEventStatus, Subscription};
 use crate::db::{schema::*, types::FxaEvent};
 use crate::helpers::to_utc;
 use chrono::NaiveDateTime;
@@ -313,3 +313,34 @@ pub struct SubscriptionChangeQuery {
     pub new_subscription_type: Subscription,
     pub created_at: NaiveDateTime,
 }
+
+#[derive(Insertable, Default, Debug)]
+#[diesel(table_name = ai_help_message_meta)]
+pub struct AiHelpMessageMetaInsert<'a> {
+    /// ID of the user.
+    pub user_id: i64,
+    /// UUID of the conversation.
+    pub chat_id: Uuid,
+    /// UUID of the message.
+    pub message_id: Uuid,
+    /// UUID of the parent message, if this was a follow-up question.
+    pub parent_id: Option<Uuid>,
+    /// Timestamp at which the message failed or finished.
+    pub created_at: Option<NaiveDateTime>,
+    /// Time it took to search related content, in milliseconds.
+    pub search_duration: Option<i64>,
+    /// Time it took to generate the answer, in milliseconds.
+    pub response_duration: Option<i64>,
+    /// Length of the user's question, in bytes.
+    pub query_len: Option<i64>,
+    /// Length of the MDN content passed as context, in bytes.
+    pub context_len: Option<i64>,
+    /// Length of the LLM's reply, in bytes.
+    pub response_len: Option<i64>,
+    /// Model used to generate the answer.
+    pub model: &'a str,
+    /// Status of the message.
+    pub status: AiHelpMessageStatus,
+    /// MDN content consulted to answer the question.
+    pub sources: Option<serde_json::Value>,
+}
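Reviewer note: metadata writes are deliberately best-effort — add_help_message_meta logs instead of propagating errors, and the UNIQUE(message_id) constraint plus ON CONFLICT ... DO NOTHING makes a replayed insert a silent no-op rather than a failure. An in-memory analogue of that first-write-wins contract, with a HashMap standing in for the table keyed by message_id:

```rust
// Minimal analogue of UNIQUE(message_id) + ON CONFLICT DO NOTHING.
use std::collections::hash_map::{Entry, HashMap};

fn add_meta(store: &mut HashMap<u64, String>, message_id: u64, meta: String) {
    if let Entry::Vacant(slot) = store.entry(message_id) {
        slot.insert(meta); // first write wins
    } // on conflict: do nothing
}

fn main() {
    let mut store = HashMap::new();
    add_meta(&mut store, 1, "first".into());
    add_meta(&mut store, 1, "retry".into()); // silently ignored
    assert_eq!(store[&1], "first");
}
```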
diff --git a/src/db/schema.rs b/src/db/schema.rs
index 7b2f4c0b..535a792a 100644
--- a/src/db/schema.rs
+++ b/src/db/schema.rs
@@ -1,6 +1,10 @@
 // @generated automatically by Diesel CLI.
 
 pub mod sql_types {
+    #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "ai_help_message_status"))]
+    pub struct AiHelpMessageStatus;
+
     #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
     #[diesel(postgres_type(name = "bcd_event_type"))]
     pub struct BcdEventType;
@@ -102,6 +106,29 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    use diesel::sql_types::*;
+    use crate::db::types::*;
+    use super::sql_types::AiHelpMessageStatus;
+
+    ai_help_message_meta (id) {
+        id -> Int8,
+        user_id -> Int8,
+        chat_id -> Uuid,
+        message_id -> Uuid,
+        parent_id -> Nullable<Uuid>,
+        created_at -> Timestamp,
+        search_duration -> Nullable<Int8>,
+        response_duration -> Nullable<Int8>,
+        query_len -> Nullable<Int8>,
+        context_len -> Nullable<Int8>,
+        response_len -> Nullable<Int8>,
+        model -> Text,
+        status -> AiHelpMessageStatus,
+        sources -> Jsonb,
+    }
+}
+
 diesel::table! {
     use diesel::sql_types::*;
     use crate::db::types::*;
@@ -312,6 +339,7 @@ diesel::joinable!(activity_pings -> users (user_id));
 diesel::joinable!(ai_help_history -> users (user_id));
 diesel::joinable!(ai_help_history_messages -> users (user_id));
 diesel::joinable!(ai_help_limits -> users (user_id));
+diesel::joinable!(ai_help_message_meta -> users (user_id));
 diesel::joinable!(bcd_updates -> bcd_features (feature));
 diesel::joinable!(bcd_updates -> browser_releases (browser_release));
 diesel::joinable!(browser_releases -> browsers (browser));
@@ -329,6 +357,7 @@ diesel::allow_tables_to_appear_in_same_query!(
     ai_help_history,
     ai_help_history_messages,
     ai_help_limits,
+    ai_help_message_meta,
     bcd_features,
     bcd_updates,
     browser_releases,
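Reviewer note: the Nullable<Int8> columns above are why the handler funnels every measurement through default_meta_big_int — Postgres BIGINT is i64, while the values arrive as usize (byte lengths) or u128 (Duration::as_millis). The helper from src/api/ai_help.rs, repeated here as a runnable sketch to show the overflow behavior:

```rust
// On overflow the value is dropped (None), but the meta row is still written.
fn default_meta_big_int(value: Option<impl TryInto<i64>>) -> Option<i64> {
    value.and_then(|v| v.try_into().ok())
}

fn main() {
    assert_eq!(default_meta_big_int(Some(42u128)), Some(42));
    assert_eq!(default_meta_big_int(Some(u128::MAX)), None); // overflow -> None
    assert_eq!(default_meta_big_int(None::<usize>), None);
}
```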
diff --git a/src/db/types.rs b/src/db/types.rs
index da7a9788..de109029 100644
--- a/src/db/types.rs
+++ b/src/db/types.rs
@@ -1,6 +1,9 @@
 #![allow(non_camel_case_types)]
+use async_openai::error::OpenAIError;
 use serde::{Deserialize, Serialize};
 
+use crate::{ai::error::AIError, db};
+
 #[derive(Copy, Clone, diesel_derive_enum::DbEnum, Debug, Deserialize, Eq, PartialEq, Serialize)]
 #[ExistingTypePath = "crate::db::schema::sql_types::Locale"]
 pub enum Locale {
@@ -191,3 +194,50 @@ pub enum EngineType {
     #[serde(other)]
     Unknown,
 }
+
+#[derive(
+    Copy, Clone, diesel_derive_enum::DbEnum, Debug, Deserialize, Eq, Default, PartialEq, Serialize,
+)]
+#[ExistingTypePath = "crate::db::schema::sql_types::AiHelpMessageStatus"]
+#[serde(rename_all = "snake_case")]
+pub enum AiHelpMessageStatus {
+    Success,
+    SearchError,
+    AiApiError,
+    CompletionError,
+    ModerationError,
+    NoUserPromptError,
+    TokenLimitError,
+    Timeout, // not yet used
+    FinishedTooLong,
+    FinishedContentFilter,
+    FinishedNoReason,
+    UserStopped, // not yet used
+    UserTimeout, // not yet used
+    #[default]
+    Unknown,
+}
+
+impl From<&AIError> for AiHelpMessageStatus {
+    fn from(e: &AIError) -> Self {
+        match e {
+            crate::ai::error::AIError::OpenAIError(_) => db::types::AiHelpMessageStatus::AiApiError,
+            crate::ai::error::AIError::SqlXError(_) => db::types::AiHelpMessageStatus::SearchError,
+            crate::ai::error::AIError::FlaggedError => {
+                db::types::AiHelpMessageStatus::ModerationError
+            }
+            crate::ai::error::AIError::NoUserPrompt => {
+                db::types::AiHelpMessageStatus::NoUserPromptError
+            }
+            crate::ai::error::AIError::TokenLimit | crate::ai::error::AIError::TiktokenError(_) => {
+                db::types::AiHelpMessageStatus::TokenLimitError
+            }
+        }
+    }
+}
+
+impl From<&OpenAIError> for AiHelpMessageStatus {
+    fn from(_: &OpenAIError) -> Self {
+        db::types::AiHelpMessageStatus::AiApiError
+    }
+}
diff --git a/tests/api/ai_help_history.rs b/tests/api/ai_help_history.rs
index 296e5a72..41ef22ad 100644
--- a/tests/api/ai_help_history.rs
+++ b/tests/api/ai_help_history.rs
@@ -195,7 +195,7 @@ async fn test_history_deletion() -> Result<(), Error> {
     // we only compare up to millisecond precision
     assert_eq!(
         ts.timestamp_micros(),
-        records.get(0).unwrap().timestamp_micros()
+        records.first().unwrap().timestamp_micros()
     );
 
     drop_stubr(stubr).await;
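Reviewer note: both From impls above funnel into the same status type, which is what lets the stream code write a bare `e.into()` — the target type of the binding or struct field decides which impl applies. A reduced sketch with stand-in types (OpenAiErr is a placeholder, not the real async_openai error):

```rust
// Why `e.into()` resolves without annotation: inference picks the From impl
// matching the fixed target type.
#[derive(Debug, PartialEq, Default, Clone, Copy)]
enum Status {
    AiApiError,
    #[default]
    Unknown,
}

struct OpenAiErr; // stand-in for async_openai::error::OpenAIError

impl From<&OpenAiErr> for Status {
    fn from(_: &OpenAiErr) -> Self {
        Status::AiApiError
    }
}

fn main() {
    let e = OpenAiErr;
    let status: Status = (&e).into(); // target type drives the conversion
    assert_eq!(status, Status::AiApiError);
    assert_eq!(Status::default(), Status::Unknown); // #[default] fallback
}
```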