
feat(ai-help): log message metadata #424

Merged
merged 27 commits on Mar 26, 2024
Changes from 7 commits
Commits
27 commits
b5be667
feat(ai-help): log message metadata
fiji-flo Feb 20, 2024
631ca37
fixes
fiji-flo Feb 21, 2024
eb3ca5b
remove dead code
fiji-flo Feb 21, 2024
fae4dec
Merge branch 'main' into MP-813-message-metadata
caugner Feb 23, 2024
c258a44
add model
fiji-flo Feb 23, 2024
aadc3f2
Merge remote-tracking branch 'upstream/main' into MP-813-message-meta…
fiji-flo Feb 27, 2024
7013e45
fix(ai-help): record {query,context}_len
caugner Feb 27, 2024
4572cd4
chore(ai-help): rename message status
caugner Feb 29, 2024
44eefc6
fix(ai-help): record context_len in bytes
caugner Feb 29, 2024
69243f9
chore(ai-help): document meta message fields
caugner Feb 29, 2024
85d5215
Merge branch 'main' into MP-813-message-metadata
caugner Mar 1, 2024
88ba56e
fix(ai-help): record metadata for AI response without finish_reason
caugner Mar 1, 2024
a4fc0ff
refactor(ai-help): merge Err() matchers
caugner Mar 1, 2024
e5f816c
revert(ai-help): extract perf fix
caugner Mar 4, 2024
6af96c5
Merge remote-tracking branch 'upstream/main' into MP-813-message-meta…
fiji-flo Mar 25, 2024
dd64849
add finish reason
fiji-flo Mar 25, 2024
711d351
Merge remote-tracking branch 'upstream/MP-813-message-metadata' into …
fiji-flo Mar 25, 2024
80dcad8
fixes
fiji-flo Mar 25, 2024
79e5734
less indent
fiji-flo Mar 25, 2024
ba22ffa
fmt
fiji-flo Mar 25, 2024
6c79961
make data nullable
fiji-flo Mar 26, 2024
c8a97e8
fix(ai-help): reuse search_duration/query_len/context_len for error
caugner Mar 26, 2024
ad0cf5a
fix test
fiji-flo Mar 26, 2024
4090413
chore(ai-help): record response duration on error
caugner Mar 26, 2024
039fece
chore(ai-help): record model/sources on error
caugner Mar 26, 2024
e851bac
Revert "chore(ai-help): record response duration on error"
caugner Mar 26, 2024
92d4e1f
Merge branch 'main' into MP-813-message-metadata
caugner Mar 26, 2024
20 changes: 9 additions & 11 deletions ai-test/src/ai_help.rs
@@ -96,18 +96,16 @@ pub async fn ai_help_all(
function_call: None,
})
.collect();
if let Some(req) =
let (req, _) =
prepare_ai_help_req(openai_client, supabase_pool, !no_subscription, messages)
.await?
{
let mut res = openai_client.chat().create(req.req.clone()).await?;
let res = res.choices.pop().map(|res| res.message);
let storage = Storage { req, res };
println!("writing: {}", json_out.display());
fs::write(json_out, serde_json::to_vec_pretty(&storage)?).await?;
println!("writing: {}", md_out.display());
fs::write(md_out, storage.to_md().as_bytes()).await?;
}
.await?;
let mut res = openai_client.chat().create(req.req.clone()).await?;
let res = res.choices.pop().map(|res| res.message);
let storage = Storage { req, res };
println!("writing: {}", json_out.display());
fs::write(json_out, serde_json::to_vec_pretty(&storage)?).await?;
println!("writing: {}", md_out.display());
fs::write(md_out, storage.to_md().as_bytes()).await?;
Ok(())
})
.await?;
2 changes: 2 additions & 0 deletions migrations/2024-02-20-093804_ai_help_metadata/down.sql
@@ -0,0 +1,2 @@
DROP TABLE ai_help_message_meta;
DROP TYPE ai_help_message_status;
30 changes: 30 additions & 0 deletions migrations/2024-02-20-093804_ai_help_metadata/up.sql
@@ -0,0 +1,30 @@
CREATE TYPE ai_help_message_status AS ENUM (
'success',
'search_error',
'open_ai_api_error',
'completion_error',
'moderation_error',
'no_user_prompt_error',
'token_limit_error',
'stopped',
'timeout',
'unknown'
);

CREATE TABLE ai_help_message_meta (
id BIGSERIAL PRIMARY KEY,
user_id BIGSERIAL REFERENCES users (id) ON DELETE CASCADE,
chat_id UUID NOT NULL,
message_id UUID NOT NULL,
parent_id UUID DEFAULT NULL REFERENCES ai_help_message_meta (message_id) ON DELETE CASCADE,
created_at TIMESTAMP NOT NULL DEFAULT now(),
search_duration BIGINT NOT NULL,
response_duration BIGINT NOT NULL,
query_len BIGINT NOT NULL,
context_len BIGINT NOT NULL,
response_len BIGINT NOT NULL,
model text NOT NULL,
status ai_help_message_status NOT NULL DEFAULT 'unknown',
sources JSONB NOT NULL DEFAULT '[]'::jsonb,
UNIQUE(message_id)
);
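
For orientation, a rough sketch of the insert model this migration implies on the Rust side follows. It is a hypothetical reconstruction, not the actual src/db/model.rs definition: the real struct derives Diesel's Insertable against the generated schema, and names or types may differ.

use chrono::NaiveDateTime;
use serde_json::Value;
use uuid::Uuid;

// Hypothetical mirror of the ai_help_message_meta columns above.
pub struct AiHelpMessageMetaInsertSketch<'a> {
    pub user_id: i64,
    pub chat_id: Uuid,
    pub message_id: Uuid,
    pub parent_id: Option<Uuid>,
    pub created_at: Option<NaiveDateTime>,
    pub search_duration: i64,   // milliseconds spent retrieving related docs
    pub response_duration: i64, // milliseconds until the model finished answering
    pub query_len: i64,         // length of the last user message
    pub context_len: i64,       // length of the context passed to the model
    pub response_len: i64,      // length of the generated answer
    pub model: &'a str,
    pub status: AiHelpMessageStatusSketch,
    pub sources: Option<&'a Value>,
}

// Hypothetical mirror of the ai_help_message_status enum above.
pub enum AiHelpMessageStatusSketch {
    Success,
    SearchError,
    OpenAiApiError,
    CompletionError,
    ModerationError,
    NoUserPromptError,
    TokenLimitError,
    Stopped,
    Timeout,
    Unknown,
}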
32 changes: 25 additions & 7 deletions src/ai/help.rs
@@ -1,3 +1,5 @@
use std::time::{Duration, Instant};

use async_openai::{
config::OpenAIConfig,
types::{
@@ -32,12 +34,18 @@ pub struct AIHelpRequest {
pub refs: Vec<RefDoc>,
}

pub struct AIHelpRequestMeta {
pub query_len: usize,
pub context_len: usize,
pub search_duration: Duration,
}

pub async fn prepare_ai_help_req(
client: &Client<OpenAIConfig>,
pool: &SupaPool,
is_subscriber: bool,
messages: Vec<ChatCompletionRequestMessage>,
) -> Result<Option<AIHelpRequest>, AIError> {
) -> Result<(AIHelpRequest, AIHelpRequestMeta), AIError> {
let config = if is_subscriber {
AI_HELP_GPT4_FULL_DOC_NEW_PROMPT
} else {
@@ -76,24 +84,27 @@ pub async fn prepare_ai_help_req(
.last()
.and_then(|msg| msg.content.as_ref())
.ok_or(AIError::NoUserPrompt)?;
let query_len = last_user_message.len();

let start = Instant::now();
let related_docs = if config.full_doc {
get_related_macro_docs(client, pool, last_user_message.replace('\n', " ")).await?
} else {
get_related_docs(client, pool, last_user_message.replace('\n', " ")).await?
};
let search_duration = start.elapsed();

let mut context = vec![];
let mut refs = vec![];
let mut token_len = 0;
let mut context_token_len = 0;
for doc in related_docs.into_iter() {
debug!("url: {}", doc.url);
let bpe = tiktoken_rs::r50k_base().unwrap();
let tokens = bpe.encode_with_special_tokens(&doc.content).len();
token_len += tokens;
debug!("tokens: {}, token_len: {}", tokens, token_len);
if token_len >= config.context_limit {
token_len -= tokens;
context_token_len += tokens;
debug!("tokens: {}, token_len: {}", tokens, context_token_len);
if context_token_len >= config.context_limit {
context_token_len -= tokens;
continue;
}
if !refs.iter().any(|r: &RefDoc| r.url == doc.url) {
@@ -139,7 +150,14 @@ pub async fn prepare_ai_help_req(
.temperature(0.0)
.build()?;

Ok(Some(AIHelpRequest { req, refs }))
Ok((
AIHelpRequest { req, refs },
AIHelpRequestMeta {
query_len,
context_len: context_token_len,
search_duration,
},
))
}

pub fn prepare_ai_help_summary_req(
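The signature change above replaces the Option return with a (request, metadata) tuple and times only the retrieval step. A minimal, self-contained sketch of that pattern, with the vector search stubbed out and byte lengths standing in for tiktoken token counts (all names apart from query_len, context_len, and search_duration are hypothetical):

use std::time::{Duration, Instant};

pub struct MetaSketch {
    pub query_len: usize,
    pub context_len: usize,
    pub search_duration: Duration,
}

// Stand-in for get_related_docs / get_related_macro_docs.
fn search_docs(_query: &str) -> Vec<String> {
    vec!["first related doc".to_string(), "second related doc".to_string()]
}

pub fn prepare_sketch(last_user_message: &str, context_limit: usize) -> (Vec<String>, MetaSketch) {
    // query_len is the length of the last user message.
    let query_len = last_user_message.len();

    // Only the retrieval step is timed, mirroring the diff.
    let start = Instant::now();
    let related_docs = search_docs(last_user_message);
    let search_duration = start.elapsed();

    // Budgeted accumulation, analogous to the context_token_len loop above
    // (the real code counts tiktoken tokens against config.context_limit).
    let mut context = Vec::new();
    let mut context_len = 0;
    for doc in related_docs {
        let len = doc.len();
        if context_len + len >= context_limit {
            continue;
        }
        context_len += len;
        context.push(doc);
    }

    (
        context,
        MetaSketch {
            query_len,
            context_len,
            search_duration,
        },
    )
}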
168 changes: 96 additions & 72 deletions src/api/ai_help.rs
@@ -1,7 +1,9 @@
use std::{future, time::Instant};

use actix_identity::Identity;
use actix_web::{
web::{Data, Json, Path},
Either, HttpResponse, Responder,
HttpResponse, Responder,
};
use actix_web_lab::{__reexports::tokio::sync::mpsc, sse};
use async_openai::{
@@ -11,22 +13,23 @@ use async_openai::{
Client,
};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures_util::{stream, StreamExt, TryStreamExt};
use futures_util::{stream, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value::Null;
use serde_json::Value::{self, Null};
use uuid::Uuid;

use crate::{
ai::help::{prepare_ai_help_req, prepare_ai_help_summary_req, RefDoc},
api::common::{GeneratedChunk, GeneratedChunkChoice},
db::{
self,
ai_help::{
add_help_history_message, create_or_increment_total, delete_full_help_history,
delete_help_history, get_count, help_history, help_history_get_message,
list_help_history, update_help_history_label, AI_HELP_LIMIT,
add_help_history_message, add_help_message_meta, create_or_increment_total,
delete_full_help_history, delete_help_history, get_count, help_history,
help_history_get_message, list_help_history, update_help_history_label, AI_HELP_LIMIT,
},
model::{
AIHelpHistoryMessage, AIHelpHistoryMessageInsert, AiHelpMessageMetaInsert, Settings,
},
model::{AIHelpHistoryMessage, AIHelpHistoryMessageInsert, Settings},
settings::get_settings,
SupaPool,
},
@@ -269,6 +272,9 @@ fn log_errors_and_record_response(
user_id: i64,
help_ids: HelpIds,
) -> Result<Option<mpsc::UnboundedSender<CreateChatCompletionStreamResponse>>, ApiError> {
if !history_enabled {
return Ok(None);
}
let mut conn = pool.get()?;
let (tx, mut rx) = mpsc::unbounded_channel::<CreateChatCompletionStreamResponse>();
actix_web::rt::spawn(async move {
@@ -320,46 +326,13 @@ fn log_errors_and_record_response(
Ok(Some(tx))
}

pub fn sorry_response(
chat_id: Option<Uuid>,
message_id: Uuid,
parent_id: Option<Uuid>,
quota: Option<AIHelpLimit>,
) -> Result<Vec<sse::Data>, ApiError> {
let parts = vec![
sse::Data::new_json(AIHelpMeta {
typ: MetaType::Metadata,
chat_id: chat_id.unwrap_or_else(Uuid::new_v4),
message_id,
parent_id,
sources: vec![],
quota,
created_at: Utc::now(),
})
.map_err(OpenAIError::JSONDeserialize)?,
sse::Data::new_json(GeneratedChunk::from(
"Sorry, I don't know how to help with that.",
))
.map_err(OpenAIError::JSONDeserialize)?,
sse::Data::new_json(GeneratedChunk {
choices: vec![GeneratedChunkChoice {
finish_reason: Some("stop".to_owned()),
..Default::default()
}],
..Default::default()
})
.map_err(OpenAIError::JSONDeserialize)?,
];
Ok(parts)
}

pub async fn ai_help(
user_id: Identity,
openai_client: Data<Option<Client<OpenAIConfig>>>,
supabase_pool: Data<Option<SupaPool>>,
diesel_pool: Data<Pool>,
messages: Json<ChatRequestMessages>,
) -> Result<Either<impl Responder, impl Responder>, ApiError> {
) -> Result<impl Responder, ApiError> {
let mut conn = diesel_pool.get()?;
let user = get_user(&mut conn, user_id.id().unwrap())?;
let settings = get_settings(&mut conn, &user)?;
@@ -397,9 +370,10 @@ pub async fn ai_help(
)?;
}

match prepare_ai_help_req(client, pool, user.is_subscriber(), messages).await? {
Some(ai_help_req) => {
match prepare_ai_help_req(client, pool, user.is_subscriber(), messages).await {
Ok((ai_help_req, ai_help_req_meta)) => {
let sources = ai_help_req.refs;
let model = ai_help_req.req.model.clone();
let created_at = match record_sources(
&diesel_pool,
&sources,
@@ -411,6 +385,15 @@
None => Utc::now(),
};

let tx = log_errors_and_record_response(
&diesel_pool,
history_enabled(&settings),
user.id,
help_ids,
)?;
let start = Instant::now();
let stream = client.chat().create_stream(ai_help_req.req).await.unwrap();
let sources_value = serde_json::to_value(&sources).unwrap_or(Value::Null);
let ai_help_meta = AIHelpMeta {
typ: MetaType::Metadata,
chat_id,
@@ -420,42 +403,83 @@ pub async fn ai_help(
quota: current.map(AIHelpLimit::from_count),
created_at,
};
let tx = log_errors_and_record_response(
&diesel_pool,
history_enabled(&settings),
user.id,
help_ids,
)?;
let stream = client.chat().create_stream(ai_help_req.req).await.unwrap();
let refs = stream::once(async move {
Ok(sse::Event::Data(
Ok::<_, OpenAIError>(sse::Event::Data(
sse::Data::new_json(ai_help_meta).map_err(OpenAIError::JSONDeserialize)?,
))
});

Ok(Either::Left(sse::Sse::from_stream(refs.chain(
stream.map_ok(move |res| {
if let Some(ref tx) = tx {
if let Err(e) = tx.send(res.clone()) {
error!("{e}");
}
}
sse::Event::Data(sse::Data::new_json(res).unwrap())
}),
))))
Ok(sse::Sse::from_stream(
refs.chain(
stream
.map(Some) // Wrapping response chunks in some.
.chain(stream::once(async move { None })) // Adding a None at the end.
.scan(0, move |response_len, res| {
future::ready(match res {
Some(Ok(res)) => {
if let Some(ref tx) = tx {
if let Err(e) = tx.send(res.clone()) {
error!("{e}");
}
}
if let Some(c) = res.choices.first() {
if let Some(part) = &c.delta.content {
*response_len += part.len();
}
}
Some(Ok(sse::Event::Data(
sse::Data::new_json(res).unwrap(),
)))
}
Some(Err(e)) => Some(Err(e)),
None => {
// This is the added artificial chunk.
let response_duration = start.elapsed();
let ai_help_message_meta = AiHelpMessageMetaInsert {
user_id: user.id,
chat_id,
message_id,
parent_id,
created_at: Some(created_at.naive_utc()),
search_duration: i64::try_from(
ai_help_req_meta.search_duration.as_millis(),
)
.unwrap_or(-1),
response_duration: i64::try_from(
response_duration.as_millis(),
)
.unwrap_or(-1),
query_len: i64::try_from(ai_help_req_meta.query_len)
.unwrap_or(-1),
context_len: i64::try_from(
ai_help_req_meta.context_len,
)
.unwrap_or(-1),
response_len: i64::try_from(*response_len)
.unwrap_or(-1),
model: &model,
status: db::types::AiHelpMessageStatus::Success,
sources: Some(&sources_value),
};
add_help_message_meta(&mut conn, ai_help_message_meta);
None
}
})
}),
),
))
}
None => {
let parts = sorry_response(
Some(chat_id),
Err(e) => {
let ai_help_message_meta = AiHelpMessageMetaInsert {
user_id: user.id,
chat_id,
message_id,
parent_id,
current.map(AIHelpLimit::from_count),
)?;
let stream = futures::stream::iter(parts.into_iter());
let res =
sse::Sse::from_stream(stream.map(|r| Ok::<_, ApiError>(sse::Event::Data(r))));

Ok(Either::Right(res))
status: (&e).into(),
..Default::default()
};
add_help_message_meta(&mut conn, ai_help_message_meta);
Err(e.into())
}
}
} else {
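
The handler now appends an artificial terminal chunk so it can record response_len and the rest of the metadata once the OpenAI stream ends. A stripped-down sketch of that map(Some) / chain(None) / scan pattern, using plain strings instead of the SSE and chat-completion chunk types (everything here is a simplified stand-in for the real handler):

use std::future;

use futures_util::{stream, StreamExt};

// Wrap every real chunk in Some, append one None as an end marker, and use
// scan to keep a running length that is flushed when the marker arrives.
async fn stream_with_metadata(chunks: Vec<&'static str>) -> (Vec<String>, usize) {
    let mut recorded_len = 0;

    let out: Vec<String> = stream::iter(chunks)
        .map(Some)                               // wrap each real chunk
        .chain(stream::once(async { None }))     // artificial terminal chunk
        .scan(0usize, |response_len, item| {
            future::ready(match item {
                Some(part) => {
                    *response_len += part.len(); // accumulate, like response_len above
                    Some(Some(part.to_string()))
                }
                None => {
                    // End of stream: the real handler builds AiHelpMessageMetaInsert
                    // here and calls add_help_message_meta.
                    recorded_len = *response_len;
                    Some(None)                   // nothing is emitted for the marker
                }
            })
        })
        .filter_map(|item| async move { item })
        .collect()
        .await;

    (out, recorded_len)
}

Recording at the sentinel rather than per chunk means the metadata row is written exactly once, after the last token has been streamed to the client.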