Pipeline: Ingestion pipeline (#96)
yassinsws authored Jun 21, 2024
1 parent 5381121 commit 83cd32a
Showing 33 changed files with 879 additions and 121 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -4,6 +4,11 @@
 application.local.yml
 llm_config.local.yml
 
+######################
+# Docker
+######################
+/docker/.docker-data/artemis-data/*
+!/docker/.docker-data/artemis-data/.gitkeep
 
 ########################
 # Auto-generated rules #
7 changes: 7 additions & 0 deletions app/config.py
@@ -8,9 +8,16 @@ class APIKeyConfig(BaseModel):
     token: str
 
 
+class WeaviateSettings(BaseModel):
+    host: str
+    port: int
+    grpc_port: int
+
+
 class Settings(BaseModel):
     api_keys: list[APIKeyConfig]
     env_vars: dict[str, str]
+    weaviate: WeaviateSettings
 
     @classmethod
     def get_settings(cls):
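For orientation, a minimal sketch of exercising the new Weaviate settings model. The host and port values below are invented; in practice they would come from application.local.yml:

# Minimal sketch (not part of the commit): the new WeaviateSettings model.
# Host/port values are assumptions for illustration only.
from pydantic import BaseModel


class WeaviateSettings(BaseModel):
    host: str
    port: int
    grpc_port: int


weaviate_settings = WeaviateSettings(host="localhost", port=8080, grpc_port=50051)
print(weaviate_settings.model_dump())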
7 changes: 4 additions & 3 deletions app/domain/data/image_message_content_dto.py
@@ -1,7 +1,8 @@
-from pydantic import BaseModel
+from pydantic import BaseModel, Field, ConfigDict
 from typing import Optional
 
 
 class ImageMessageContentDTO(BaseModel):
-    base64: str
-    prompt: Optional[str]
+    base64: str = Field(..., alias="pdfFile")
+    prompt: Optional[str] = None
+    model_config = ConfigDict(populate_by_name=True)
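A short illustration of what populate_by_name buys here: the DTO now validates payloads keyed by either the alias or the Python field name (the base64 string below is invented):

# Illustrative only: both spellings are accepted with populate_by_name=True.
from_alias = ImageMessageContentDTO(pdfFile="JVBERi0xLjQ=")  # incoming webhook shape
from_field = ImageMessageContentDTO(base64="JVBERi0xLjQ=")   # internal construction
assert from_alias.base64 == from_field.base64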
11 changes: 6 additions & 5 deletions app/domain/data/lecture_unit_dto.py
@@ -3,11 +3,12 @@
 
 class LectureUnitDTO(BaseModel):
     to_update: bool = Field(alias="toUpdate")
-    pdf_file_base64: str = Field(alias="pdfFile")
+    base_url: str = Field(alias="artemisBaseUrl")
+    pdf_file_base64: str = Field(default="", alias="pdfFile")
     lecture_unit_id: int = Field(alias="lectureUnitId")
-    lecture_unit_name: str = Field(alias="lectureUnitName")
+    lecture_unit_name: str = Field(default="", alias="lectureUnitName")
     lecture_id: int = Field(alias="lectureId")
-    lecture_name: str = Field(alias="lectureName")
+    lecture_name: str = Field(default="", alias="lectureName")
     course_id: int = Field(alias="courseId")
-    course_name: str = Field(alias="courseName")
-    course_description: str = Field(alias="courseDescription")
+    course_name: str = Field(default="", alias="courseName")
+    course_description: str = Field(default="", alias="courseDescription")
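With the new defaults, only the flags and IDs remain required, so a minimal payload can omit the names and the PDF. A sketch with invented values (field aliases are the ones defined above):

# Illustrative minimal payload after this change; all values invented.
unit = LectureUnitDTO(
    toUpdate=True,
    artemisBaseUrl="https://artemis.example.org",
    lectureUnitId=42,
    lectureId=7,
    courseId=1,
)
assert unit.course_name == "" and unit.pdf_file_base64 == ""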
Empty file.
12 changes: 12 additions & 0 deletions app/domain/ingestion/ingestion_pipeline_execution_dto.py
@@ -0,0 +1,12 @@
+from typing import List
+
+from pydantic import Field
+
+from app.domain import PipelineExecutionDTO
+from app.domain.data.lecture_unit_dto import LectureUnitDTO
+
+
+class IngestionPipelineExecutionDto(PipelineExecutionDTO):
+    lecture_units: List[LectureUnitDTO] = Field(
+        ..., alias="pyrisLectureUnitWebhookDTOS"
+    )
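For reference, a sketch of the webhook body shape this DTO is built to parse. All values are invented; only the alias keys come from the DTOs in this commit, and inherited PipelineExecutionDTO fields are omitted:

# Invented example payload; inherited PipelineExecutionDTO fields omitted.
example_body = {
    "pyrisLectureUnitWebhookDTOS": [
        {
            "toUpdate": True,
            "artemisBaseUrl": "https://artemis.example.org",
            "lectureUnitId": 42,
            "lectureId": 7,
            "courseId": 1,
        }
    ]
}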
7 changes: 7 additions & 0 deletions app/domain/ingestion/ingestion_status_update_dto.py
@@ -0,0 +1,7 @@
+from typing import Optional
+
+from ...domain.status.status_update_dto import StatusUpdateDTO
+
+
+class IngestionStatusUpdateDTO(StatusUpdateDTO):
+    result: Optional[str] = None
4 changes: 3 additions & 1 deletion app/domain/pipeline_execution_settings_dto.py
@@ -5,5 +5,7 @@
 
 class PipelineExecutionSettingsDTO(BaseModel):
     authentication_token: str = Field(alias="authenticationToken")
-    allowed_model_identifiers: List[str] = Field(alias="allowedModelIdentifiers")
+    allowed_model_identifiers: List[str] = Field(
+        default=[], alias="allowedModelIdentifiers"
+    )
     artemis_base_url: str = Field(alias="artemisBaseUrl")
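A quick sketch of the effect, relying on Pydantic copying field defaults per instance (so the shared [] literal does not alias one list across DTOs); the token and URL values are invented:

# Sketch with invented values: each instance gets its own copy of the default.
a = PipelineExecutionSettingsDTO(authenticationToken="t1", artemisBaseUrl="https://a.example")
b = PipelineExecutionSettingsDTO(authenticationToken="t2", artemisBaseUrl="https://b.example")
a.allowed_model_identifiers.append("some-model")
assert b.allowed_model_identifiers == []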
14 changes: 0 additions & 14 deletions app/ingestion/abstract_ingestion.py
@@ -13,17 +13,3 @@ def chunk_data(self, path: str) -> List[Dict[str, str]]:
         Abstract method to chunk code files in the root directory.
         """
         pass
-
-    @abstractmethod
-    def ingest(self, path: str) -> bool:
-        """
-        Abstract method to ingest repositories into the database.
-        """
-        pass
-
-    @abstractmethod
-    def update(self, path: str):
-        """
-        Abstract method to update a repository in the database.
-        """
-        pass
40 changes: 30 additions & 10 deletions app/llm/external/openai_chat.py
@@ -1,3 +1,5 @@
+import logging
+import time
 from datetime import datetime
 from typing import Literal, Any
 
@@ -78,16 +80,34 @@ def chat(
         self, messages: list[PyrisMessage], arguments: CompletionArguments
     ) -> PyrisMessage:
         # noinspection PyTypeChecker
-        response = self._client.chat.completions.create(
-            model=self.model,
-            messages=convert_to_open_ai_messages(messages),
-            temperature=arguments.temperature,
-            max_tokens=arguments.max_tokens,
-            response_format=ResponseFormat(
-                type=("json_object" if arguments.response_format == "JSON" else "text")
-            ),
-        )
-        return convert_to_iris_message(response.choices[0].message)
+        retries = 10
+        backoff_factor = 2
+        initial_delay = 1
+
+        for attempt in range(retries):
+            try:
+                if arguments.response_format == "JSON":
+                    response = self._client.chat.completions.create(
+                        model=self.model,
+                        messages=convert_to_open_ai_messages(messages),
+                        temperature=arguments.temperature,
+                        max_tokens=arguments.max_tokens,
+                        response_format=ResponseFormat(type="json_object"),
+                    )
+                else:
+                    response = self._client.chat.completions.create(
+                        model=self.model,
+                        messages=convert_to_open_ai_messages(messages),
+                        temperature=arguments.temperature,
+                        max_tokens=arguments.max_tokens,
+                    )
+                return convert_to_iris_message(response.choices[0].message)
+            except Exception as e:
+                wait_time = initial_delay * (backoff_factor**attempt)
+                logging.warning(f"Exception on attempt {attempt + 1}: {e}")
+                logging.info(f"Retrying in {wait_time} seconds...")
+                time.sleep(wait_time)
+        logging.error("Failed to interpret image after several attempts.")
 
 
 class DirectOpenAIChatModel(OpenAIChatModel):
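Two details worth noting in the new retry loop: the waits double each attempt (1, 2, 4, … seconds, so ten consecutive failures sleep 2^10 − 1 = 1023 seconds in the worst case), and if every attempt fails the method logs an error and falls through, implicitly returning None. A standalone sketch of the same pattern with generic names, not the repo's API:

# Generic exponential-backoff helper mirroring the pattern in this commit.
import time


def retry_with_backoff(fn, retries=10, backoff_factor=2, initial_delay=1):
    for attempt in range(retries):
        try:
            return fn()
        except Exception as exc:
            wait_time = initial_delay * backoff_factor**attempt  # 1, 2, 4, 8, ...
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {wait_time}s")
            time.sleep(wait_time)
    raise RuntimeError("all retry attempts exhausted")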
27 changes: 22 additions & 5 deletions app/llm/external/openai_embeddings.py
@@ -1,8 +1,10 @@
+import logging
 from typing import Literal, Any
 from openai import OpenAI
 from openai.lib.azure import AzureOpenAI
 
 from ...llm.external.model import EmbeddingModel
+import time
 
 
 class OpenAIEmbeddingModel(EmbeddingModel):
@@ -11,12 +13,27 @@ class OpenAIEmbeddingModel(EmbeddingModel):
     _client: OpenAI
 
     def embed(self, text: str) -> list[float]:
-        response = self._client.embeddings.create(
-            model=self.model,
-            input=text,
-            encoding_format="float",
-        )
-        return response.data[0].embedding
+        retries = 10
+        backoff_factor = 2
+        initial_delay = 1
+
+        for attempt in range(retries):
+            try:
+                response = self._client.embeddings.create(
+                    model=self.model,
+                    input=text,
+                    encoding_format="float",
+                )
+                return response.data[0].embedding
+            except Exception as e:
+                wait_time = initial_delay * (backoff_factor**attempt)
+                logging.warning(f"Rate limit exceeded on attempt {attempt + 1}: {e}")
+                logging.info(f"Retrying in {wait_time} seconds...")
+                time.sleep(wait_time)
+        logging.error(
+            "Failed to get embedding after several attempts due to rate limit."
+        )
+        return []
 
 
 class DirectOpenAIEmbeddingModel(OpenAIEmbeddingModel):
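Unlike chat(), embed() returns an empty list once retries are exhausted, so callers can guard before writing to the vector store. A caller-side sketch with assumed names:

# Caller-side sketch; `model` (an OpenAIEmbeddingModel constructed elsewhere)
# and the chunk text are assumptions for illustration.
import logging

vector = model.embed("lecture slide text")
if not vector:
    logging.warning("Embedding unavailable after retries; skipping this chunk")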
1 change: 1 addition & 0 deletions app/llm/request_handler/__init__.py
@@ -1,5 +1,6 @@
 from ..request_handler.request_handler_interface import RequestHandler
 from ..request_handler.basic_request_handler import BasicRequestHandler
+
 from ..request_handler.capability_request_handler import (
     CapabilityRequestHandler,
     CapabilityRequestHandlerSelectionMode,
14 changes: 11 additions & 3 deletions app/pipeline/chat/lecture_chat_pipeline.py
@@ -8,6 +8,7 @@
 )
 from langchain_core.runnables import Runnable
 
+from ..shared.citation_pipeline import CitationPipeline
 from ...common import convert_iris_message_to_langchain_message
 from ...domain import PyrisMessage
 from ...llm import CapabilityRequestHandler, RequirementList
@@ -42,7 +43,8 @@ def lecture_initial_prompt():
 questions about the lectures. To answer them the best way, relevant lecture content is provided to you with the
 student's question. If the context provided to you is not enough to formulate an answer to the student question
 you can simply ask the student to elaborate more on his question. Use only the parts of the context provided for
-you that is relevant to the student's question. """
+you that is relevant to the student's question. If the user greets you greet him back, and ask him how you can help
+"""
 
 
 class LectureChatPipeline(Pipeline):
@@ -60,14 +62,15 @@ def __init__(self):
                 privacy_compliance=True,
             )
         )
-        completion_args = CompletionArguments(temperature=0.2, max_tokens=2000)
+        completion_args = CompletionArguments(temperature=0, max_tokens=2000)
         self.llm = IrisLangchainChatModel(
            request_handler=request_handler, completion_args=completion_args
         )
         # Create the pipelines
         self.db = VectorDatabase()
         self.retriever = LectureRetrieval(self.db.client)
         self.pipeline = self.llm | StrOutputParser()
+        self.citation_pipeline = CitationPipeline()
 
     def __repr__(self):
         return f"{self.__class__.__name__}(llm={self.llm})"
@@ -98,15 +101,20 @@ def __call__(self, dto: LectureChatPipelineExecutionDTO):
             student_query=query.contents[0].text_content,
             result_limit=10,
             course_name=dto.course.name,
+            course_id=dto.course.id,
+            base_url=dto.settings.artemis_base_url,
         )
 
         self._add_relevant_chunks_to_prompt(retrieved_lecture_chunks)
         prompt_val = self.prompt.format_messages()
         self.prompt = ChatPromptTemplate.from_messages(prompt_val)
         try:
             response = (self.prompt | self.pipeline).invoke({})
+            response_with_citation = self.citation_pipeline(
+                retrieved_lecture_chunks, response
+            )
             logger.info(f"Response from lecture chat pipeline: {response}")
-            return response
+            return response_with_citation
         except Exception as e:
             raise e
 
80 changes: 56 additions & 24 deletions app/pipeline/chat/tutor_chat_pipeline.py
@@ -12,6 +12,7 @@
     PromptTemplate,
 )
 from langchain_core.runnables import Runnable
+from weaviate.collections.classes.filters import Filter
 
 from .lecture_chat_pipeline import LectureChatPipeline
 from .output_models.output_models.selected_paragraphs import SelectedParagraphs
@@ -86,19 +87,27 @@ def __call__(self, dto: TutorChatPipelineExecutionDTO, **kwargs):
         :param dto: execution data transfer object
         :param kwargs: The keyword arguments
         """
-        execution_dto = LectureChatPipelineExecutionDTO(
-            settings=dto.settings, course=dto.course, chatHistory=dto.chat_history
-        )
-        lecture_chat_thread = threading.Thread(
-            target=self._run_lecture_chat_pipeline(execution_dto), args=(dto,)
-        )
-        tutor_chat_thread = threading.Thread(
-            target=self._run_tutor_chat_pipeline(dto), args=(dto,)
-        )
-        lecture_chat_thread.start()
-        tutor_chat_thread.start()
-
         try:
+            should_execute_lecture_pipeline = self.should_execute_lecture_pipeline(
+                dto.course.id
+            )
+            self.lecture_chat_response = ""
+            if should_execute_lecture_pipeline:
+                execution_dto = LectureChatPipelineExecutionDTO(
+                    settings=dto.settings,
+                    course=dto.course,
+                    chatHistory=dto.chat_history,
+                )
+                lecture_chat_thread = threading.Thread(
+                    target=self._run_lecture_chat_pipeline(execution_dto), args=(dto,)
+                )
+                lecture_chat_thread.start()
+
+            tutor_chat_thread = threading.Thread(
+                target=self._run_tutor_chat_pipeline(dto),
+                args=(dto, should_execute_lecture_pipeline),
+            )
+            tutor_chat_thread.start()
             response = self.choose_best_response(
                 [self.tutor_chat_response, self.lecture_chat_response],
                 dto.chat_history[-1].contents[0].text_content,
@@ -107,7 +116,6 @@ def __call__(self, dto: TutorChatPipelineExecutionDTO, **kwargs):
             logger.info(f"Response from tutor chat pipeline: {response}")
             self.callback.done("Generated response", final_result=response)
         except Exception as e:
-            print(e)
             self.callback.error(f"Failed to generate response: {e}")
 
     def choose_best_response(
@@ -152,7 +160,11 @@ def _run_lecture_chat_pipeline(self, dto: LectureChatPipelineExecutionDTO):
         pipeline = LectureChatPipeline()
         self.lecture_chat_response = pipeline(dto=dto)
 
-    def _run_tutor_chat_pipeline(self, dto: TutorChatPipelineExecutionDTO):
+    def _run_tutor_chat_pipeline(
+        self,
+        dto: TutorChatPipelineExecutionDTO,
+        should_execute_lecture_pipeline: bool = False,
+    ):
         """
         Runs the pipeline
         :param dto: execution data transfer object
@@ -208,16 +220,18 @@ def _run_tutor_chat_pipeline(self, dto: TutorChatPipelineExecutionDTO):
                 submission,
                 selected_files,
             )
-
-        retrieved_lecture_chunks = self.retriever(
-            chat_history=history,
-            student_query=query.contents[0].text_content,
-            result_limit=10,
-            course_name=dto.course.name,
-            problem_statement=problem_statement,
-            exercise_title=exercise_title,
-        )
-        self._add_relevant_chunks_to_prompt(retrieved_lecture_chunks)
+        if should_execute_lecture_pipeline:
+            retrieved_lecture_chunks = self.retriever(
+                chat_history=history,
+                student_query=query.contents[0].text_content,
+                result_limit=5,
+                course_name=dto.course.name,
+                problem_statement=problem_statement,
+                exercise_title=exercise_title,
+                course_id=dto.course.id,
+                base_url=dto.settings.artemis_base_url,
+            )
+            self._add_relevant_chunks_to_prompt(retrieved_lecture_chunks)
 
         self.callback.in_progress("Generating response...")
 
@@ -360,3 +374,21 @@ def _add_relevant_chunks_to_prompt(self, retrieved_lecture_chunks: List[dict]):
         self.prompt += SystemMessagePromptTemplate.from_template(
             "USE ONLY THE CONTENT YOU NEED TO ANSWER THE QUESTION:\n"
         )
+
+    def should_execute_lecture_pipeline(self, course_id: int) -> bool:
+        """
+        Checks if the lecture pipeline should be executed
+        :param course_id: The course ID
+        :return: True if the lecture pipeline should be executed
+        """
+        if course_id:
+            # Fetch the first object that matches the course ID with the language property
+            result = self.db.lectures.query.fetch_objects(
+                filters=Filter.by_property(LectureSchema.COURSE_ID.value).equal(
+                    course_id
+                ),
+                limit=1,
+                return_properties=[LectureSchema.COURSE_NAME.value],
+            )
+            return len(result.objects) > 0
+        return False
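For reference, a standalone sketch of the Weaviate v4 existence-check pattern used by should_execute_lecture_pipeline above. The collection name, property names, and connection parameters are all assumptions for illustration:

# Standalone sketch; names and connection details are illustrative only.
import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local(host="localhost", port=8080, grpc_port=50051)
lectures = client.collections.get("LectureSlides")
result = lectures.query.fetch_objects(
    filters=Filter.by_property("course_id").equal(1),
    limit=1,
    return_properties=["course_name"],
)
print(len(result.objects) > 0)  # True if any lecture slide exists for course 1
client.close()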