
Commit

examples: support non-local storage
antoniivanov committed Feb 15, 2024
1 parent 28ed822 commit 276fca1
Showing 22 changed files with 476 additions and 119 deletions.
90 changes: 90 additions & 0 deletions examples/confluence-reader/common/database_storage.py
@@ -0,0 +1,90 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pickle

Codacy Static Code Analysis warning on line 3 in examples/confluence-reader/common/database_storage.py: Consider possible security implications associated with pickle module.
from typing import Any
from typing import List
from typing import Optional
from typing import Union

from common.storage import IStorage
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import LargeBinary
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.exc import IntegrityError


class DatabaseStorage(IStorage):
def __init__(self, connection_string: str):
self.engine = create_engine(connection_string)
self.metadata = MetaData()
self.table = Table(
"vdk_storage",
self.metadata,
Column("name", String, primary_key=True),
Column("content", LargeBinary),
Column("content_type", String),
)
self.metadata.create_all(self.engine)

def store(self, name: str, content: Union[str, bytes, Any]) -> None:
serialized_content, content_type = self._serialize_content(content)
ins = self.table.insert().values(
name=name, content=serialized_content, content_type=content_type
)
try:
with self.engine.connect() as conn:
conn.execute(ins)
conn.commit()
except IntegrityError:
# Handle duplicate name by updating existing content
upd = (
self.table.update()
.where(self.table.c.name == name)
.values(content=serialized_content, content_type=content_type)
)
with self.engine.connect() as conn:
conn.execute(upd)
conn.commit()

def retrieve(self, name: str) -> Optional[Union[str, bytes, Any]]:
sel = self.table.select().where(self.table.c.name == name)
with self.engine.connect() as conn:
result = conn.execute(sel).fetchone()
if result:
return self._deserialize_content(result.content, result.content_type)
return None

def list_contents(self) -> List[str]:
sel = select(self.table.c.name)
with self.engine.connect() as conn:
result = conn.execute(sel).fetchall()
return [row[0] for row in result]

def remove(self, name: str) -> bool:
del_stmt = self.table.delete().where(self.table.c.name == name)
with self.engine.connect() as conn:
result = conn.execute(del_stmt)
conn.commit()
return result.rowcount > 0

@staticmethod
def _serialize_content(content: Union[str, bytes, Any]) -> tuple[bytes, str]:
if isinstance(content, bytes):
return content, "bytes"
elif isinstance(content, str):
return content.encode(), "string"
else:
# Fallback to pickle for other types
return pickle.dumps(content), "pickle"

@staticmethod
def _deserialize_content(content: bytes, content_type: Optional[str]) -> Any:
if content_type == "pickle":
return pickle.loads(content)

Codacy Static Code Analysis warning on line 87 in examples/confluence-reader/common/database_storage.py: Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue.
if content_type == "string":
return content.decode()
return content
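For orientation, here is a minimal usage sketch of the DatabaseStorage class added above. The SQLite connection string, the keys, and the stored values are illustrative assumptions, not part of this commit; any SQLAlchemy-supported connection string should behave the same way.

# Hypothetical usage of DatabaseStorage (connection string chosen for the example only).
from common.database_storage import DatabaseStorage

storage = DatabaseStorage("sqlite:///vdk_storage.db")
storage.store("greeting", "hello")               # str is stored with content_type "string"
storage.store("payload", {"pages": [1, 2, 3]})   # non-str/bytes content falls back to pickle
print(storage.retrieve("greeting"))              # -> "hello"
print(storage.list_contents())                   # -> names of stored entries, e.g. ["greeting", "payload"]
storage.remove("payload")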
57 changes: 57 additions & 0 deletions examples/confluence-reader/common/file_storage.py
@@ -0,0 +1,57 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
from typing import Any
from typing import List
from typing import Optional
from typing import Union

from storage import IStorage


class FileStorage(IStorage):
def __init__(self, base_path: str):
self.base_path = base_path
if not os.path.exists(self.base_path):
os.makedirs(self.base_path)

def _get_file_path(self, name: str) -> str:
return os.path.join(self.base_path, name)

def store(
self,
name: str,
content: Union[str, bytes, Any],
content_type: Optional[str] = None,
) -> None:
file_path = self._get_file_path(name)
with open(file_path, "w") as file:
if isinstance(content, (str, bytes)):
# Directly save strings and bytes
file.write(content if isinstance(content, str) else content.decode())
else:
# Assume JSON serializable for other types
json.dump(content, file)

def retrieve(self, name: str) -> Optional[Union[str, bytes, Any]]:
file_path = self._get_file_path(name)
if not os.path.exists(file_path):
return None
with open(file_path) as file:
try:
return json.load(file)
except json.JSONDecodeError:
# Content was not JSON, return as string
file.seek(0)
return file.read()

def list_contents(self) -> List[str]:
return os.listdir(self.base_path)

def remove(self, name: str) -> bool:
file_path = self._get_file_path(name)
if os.path.exists(file_path):
os.remove(file_path)
return True
return False
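A similar hedged sketch for FileStorage; the directory path and keys below are invented for the example, and it assumes file_storage.py (and storage.py) are importable the way this example job lays them out.

# Hypothetical usage of FileStorage; the base path is an assumption for illustration.
from file_storage import FileStorage

storage = FileStorage("/tmp/vdk-example-storage")
storage.store("config", {"space": "DOCS", "limit": 50})  # JSON-serialized into a file
storage.store("note", "plain text content")              # strings are written as-is
print(storage.retrieve("config"))                        # -> {'space': 'DOCS', 'limit': 50}
print(storage.list_contents())                           # -> directory listing, e.g. ['config', 'note']
storage.remove("note")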
46 changes: 46 additions & 0 deletions examples/confluence-reader/common/storage.py
@@ -0,0 +1,46 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import Any
from typing import List
from typing import Optional
from typing import Union


class IStorage:
def store(self, name: str, content: Union[str, bytes, Any]) -> None:
"""
Stores the given content under the specified name. If the content is not a string or bytes,
the method tries to serialize it based on the content_type (if provided) or infers the type.
:param name: The unique name to store the content under.
:param content: The content to store. Can be of type str, bytes, or any serializable type.
"""
pass

Codacy Static Code Analysis warning on line 18 in examples/confluence-reader/common/storage.py: Unnecessary pass statement.

def retrieve(self, name: str) -> Optional[Union[str, bytes, Any]]:
"""
Retrieves the content stored under the specified name. The method attempts to deserialize
the content to its original type if possible.
:param name: The name of the content to retrieve.
:return: The retrieved content, which can be of type str, bytes, or any deserialized Python object.
Returns None if the content does not exist.
"""
pass

def list_contents(self) -> List[str]:
"""
Lists the names of all stored contents.
:return: A list of names representing the stored contents.
"""
pass

def remove(self, name: str) -> bool:
"""
Removes the content stored under the specified name.
:param name: The name of the content to remove.
:return: True if the content was successfully removed, False otherwise.
"""
pass
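To make the contract concrete, a minimal in-memory implementation of IStorage could look like the sketch below. It is not part of the commit (it would mainly be useful for tests) and assumes storage.py is importable the same way file_storage.py imports it above.

# Hypothetical in-memory IStorage implementation, e.g. for unit tests.
from typing import Any, Dict, List, Optional, Union

from storage import IStorage


class InMemoryStorage(IStorage):
    def __init__(self):
        self._data: Dict[str, Any] = {}

    def store(self, name: str, content: Union[str, bytes, Any]) -> None:
        self._data[name] = content

    def retrieve(self, name: str) -> Optional[Union[str, bytes, Any]]:
        return self._data.get(name)

    def list_contents(self) -> List[str]:
        return list(self._data)

    def remove(self, name: str) -> bool:
        return self._data.pop(name, None) is not None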
50 changes: 31 additions & 19 deletions examples/confluence-reader/fetch_confluence_space.py
@@ -3,12 +3,15 @@
import json
import logging
import os
import pathlib
from datetime import datetime

from common.database_storage import DatabaseStorage
from confluence_document import ConfluenceDocument
from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput


log = logging.getLogger(__name__)


@@ -109,17 +112,12 @@ def __init__(self, confluence_url, token, space_key):
self.loader = ConfluenceLoader(url=self.confluence_url, token=self.token)

def fetch_confluence_documents(self, cql_query):
try:
# TODO: think about configurable limits ? or some streaming solution
# How do we fit all documents in memory ?
raw_documents = self.loader.load(cql=cql_query, limit=50, max_pages=200)
return [
ConfluenceDocument(doc.metadata, doc.page_content)
for doc in raw_documents
]
except Exception as e:
log.error(f"Error fetching documents from Confluence: {e}")
raise e
# TODO: think about configurable limits ? or some streaming solution
# How do we fit all documents in memory ?
raw_documents = self.loader.load(cql=cql_query, limit=50, max_pages=200)
return [
ConfluenceDocument(doc.metadata, doc.page_content) for doc in raw_documents
]

def fetch_updated_pages_in_confluence_space(
self, last_date="1900-02-06 17:54", parent_page_id=None
@@ -147,9 +145,10 @@ def fetch_all_pages_in_confluence_space(self, parent_page_id=None):


def get_value(job_input, key: str, default_value=None):
return job_input.get_arguments().get(
key, job_input.get_property(key, os.environ.get(key.upper(), default_value))
)
value = os.environ.get(key.upper(), default_value)
value = job_input.get_property(key, value)
value = job_input.get_secret(key, value)
return job_input.get_arguments().get(key, value)


def set_property(job_input: IJobInput, key, value):
@@ -165,12 +164,20 @@ def run(job_input: IJobInput):
token = get_value(job_input, "confluence_token")
space_key = get_value(job_input, "confluence_space_key")
parent_page_id = get_value(job_input, "confluence_parent_page_id")
last_date = get_value(job_input, "last_date", "1900-01-01 12:00")
data_file = get_value(
job_input,
"data_file",
os.path.join(job_input.get_temporary_write_directory(), "confluence_data.json"),
last_date = (
job_input.get_property(confluence_url, {})
.setdefault(space_key, {})
.setdefault(parent_page_id, {})
.get("last_date", "1900-01-01 12:00")
)
data_file = os.path.join(
job_input.get_temporary_write_directory(), "confluence_data.json"
)
storage_name = get_value(job_input, "storage_name", "confluence_data")
storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
# TODO: this is not optimal. We only care about the IDs; we should not need to retrieve everything
data = storage.retrieve(storage_name)
pathlib.Path(data_file).write_text(data if data else "[]")

confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

@@ -189,3 +196,8 @@ def run(job_input: IJobInput):
data_file,
confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
)

# TODO: it would be better to save each page in a separate row,
# but that's a quick solution for now to pass the data to the next job

storage.store(storage_name, pathlib.Path(data_file).read_text())
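The reworked get_value helper above resolves configuration with increasing precedence: environment variable, then job property, then secret, then job argument. Below is a self-contained sketch of that lookup order using plain dicts in place of the vdk job_input API; the key and values are invented for illustration.

# Hypothetical, standalone illustration of the precedence implemented by get_value.
import os


def resolve(key, arguments, secrets, properties, default=None):
    value = os.environ.get(key.upper(), default)  # lowest precedence: environment variable
    value = properties.get(key, value)            # job property overrides the environment
    value = secrets.get(key, value)               # secret overrides the property
    return arguments.get(key, value)              # explicit job argument wins


print(resolve("confluence_space_key",
              arguments={"confluence_space_key": "DOCS"},
              secrets={},
              properties={"confluence_space_key": "ENG"}))  # -> "DOCS"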
2 changes: 2 additions & 0 deletions examples/confluence-reader/requirements.txt
@@ -5,3 +5,5 @@
atlassian-python-api
langchain_community
lxml
psycopg2-binary
sqlalchemy
2 changes: 0 additions & 2 deletions examples/pgvector-embedder/00_properties.py
@@ -9,15 +9,13 @@
def run(job_input: IJobInput):
properties = job_input.get_all_properties()

data_file = os.path.join(job_input.get_job_directory(), "documents_example.json")
output_embeddings = os.path.join(
job_input.get_temporary_write_directory(), "embeddings_example.pkl"
)
properties.update(
dict(
destination_embeddings_table="vdk_doc_embeddings",
destination_metadata_table="vdk_doc_metadata",
data_file=data_file,
output_embeddings=output_embeddings,
)
)
38 changes: 7 additions & 31 deletions examples/pgvector-embedder/20_clean_and_embed_json_data.py
@@ -4,10 +4,8 @@
import logging
import re

import nltk
from common.database_storage import DatabaseStorage
from config import get_value
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput

@@ -18,29 +16,20 @@ def clean_text(text):
"""
Prepares text for NLP tasks (embedding and RAG) by standardizing its form. It focuses on retaining
meaningful words and achieving consistency in their representation. This involves
converting to lowercase (uniformity), removing punctuation and stopwords
(focusing on relevant words), and lemmatization (reducing words to their base form).
Such preprocessing is crucial for effective NLP analysis.
converting to lowercase (uniformity) and removing punctuation and special characters.
:param text: A string containing the text to be processed.
:return: The processed text as a string.
"""
text = text.lower()
# remove punctuation and special characters
text = re.sub(r"[^\w\s]", "", text)
# remove stopwords and lemmatize
stop_words = set(stopwords.words("english"))
lemmatizer = WordNetLemmatizer()
text = " ".join(
[lemmatizer.lemmatize(word) for word in text.split() if word not in stop_words]
)
return text


def load_and_clean_documents(json_file_path):
def load_and_clean_documents(content):
cleaned_documents = []
with open(json_file_path, encoding="utf-8") as file:
documents = json.load(file)
documents = json.loads(content)

for doc in documents:
if "data" in doc:
@@ -67,28 +56,15 @@ def embed_documents_in_batches(documents):
return embeddings


def setup_nltk(temp_dir):
"""
Set up NLTK by creating a temporary directory for NLTK data and downloading required resources.
"""
nltk_data_path = temp_dir / "nltk_data"
nltk_data_path.mkdir(exist_ok=True)
nltk.data.path.append(str(nltk_data_path))

nltk.download("stopwords", download_dir=str(nltk_data_path))
nltk.download("wordnet", download_dir=str(nltk_data_path))


def run(job_input: IJobInput):
log.info(f"Starting job step {__name__}")

input_json = get_value(job_input, "data_file")
output_embeddings = get_value(job_input, "output_embeddings")

temp_dir = job_input.get_temporary_write_directory()
setup_nltk(temp_dir)
storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
storage_name = get_value(job_input, "storage_name", "confluence_data")

cleaned_documents = load_and_clean_documents(input_json)
cleaned_documents = load_and_clean_documents(storage.retrieve(storage_name))
if cleaned_documents:
log.info(
f"{len(cleaned_documents)} documents loaded and cleaned for embedding."
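For reference, the simplified clean_text above now only lowercases the text and strips punctuation (the stopword removal and lemmatization were dropped along with the NLTK setup). A tiny standalone check, with an invented sample input:

import re


def clean_text(text):
    text = text.lower()
    text = re.sub(r"[^\w\s]", "", text)  # drop punctuation and special characters
    return text


print(clean_text("Hello, World!"))  # -> "hello world"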
3 changes: 1 addition & 2 deletions examples/pgvector-embedder/30_create_schema.sql
@@ -16,6 +16,5 @@ CREATE TABLE IF NOT EXISTS public.{destination_metadata_table}
title TEXT,
source TEXT,
data TEXT,
deleted BOOLEAN,
CONSTRAINT fk_metadata_embeddings FOREIGN KEY (id) REFERENCES public.{destination_embeddings_table}(id)
deleted BOOLEAN
);
