
examples: add Fetch And Embed Data Job Example #3065

Merged
merged 11 commits on Feb 1, 2024
54 changes: 54 additions & 0 deletions examples/fetch-embed-job-example/10_fetch_confluence_space.py
@@ -0,0 +1,54 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import pathlib

from config import DOCUMENTS_CSV_FILE_LOCATION
from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def fetch_confluence_space(url, token, space_key):
    try:
        # For more info regarding the LangChain ConfluenceLoader:
        # https://python.langchain.com/docs/integrations/document_loaders/confluence
        loader = ConfluenceLoader(url=url, token=token)
        documents = loader.load(
            space_key=space_key, include_attachments=True, limit=50, max_pages=50
        )
        return documents
    except Exception as e:
        log.error(f"Error fetching documents from Confluence: {e}")
        return None


def write_documents_to_csv(documents, filename):
    with open(filename, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        for doc in documents:
            writer.writerow([doc.page_content])


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    confluence_url = job_input.get_property(
        "confluence_url", "https://yoansalambashev.atlassian.net/"
    )
    # since the Confluence space is public, no API token needs to be generated
    token = ""

Codacy Static Code Analysis, check warning on line 42 of 10_fetch_confluence_space.py: Possible hardcoded password: ''
    space_key = job_input.get_property("space_key", "RESEARCH")
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    output_csv = data_job_dir / DOCUMENTS_CSV_FILE_LOCATION

    docs = fetch_confluence_space(confluence_url, token, space_key)

    if docs:
        log.info(f"{len(docs)} documents fetched successfully.")
        write_documents_to_csv(docs, output_csv)
        log.info(f"Documents written to {output_csv}")
    else:
        log.error(f"Failed to fetch any documents from the space with key {space_key}.")
104 changes: 104 additions & 0 deletions examples/fetch-embed-job-example/20_clean_and_embed_data.py
@@ -0,0 +1,104 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import pathlib
import re

import nltk
from config import DOCUMENTS_CSV_FILE_LOCATION
from config import EMBEDDINGS_PKL_FILE_LOCATION
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def clean_text(text):
    """
    Prepares text for NLP tasks (embedding and RAG) by standardizing its form. It focuses on retaining
    meaningful words and achieving consistency in their representation. This involves
    converting to lowercase (uniformity), removing punctuation and stopwords
    (focusing on relevant words), and lemmatization (reducing words to their base form).
    Such preprocessing is crucial for effective NLP analysis.

    :param text: A string containing the text to be processed.
    :return: The processed text as a string.
    """
    text = text.lower()
    # remove punctuation and special characters
    text = re.sub(r"[^\w\s]", "", text)
    # remove stopwords and lemmatize
    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()
    text = " ".join(
        [lemmatizer.lemmatize(word) for word in text.split() if word not in stop_words]
    )
    return text


def load_and_clean_documents(filename):
    cleaned_documents = []
    with open(filename, encoding="utf-8") as file:
        reader = csv.reader(file)
        next(reader, None)
        for row in reader:
            if row:
                cleaned_text = clean_text(row[0])
                cleaned_documents.append([cleaned_text])
    return cleaned_documents


def save_cleaned_documents(cleaned_documents, output_file):
    with open(output_file, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerows(cleaned_documents)


def embed_documents_in_batches(documents):
    # the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    model = SentenceTransformer("all-mpnet-base-v2")
    total = len(documents)
    log.info(f"total: {total}")
    embeddings = []
    for start_index in range(0, total):
        # the available resources are not enough to encode two documents at a time, so each batch is a single document
        batch = documents[start_index]
        log.info(f"BATCH: {len(batch)}.")
        embeddings.extend(model.encode(batch, show_progress_bar=True))
    return embeddings


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    input_csv = DOCUMENTS_CSV_FILE_LOCATION
    # output_cleaned_csv = 'documents_cleaned.csv'
    data_job_dir = pathlib.Path(job_input.get_job_directory())
    output_embeddings = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION

    # create a temporary (until the end of the job execution) dir with
    # write permissions to store the relevant nltk dependencies
    temp_dir = job_input.get_temporary_write_directory()
    nltk_data_path = temp_dir / "nltk_data"
    nltk_data_path.mkdir(exist_ok=True)
    nltk.data.path.append(str(nltk_data_path))

    nltk.download("stopwords", download_dir=str(nltk_data_path))
    nltk.download("wordnet", download_dir=str(nltk_data_path))

    cleaned_documents = load_and_clean_documents(input_csv)
    if cleaned_documents:
        log.info(
            f"{len(cleaned_documents)} documents loaded and cleaned for embedding."
        )
        # save_cleaned_documents(cleaned_documents, output_cleaned_csv)
        # log.info(f"Cleaned documents saved to {output_cleaned_csv}")
        embeddings = embed_documents_in_batches(cleaned_documents)
        with open(output_embeddings, "wb") as file:
            import pickle

Codacy Static Code Analysis, check warning on line 101 of 20_clean_and_embed_data.py: Consider possible security implications associated with pickle module.
            pickle.dump(embeddings, file)
        log.info(f"Embeddings saved to {output_embeddings}")
25 changes: 25 additions & 0 deletions examples/fetch-embed-job-example/README.md
@@ -0,0 +1,25 @@
# Fetch And Embed Data Job Example

This Versatile Data Kit example shows how to fetch data from a public Confluence space and embed it.

# Fetch Confluence Data
The data is fetched in [10_fetch_confluence_space.py](./10_fetch_confluence_space.py) using
[LangChain's ConfluenceLoader](https://python.langchain.com/docs/integrations/document_loaders/confluence).
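
For illustration, here is a minimal sketch of the fetch call, using the default URL and space key that the job step reads from its properties:

```python
from langchain_community.document_loaders import ConfluenceLoader

# the space is public, so an empty API token is sufficient
loader = ConfluenceLoader(url="https://yoansalambashev.atlassian.net/", token="")
documents = loader.load(
    space_key="RESEARCH", include_attachments=True, limit=50, max_pages=50
)
print(f"Fetched {len(documents)} documents.")
```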

# Create embeddings for the data
The fetched data from the previous step is read, cleaned, and embedded in
[20_clean_and_embed_data.py](./20_clean_and_embed_data.py) using the
[all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) HuggingFace SentenceTransformer embedding model.
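
A rough sketch of the embedding call used in that step; the two document strings below are placeholder inputs:

```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-mpnet-base-v2")
# encode one cleaned document at a time, mirroring the batch-of-one approach in the job step
embeddings = [
    model.encode([text])[0]
    for text in ["first cleaned document", "second cleaned document"]
]
print(len(embeddings), len(embeddings[0]))  # 2 vectors, 768 dimensions each
```

Encoding one document per call keeps memory usage low on constrained machines, which is why the job step uses a batch size of one.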

# Run the example
To run the data job locally:
```bash
vdk run fetch-embed-job-example
```

To open the output embeddings pickle file, use:

```python
import pandas as pd

obj = pd.read_pickle(r'embeddings.pkl')
```
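
The loaded object should be a plain Python list with one 768-dimensional embedding vector (a numpy array) per document.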
2 changes: 2 additions & 0 deletions examples/fetch-embed-job-example/config.ini
@@ -0,0 +1,2 @@
[owner]
team = my-team
5 changes: 5 additions & 0 deletions examples/fetch-embed-job-example/config.py
@@ -0,0 +1,5 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

DOCUMENTS_CSV_FILE_LOCATION = "documents.csv"
EMBEDDINGS_PKL_FILE_LOCATION = "embeddings.pkl"