
examples: add Fetch And Embed Data Job Example #3065

Merged · 11 commits · Feb 1, 2024
49 changes: 49 additions & 0 deletions examples/fetch-embed-job-example/10_fetch_confluence_space.py
@@ -0,0 +1,49 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging

from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def fetch_confluence_space(url, token, space_key):
    try:
        # For more info regarding the LangChain ConfluenceLoader:
        # https://python.langchain.com/docs/integrations/document_loaders/confluence
        loader = ConfluenceLoader(url=url, token=token)
        documents = loader.load(
            space_key=space_key, include_attachments=True, limit=50, max_pages=50
        )
        return documents
    except Exception as e:
        log.error(f"Error fetching documents from Confluence: {e}")
        return None


def write_documents_to_csv(documents, filename):
    with open(filename, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        # write a header row; the embedding step skips it when reading the file back
        writer.writerow(["page_content"])
        for doc in documents:
            writer.writerow([doc.page_content])


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    confluence_url = "https://yoansalambashev.atlassian.net/"
    # the Confluence space is public, so there is no need to generate an API token
    token = ""
    space_key = "RESEARCH"
    output_csv = "documents.csv"

    docs = fetch_confluence_space(confluence_url, token, space_key)

    if docs:
        log.info(f"{len(docs)} documents fetched successfully.")
        write_documents_to_csv(docs, output_csv)
        log.info(f"Documents written to {output_csv}")
    else:
        log.error(f"Failed to fetch any documents from the space with key {space_key}.")
84 changes: 84 additions & 0 deletions examples/fetch-embed-job-example/20_clean_and_embed_data.py
@@ -0,0 +1,84 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import pickle
import re

import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput

nltk.download("stopwords")
nltk.download("wordnet")
nltk.download("punkt")

log = logging.getLogger(__name__)


def clean_text(text):
    text = text.lower()
    # remove punctuation and special characters
    text = re.sub(r"[^\w\s]", "", text)
    # remove stopwords and lemmatize
    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()
    text = " ".join(
        [lemmatizer.lemmatize(word) for word in text.split() if word not in stop_words]
    )
    return text


def load_and_clean_documents(filename):
    cleaned_documents = []
    with open(filename, encoding="utf-8") as file:
        reader = csv.reader(file)
        # skip the header row written by the fetch step
        next(reader, None)
        for row in reader:
            if row:
                cleaned_text = clean_text(row[0])
                cleaned_documents.append([cleaned_text])
    return cleaned_documents


def save_cleaned_documents(cleaned_documents, output_file):
    with open(output_file, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerows(cleaned_documents)


def embed_documents_in_batches(documents):
    # the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    model = SentenceTransformer("all-mpnet-base-v2")
    total = len(documents)
    log.info(f"Embedding {total} documents.")
    embeddings = []
    for start_index in range(total):
        # available resources are not enough to batch 2 documents at a time,
        # so each batch holds a single document
        batch = documents[start_index]
        log.info(f"Embedding document {start_index + 1}/{total}.")
        embeddings.extend(model.encode(batch, show_progress_bar=True))
    return embeddings


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    input_csv = "documents.csv"
    # output_cleaned_csv = 'documents_cleaned.csv'
    output_embeddings = "embeddings.pkl"

    cleaned_documents = load_and_clean_documents(input_csv)
    if cleaned_documents:
        log.info(
            f"{len(cleaned_documents)} documents loaded and cleaned for embedding."
        )
        # save_cleaned_documents(cleaned_documents, output_cleaned_csv)
        # log.info(f"Cleaned documents saved to {output_cleaned_csv}")
        embeddings = embed_documents_in_batches(cleaned_documents)
        with open(output_embeddings, "wb") as file:
            pickle.dump(embeddings, file)
        log.info(f"Embeddings saved to {output_embeddings}")
25 changes: 25 additions & 0 deletions examples/fetch-embed-job-example/README.md
@@ -0,0 +1,25 @@
# Fetch And Embed Data Job Example

The following Versatile Data Kit example shows how to fetch data from a public Confluence space and embed it.

# Fetch Confluence Data
The data is fetched in [10_fetch_confluence_space.py](./10_fetch_confluence_space.py) using the
[LangChain's ConfluenceLoader](https://python.langchain.com/docs/integrations/document_loaders/confluence).
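
At its core, the fetch step boils down to the following call (a minimal sketch; the URL and space key are placeholders, and the token can stay empty for a public space):

```python
from langchain_community.document_loaders import ConfluenceLoader

loader = ConfluenceLoader(url="https://example.atlassian.net/", token="")
# limit is the page size per request; max_pages caps the total number of pages fetched
documents = loader.load(
    space_key="SPACE", include_attachments=True, limit=50, max_pages=50
)
```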

# Create embeddings for the data
The data fetched in the previous step is read, cleaned, and embedded using the
[all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) HuggingFace SentenceTransformer embedding model.
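
A minimal sketch of what the embedding call looks like (the input sentence is just an illustration):

```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-mpnet-base-v2")
# encode() accepts a list of strings and returns one 768-dimensional vector per string
embeddings = model.encode(["a cleaned document goes here"])
```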

# Run the example
To run the data job locally:
```bash
vdk run fetch-embed-job-example
```
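
Depending on your local environment, the job's Python dependencies from [requirements.txt](./requirements.txt) may need to be installed first:

```bash
pip install -r requirements.txt
```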

To open the output embeddings pickle file, use:

```python
import pandas as pd

obj = pd.read_pickle(r'embeddings.pkl')
```
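
Each entry in `obj` is one embedding vector. For a quick sanity check (assuming the default numpy array output of SentenceTransformer):

```python
print(len(obj))      # number of embedded documents
print(obj[0].shape)  # (768,) for all-mpnet-base-v2
```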
2 changes: 2 additions & 0 deletions examples/fetch-embed-job-example/config.ini
@@ -0,0 +1,2 @@
[owner]
team = my-team
15 changes: 15 additions & 0 deletions examples/fetch-embed-job-example/documents.csv

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions examples/fetch-embed-job-example/requirements.txt
@@ -0,0 +1,5 @@
atlassian-python-api
langchain_community
lxml
nltk
sentence-transformers