examples: add Fetch And Embed Data Job Example (#3065)
What:
Add a Data Job that fetches public Confluence data and then creates
embeddings from it.

Why:
To have a job that creates embeddings from Confluence documents.

Closes #2992

---------

Signed-off-by: Yoan Salambashev <[email protected]>
yonitoo authored Feb 1, 2024
1 parent eae895a commit 09c49ba
Showing 7 changed files with 210 additions and 0 deletions.
54 changes: 54 additions & 0 deletions examples/fetch-embed-job-example/10_fetch_confluence_space.py
@@ -0,0 +1,54 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import pathlib

from config import DOCUMENTS_CSV_FILE_LOCATION
from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def fetch_confluence_space(url, token, space_key):
    try:
        # For more info regarding the LangChain ConfluenceLoader:
        # https://python.langchain.com/docs/integrations/document_loaders/confluence
        loader = ConfluenceLoader(url=url, token=token)
        documents = loader.load(
            space_key=space_key, include_attachments=True, limit=50, max_pages=50
        )
        return documents
    except Exception as e:
        log.error(f"Error fetching documents from Confluence: {e}")
        return None


def write_documents_to_csv(documents, filename):
    with open(filename, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        for doc in documents:
            writer.writerow([doc.page_content])


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    confluence_url = job_input.get_property(
        "confluence_url", "https://yoansalambashev.atlassian.net/"
    )
    # since the Confluence space is public, no need to generate API token
    token = ""
    space_key = job_input.get_property("space_key", "RESEARCH")
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    output_csv = data_job_dir / DOCUMENTS_CSV_FILE_LOCATION

    docs = fetch_confluence_space(confluence_url, token, space_key)

    if docs:
        log.info(f"{len(docs)} documents fetched successfully.")
        write_documents_to_csv(docs, output_csv)
        log.info(f"Documents written to {output_csv}")
    else:
        log.error(f"Failed to fetch any documents from the space with key {space_key}.")
104 changes: 104 additions & 0 deletions examples/fetch-embed-job-example/20_clean_and_embed_data.py
@@ -0,0 +1,104 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import pathlib
import re

import nltk
from config import DOCUMENTS_CSV_FILE_LOCATION
from config import EMBEDDINGS_PKL_FILE_LOCATION
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def clean_text(text):
    """
    Prepares text for NLP tasks (embedding and RAG) by standardizing its form. It focuses on retaining
    meaningful words and achieving consistency in their representation. This involves
    converting to lowercase (uniformity), removing punctuation and stopwords
    (focusing on relevant words), and lemmatization (reducing words to their base form).
    Such preprocessing is crucial for effective NLP analysis.
    :param text: A string containing the text to be processed.
    :return: The processed text as a string.
    """
    text = text.lower()
    # remove punctuation and special characters
    text = re.sub(r"[^\w\s]", "", text)
    # remove stopwords and lemmatize
    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()
    text = " ".join(
        [lemmatizer.lemmatize(word) for word in text.split() if word not in stop_words]
    )
    return text


def load_and_clean_documents(filename):
    cleaned_documents = []
    with open(filename, encoding="utf-8") as file:
        reader = csv.reader(file)
        next(reader, None)
        for row in reader:
            if row:
                cleaned_text = clean_text(row[0])
                cleaned_documents.append([cleaned_text])
    return cleaned_documents


def save_cleaned_documents(cleaned_documents, output_file):
    with open(output_file, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerows(cleaned_documents)


def embed_documents_in_batches(documents):
    # the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    model = SentenceTransformer("all-mpnet-base-v2")
    total = len(documents)
    log.info(f"total: {total}")
    embeddings = []
    for start_index in range(0, total):
        # the resources are not enough to batch 2 documents at a time, so the batch = 1 doc
        batch = documents[start_index]
        log.info(f"BATCH: {len(batch)}.")
        embeddings.extend(model.encode(batch, show_progress_bar=True))
    return embeddings


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    input_csv = DOCUMENTS_CSV_FILE_LOCATION
    # output_cleaned_csv = 'documents_cleaned.csv'
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    output_embeddings = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION

    # create a temporary (until the end of the job execution) dir with
    # write permissions to store the relevant nltk dependencies
    temp_dir = job_input.get_temporary_write_directory()
    nltk_data_path = temp_dir / "nltk_data"
    nltk_data_path.mkdir(exist_ok=True)
    nltk.data.path.append(str(nltk_data_path))

    nltk.download("stopwords", download_dir=str(nltk_data_path))
    nltk.download("wordnet", download_dir=str(nltk_data_path))

    cleaned_documents = load_and_clean_documents(input_csv)
    if cleaned_documents:
        log.info(
            f"{len(cleaned_documents)} documents loaded and cleaned for embedding."
        )
        # save_cleaned_documents(cleaned_documents, output_cleaned_csv)
        # log.info(f"Cleaned documents saved to {output_cleaned_csv}")
        embeddings = embed_documents_in_batches(cleaned_documents)
        with open(output_embeddings, "wb") as file:
            import pickle

            pickle.dump(embeddings, file)
        log.info(f"Embeddings saved to {output_embeddings}")
25 changes: 25 additions & 0 deletions examples/fetch-embed-job-example/README.md
@@ -0,0 +1,25 @@
# Fetch And Embed Data Job Example

The following Versatile Data Kit example shows how to fetch data from a public Confluence space and create embeddings from it.

# Fetch Confluence Data
The data is fetched in [10_fetch_confluence_space.py](./10_fetch_confluence_space.py) using the
[LangChain's ConfluenceLoader](https://python.langchain.com/docs/integrations/document_loaders/confluence).
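
At its core the step boils down to a single loader call. A minimal standalone sketch (the URL and space key below are the job's defaults and can be overridden via the `confluence_url` and `space_key` job properties):

```python
from langchain_community.document_loaders import ConfluenceLoader

# the space is public, so an empty token is enough; private spaces need an API token
loader = ConfluenceLoader(url="https://yoansalambashev.atlassian.net/", token="")
documents = loader.load(
    space_key="RESEARCH", include_attachments=True, limit=50, max_pages=50
)
print(f"Fetched {len(documents)} documents")
```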

# Create embeddings for the data
The data fetched in the previous step is read, cleaned and embedded in
[20_clean_and_embed_data.py](./20_clean_and_embed_data.py) using the
[all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) HuggingFace SentenceTransformer embedding model.
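
The embedding itself is one `SentenceTransformer.encode` call per cleaned document. A minimal sketch, assuming `sentence-transformers` is installed and the text has already been cleaned as in `20_clean_and_embed_data.py`:

```python
from sentence_transformers import SentenceTransformer

# model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
model = SentenceTransformer("all-mpnet-base-v2")
# the job encodes one cleaned document at a time to keep resource usage low
vector = model.encode(["already cleaned document text"])[0]
print(f"Embedding dimension: {len(vector)}")
```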

# Run the example
To run the data job locally:
```bash
vdk run fetch-embed-job-example
```

To open the output embeddings pickle file, use:

```python
import pandas as pd

obj = pd.read_pickle(r'embeddings.pkl')
```
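
Each entry in the pickle is a single document vector (768-dimensional for all-mpnet-base-v2). As a quick sanity check, a minimal sketch assuming at least two documents were embedded and `embeddings.pkl` is in the current directory:

```python
import pickle

import numpy as np

with open("embeddings.pkl", "rb") as f:
    embeddings = pickle.load(f)

# each entry is one vector produced by all-mpnet-base-v2
first, second = np.asarray(embeddings[0]), np.asarray(embeddings[1])
cosine = float(first @ second / (np.linalg.norm(first) * np.linalg.norm(second)))
print(f"{len(embeddings)} embeddings; cosine similarity of the first two: {cosine:.3f}")
```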
2 changes: 2 additions & 0 deletions examples/fetch-embed-job-example/config.ini
@@ -0,0 +1,2 @@
[owner]
team = my-team
5 changes: 5 additions & 0 deletions examples/fetch-embed-job-example/config.py
@@ -0,0 +1,5 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

DOCUMENTS_CSV_FILE_LOCATION = "documents.csv"
EMBEDDINGS_PKL_FILE_LOCATION = "embeddings.pkl"