Skip to content
This repository was archived by the owner on Sep 18, 2024. It is now read-only.

feature: send message to assistants and assistants respond to message #32

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"dependencies": {
"@elysiajs/bearer": "^0.7.0",
"@elysiajs/swagger": "^0.8.5",
"dayjs": "^1.11.10",
"elysia": "latest",
"ioredis": "^5.3.2",
"jsonwebtoken": "8.5.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ describe("assistantController", async () => {
method: "POST",
body: JSON.stringify({
name,
model: 'gpt-4',
instruction: 'You are a plant teacher. Always respond with "sprout"'
}),
});

Expand Down Expand Up @@ -44,12 +46,15 @@ describe("assistantController", async () => {
"Content-Type": "application/json",
},
body: JSON.stringify({
name: "test assistant",
name: "sprout_assistant",
model: "gpt-4",
instruction: "You are a plant teacher. Always respond with 'sprout'"
}),
});

const response: any = await app
.handle(request)

.then((response) => response.json());

expect(response.message).toBe(UNAUTHORIZED_MISSING_TOKEN.message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ assistants.post(
// create user for assistant
await createUser(assistantId, {});
// give user the proper role
await assignRole(assistantId, "agent");
await assignRole(assistantId, "assistant");

// creat assistant in db
await createAssistant({
userId,
id: assistantId,
model: "gpt-4",
model: body.model,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we plan a default if ever somebody does not specify a model? That's something we can eventually do from a UI standpoint also.

feel free to ignore, not blocking a merge

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think from the UI standpoint we can do a default!

name,
fileIds: [],
tools: [],
instruction: body.instruction
});

return {
Expand All @@ -55,7 +56,9 @@ assistants.post(
{
body: t.Object({
name: t.String(),
model: t.Literal("gpt-4"), // add more models here
instruction: t.String()
}),
beforeHandle: AuthMiddleware(["create_assistant", "*"]),
},
}
);
44 changes: 41 additions & 3 deletions src/core/application/controllers/thread/threadController.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Elysia, t } from "elysia";
import { ulid } from "ulid";

import { getTokenPermissions, parseToken } from "../../services/tokenService";
import { parseToken } from "../../services/tokenService";
import {
createThread,
deleteThread,
Expand All @@ -10,12 +10,12 @@ import {
} from "@/core/application/services/threadService";
import {
THREAD_DELETED_SUCCESSFULLY,
UNAUTHORIZED_NO_PERMISSION_DELETE,
UNAUTHORIZED_USER_NOT_OWNER,
UNAUTHORIZED_USER_NOT_PARTICIPANT,
} from "./returnValues";
import { createMessage } from "../../services/messageService";
import { AuthMiddleware } from "../../middlewares/authorizationMiddleware";
import { runAssistantWithThread } from "../../services/runService";

type ThreadDecorator = {
request: {
Expand Down Expand Up @@ -113,6 +113,9 @@ threads.get(
},
);

/**
* This adds a message to the thread, it can be from the assistant or from the human user
*/
threads.post(
"/thread/:id/message",
async ({ params, bearer, set, body }) => {
Expand All @@ -126,7 +129,7 @@ threads.post(
threadId,
userId,
);

// Check if the user has the permission to add a message
// if the user has * they can send a message anywhere, if not they need to be in conversation
if (isSuperUser || isParticipant) {
Expand All @@ -147,3 +150,38 @@ threads.post(
beforeHandle: AuthMiddleware(["create_message_in_own_thread", "*"]),
},
);

/**
* This runs and responds once with anything that's in the thread
*/
threads.post("/thread/:id/run", async ({ params, bearer, set, body }) => {
const decodedToken = await parseToken(bearer!);

if(decodedToken) {
const { userId, permissions } = decodedToken
const threadId = params.id;
const isSuperUser = permissions.some((p) => p.key === "*");
const isParticipant = await userOwnsOrParticipatesInThread(threadId, userId);


if(isSuperUser || isParticipant) {
// run the assistant with thread once, and get a single response
// this also adds the message to the thread
const response = await runAssistantWithThread({
thread_id: threadId,
assistant_id: body.assistant_id
})
set.status = 200
return response
}

set.status = 403
return UNAUTHORIZED_USER_NOT_PARTICIPANT;
}

}, {
body: t.Object({
assistant_id: t.String()
}),
beforeHandle: AuthMiddleware(['create_message_in_own_thread', '*'])
})
69 changes: 69 additions & 0 deletions src/core/application/controllers/thread/threadRun.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { app } from "@/index";
import { test, expect, describe } from "bun:test";
import { getLastMessage } from "../../services/messageService";
import { Message } from "@/core/domain/messages";
import { createHumanUserForTesting } from "@/__tests__/utils";

describe.only("threadController", async () => {
const token = await createHumanUserForTesting();

test("Run a created thread with a created assistant and save response from assistant", async () => {
// Creating a new thread
const thread_request = new Request("http://localhost:8080/thread", {
headers: {
authorization: `Bearer ${token}`,
"Content-Type": "application/json",
},
method: "POST",
});

const thread_response = await app.handle(thread_request);
const thread_response_json: any = await thread_response.json();
expect(thread_response_json).toHaveProperty('id')
const thread_id = thread_response_json.id

// Creating a new assistant
const assistant_name = "Skater Assistant"
const assistant_request = new Request("http://localhost:8080/assistant", {
headers: {
authorization: `Bearer ${token}`,
"Content-Type": "application/json",
},
method: "POST",
body: JSON.stringify({
name: assistant_name,
model: 'gpt-4',
instruction: "You are a pro skater, give very short skating tips. Always respond with 'skate on'."
}),
});
const assistant_req = await app.handle(assistant_request)
const assistant_req_json = await assistant_req.json()
expect(assistant_req_json).toHaveProperty("id")
const assistant_id = assistant_req_json.id

// Running a thread
const thread_run_request = new Request(`http://localhost:8080/thread/${thread_id}/run`, {
headers: {
authorization: `Bearer ${token}`,
"Content-Type": "application/json",
},
method: "POST",
body: JSON.stringify({
assistant_id: assistant_id
}),
});
const run_response = await app.handle(thread_run_request)
const run_json = await run_response.json()

// expect run_response to have thread_id, and thread_id's latest message to be from assistant, with the content of 'skate on'
expect(run_json).toHaveProperty('thread_id')
expect(run_json).toHaveProperty('assistant_id')

// get the latest message from thread id
const lastMessage: Message = await getLastMessage(run_json.thread_id)
expect(lastMessage.role).toBe('assistant')
expect(lastMessage.content.toLocaleLowerCase()).toContain('skate on')
})


});
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import {
} from "@/__tests__/utils";
import { app } from "@/index";
import { test, expect, describe, beforeAll } from "bun:test";
import { getUser } from "../../services/userService";
import { UNAUTHORIZED_MISSING_TOKEN } from "../../ports/returnValues";
import { getThread } from "../../services/threadService";
import { parseToken } from "../../services/tokenService";
import { Thread } from "@/core/domain/thread";
import { getUser } from "../../services/userService";

describe.only("userController", async () => {
let superAdminToken: string | null;
Expand Down
14 changes: 12 additions & 2 deletions src/core/application/services/assistantService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import { redis } from "@/infrastructure/adaptaters/redisAdapter";
* @throws {Error} If there is an error creating the assistant or adding the relationship.
*/
export async function createAssistant(args: Assistant & { userId: string }) {
const { id, fileIds, tools, userId, model, name } = args;
const { id, fileIds, tools, userId, model, name, instruction } = args;

// Create a pipeline for atomic operations
const pipeline = redis.pipeline();

// Store the assistant data
pipeline.set(`assistant:${id}`, JSON.stringify({ tools, model, name }));
pipeline.set(`assistant:${id}`, JSON.stringify({ tools, model, name, instruction }));

// Store the relationship between the assistant and the user
pipeline.sadd(`user:${userId}:assistants`, id);
Expand All @@ -33,3 +33,13 @@ export async function createAssistant(args: Assistant & { userId: string }) {
// Parse the assistant data from JSON
return JSON.parse(assistantData);
}

export async function getAssistantData(assistant_id: Assistant["id"]) {
const assistantData = await redis.get(`assistant:${assistant_id}`);
if (!assistantData) {
throw new Error("Failed to get assistant");
}

// Parse the assistant data from JSON
return JSON.parse(assistantData) as Assistant;
}
29 changes: 29 additions & 0 deletions src/core/application/services/messageService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { v4 as uuidv4 } from "uuid";
import { redis } from "@/infrastructure/adaptaters/redisAdapter";
import { Message } from "@/core/domain/messages";
import dayjs from "dayjs";
import { getUserRole } from "./userService";

export async function createMessage(
userId: string,
Expand All @@ -9,6 +11,10 @@ export async function createMessage(
): Promise<Message> {
const messageId = uuidv4();
const timestamp = Date.now();
const userRole = await getUserRole(userId) // denormalize role information when creating messages for adapter

// if no userRole, throw because cannot create message
if(!userRole) throw new Error("User roles missing")

// Create a pipeline for atomic operations
const pipeline = redis.pipeline();
Expand All @@ -20,6 +26,7 @@ export async function createMessage(
id: messageId,
content: messageContent,
senderId: userId,
role: userRole.role,
timestamp,
})
);
Expand All @@ -38,9 +45,31 @@ export async function createMessage(
content: messageContent,
senderId: userId,
timestamp: new Date(timestamp),
role: userRole.role
};
}

export async function getAllMessage(threadId: string) {
// Get all the message IDs for the thread
const messageIds = await redis.smembers(`thread:${threadId}:messages`);

// If there are no messages, return null
if (messageIds.length === 0) return [];

// Get the data for all messages
const messages = await Promise.all(
messageIds.map(async (messageId) => {
const messageData = await redis.get(`message:${messageId}`) as string;
return JSON.parse(messageData) as Message;
})
);

// Sort the messages by timestamp in descending order
const allMessages = messages.sort((a, b) => dayjs(b.timestamp).diff(dayjs(a.timestamp)));

return allMessages;
}

export async function getLastMessage(threadId: string) {
// Get all the message IDs for the thread
const messageIds = await redis.smembers(`thread:${threadId}:messages`);
Expand Down
54 changes: 54 additions & 0 deletions src/core/application/services/runService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// run the thread with the associated assistant

import { ThreadRun, ThreadRunRequest } from "@/core/domain/run";
import { getAssistantData } from "./assistantService";
import { getThread } from "./threadService";
import { createMessage, getAllMessage } from "./messageService";
import { gpt4Adapter } from "@/infrastructure/adaptaters/openai/gpt4Adapter";
import { Role } from "@/core/domain/roles";
import { v4 as uuidv4 } from "uuid";

export async function runAssistantWithThread(runData: ThreadRunRequest) {
// get all messages from the thread, and run it over to the assistant to get a response
const { assistant_id, thread_id } = runData;
const [assistantData, threadData] = await Promise.all([
getAssistantData(assistant_id),
getThread(thread_id),
]);

// If no thread data or assistant data, an error should be thrown as we need both to run a thread
if (!threadData || !assistantData) throw new Error("No thread or assistant found.");

const everyMessage = await getAllMessage(threadData.id);
// only get role and content from every message for context.
// TODO: We should truncate the context to fit context window for selected model.
const everyRoleAndContent = everyMessage.map((message) => {
// special case for super_admin, which really should just be user
return {
role: message.role === "super_admin" ? "user" : ("assistant" as Role),
content: message.content,
};
});

// Calls the appropriate adapter based on what model the assistant uses
if (assistantData.model === "gpt-4") {
const gpt4AdapterRes: any = await gpt4Adapter(
everyRoleAndContent,
assistantData.instruction
);

const assistantResponse: string = gpt4AdapterRes.choices[0].message.content;

// add assistant response to the thread
await createMessage(assistant_id, thread_id, assistantResponse);

const threadRunResponse: ThreadRun = {
id: uuidv4(),
assistant_id: assistant_id,
thread_id: thread_id,
created_at: new Date(),
};

return threadRunResponse;
}
}
9 changes: 8 additions & 1 deletion src/core/application/services/userService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// userService.js

import { Role, getRolePermissions } from "@/core/domain/roles";
import { HumanUserBody, User } from "@/core/domain/user";
import { HumanUserBody, User, UserRole } from "@/core/domain/user";
import { redis } from "@/infrastructure/adaptaters/redisAdapter";
import { ulid } from "ulid";

Expand Down Expand Up @@ -129,6 +129,13 @@ export const getUser = async (userId: string): Promise<User | null> => {
return JSON.parse(userData)
}

export const getUserRole = async (userId: string): Promise<UserRole | null> => {
const roleData = await redis.hget("user_roles", userId);
if (!roleData) return null

return JSON.parse(roleData) as UserRole
}

/**
* Creates a new human user with a unique identifier and assigns them a 'user' role.
* If the user cannot be created or the role cannot be assigned, the function returns `null`.
Expand Down
Loading
Loading