examples: refactor the examples a bit
- moved the common storage code to the vdk-storage library
- refactored confluence-reader to use it directly instead of going through an intermediate file
antoniivanov committed Feb 21, 2024
1 parent c168fc8 commit 19a752f
Showing 24 changed files with 254 additions and 276 deletions.
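At a glance, the refactor swaps the examples' ad-hoc JSON-file handoff for the `DatabaseStorage` class from the new vdk-storage library. The following is a minimal sketch of that pattern, with a placeholder connection string and storage name; the calls mirror the ones visible in the diffs below, and the payload shape is an assumption based on how the documents are serialized elsewhere in this commit.

```python
# Sketch only: placeholder connection string, storage name, and payload shape.
from vdk.plugin.storage.database_storage import DatabaseStorage

storage = DatabaseStorage("postgresql://user:password@localhost:5432/example")

# One job step stores a list of serialized documents under a logical name ...
docs = [{"metadata": {"id": "123", "deleted": False}, "data": "page text"}]
storage.store("confluence_data", docs)

# ... and a later step (or another job) retrieves the same list by that name.
existing = storage.retrieve("confluence_data")
```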
4 changes: 2 additions & 2 deletions examples/confluence-reader/README.md
@@ -13,9 +13,9 @@ The `ConfluenceDataSource` class is the heart of this data job. It provides a se

These methods make use of the last_modification.txt file to determine the last modification date and track changes in the Confluence space, allowing for efficient data retrieval and management.

## JSON Data Format
## Output Data Format

The resulting JSON data (confluence_data.json) is generated using the `ConfluenceDocument` class (see confluence_document.py).
The resulting data is generated using the `ConfluenceDocument` class (see confluence_document.py).
It follows this structured format:

```json
2 changes: 1 addition & 1 deletion examples/confluence-reader/confluence_document.py
@@ -3,7 +3,7 @@


class ConfluenceDocument:
    def __init__(self, metadata, data, deleted=False):
    def __init__(self, metadata: dict, data: str, deleted=False):
        """
        Initializes a ConfluenceDocument instance.
83 changes: 18 additions & 65 deletions examples/confluence-reader/fetch_confluence_space.py
@@ -1,76 +1,33 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import pathlib
from datetime import datetime

from common.database_storage import DatabaseStorage
from confluence_document import ConfluenceDocument
from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput

from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)


def read_json_file(file_path):
    try:
        with open(file_path) as file:
            return json.load(file)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        log.error(f"Error reading JSON file: {e}")
        return None


def write_json_file(file_path, data):
    try:
        with open(file_path, "w") as file:
            json.dump(data, file, indent=4)
    except OSError as e:
        log.error(f"Error writing JSON file: {e}")


def update_saved_documents(file_path, new_docs):
    existing_docs = read_json_file(file_path) or []

    if (
        isinstance(existing_docs, list)
        and existing_docs
        and isinstance(existing_docs[0], dict)
    ):
        existing_docs = [
            ConfluenceDocument(
                doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
            )
            for doc in existing_docs
        ]

    existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

    for doc in new_docs:
        existing_docs_dict[doc.metadata["id"]] = doc
def merge_docs(existing_docs, new_docs) -> list:
    if existing_docs:
        existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

    updated_docs_list = list(existing_docs_dict.values())
        for doc in new_docs:
            existing_docs_dict[doc.metadata["id"]] = doc
        return list(existing_docs_dict.values())
    else:
        return new_docs

    serialized_docs = [doc.serialize() for doc in updated_docs_list]
    write_json_file(file_path, serialized_docs)


def flag_deleted_pages(file_path, current_confluence_documents):
    existing_docs = read_json_file(file_path)
def flag_deleted_pages(existing_docs, current_confluence_documents):
    if existing_docs is None:
        log.error("Existing documents not found. Exiting.")
        return

    existing_docs = [
        ConfluenceDocument(
            doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
        )
        for doc in existing_docs
    ]

    current_page_ids = {doc.metadata["id"] for doc in current_confluence_documents}

    num_deleted = 0
@@ -80,9 +37,6 @@ def flag_deleted_pages(file_path, current_confluence_documents):
        num_deleted += 1
    log.info(f"Found {num_deleted} deleted pages.")

    serialized_docs = [doc.serialize() for doc in existing_docs]
    write_json_file(file_path, serialized_docs)


class ConfluenceDataSource:
    """
@@ -170,34 +124,33 @@ def run(job_input: IJobInput):
        .setdefault(parent_page_id, {})
        .get("last_date", "1900-01-01 12:00")
    )
    data_file = os.path.join(
        job_input.get_temporary_write_directory(), "confluence_data.json"
    )
    storage_name = get_value(job_input, "storage_name", "confluence_data")
    storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
    # TODO: this is not optimal. We just care about the IDs; we should not need to retrieve everything.
    data = storage.retrieve(storage_name)
    pathlib.Path(data_file).write_text(data if data else "[]")
    existing_docs = storage.retrieve(storage_name)
    if existing_docs:
        existing_docs = [ConfluenceDocument(**doc) for doc in existing_docs]

    confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

    updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
        last_date, parent_page_id
    )
    log.info(f"Found {len(updated_docs)} updated pages")
    update_saved_documents(data_file, updated_docs)
    all_docs = merge_docs(existing_docs, updated_docs)

    # This is buggy: it doesn't account for the server timezone vs. the local timezone,
    # and it also assumes that the server clock and local clock are synchronized (which they likely are not).
    # The timestamp should be that of the latest processed page.
    set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M"))

    flag_deleted_pages(
        data_file,
        all_docs,
        confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
    )

    # TODO: it would be better to save each page in a separate row,
    # but that's a quick solution for now to pass the data to the next job.

    storage.store(storage_name, pathlib.Path(data_file).read_text())
    log.info(f"Store {len(all_docs)} documents in {storage_name}")
    # TODO: why not use job_input.send_object_for_ingestion ... it's our ingestion interface
    storage.store(storage_name, [doc.serialize() for doc in all_docs])
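
The TODO above asks why the job does not go through `job_input.send_object_for_ingestion`, VDK's ingestion interface. A rough sketch of that alternative, under the assumption of one record per document and with a made-up destination table name, could look like this:

```python
# Hypothetical follow-up to the TODO above, not part of this commit:
# ingest each document as its own record instead of storing one blob per storage name.
# "confluence_documents" is an illustrative table name.
def ingest_documents(job_input, all_docs):
    for doc in all_docs:
        job_input.send_object_for_ingestion(
            payload=doc.serialize(),
            destination_table="confluence_documents",
        )
```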
3 changes: 2 additions & 1 deletion examples/confluence-reader/requirements.txt
@@ -3,7 +3,8 @@
# The file is optional and can be deleted if no extra library dependencies are necessary.

atlassian-python-api
langchain_community
langchain-community
lxml
psycopg2-binary
sqlalchemy
vdk-storage
4 changes: 4 additions & 0 deletions examples/pgvector-embedder/00_properties.py
@@ -20,3 +20,7 @@ def run(job_input: IJobInput):
        )
    )
    job_input.set_all_properties(properties)

    hf_home = job_input.get_temporary_write_directory() / "hf"
    hf_home.mkdir(parents=True, exist_ok=True)
    os.environ["HF_HOME"] = str(hf_home)
29 changes: 29 additions & 0 deletions examples/pgvector-embedder/01_local_properties.py
@@ -0,0 +1,29 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os

Codacy Static Code Analysis (examples/pgvector-embedder/01_local_properties.py#L3): 'os' imported but unused (F401); unused import os

from vdk.api.job_input import IJobInput


def set_property(job_input: IJobInput, key, value):
    props = job_input.get_all_properties()
    props[key] = value
    job_input.set_all_properties(props)


def run(job_input: IJobInput):
    set_property(
        job_input,
        "storage_connection_string",
        "postgresql://pgadmin:[email protected]:5432/spulov-test",
    )

    props = job_input.get_all_properties()
    props.update(
        dict(
            destination_metadata_table="vdk_confluence_sc_metadata",
            destination_embeddings_table="vdk_confluence_sc_embeddings",
            storage_name="supercollider_chunked",
        )
    )
    job_input.set_all_properties(props)
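
Later steps read these values back through a `get_value` helper imported from config.py, which is not part of this diff. A plausible stand-in with the same call shape used in 20_embed_data.py and 40_ingest_embeddings.py:

```python
# Assumed helper (config.py is not shown in this commit): look a key up in the
# job properties set above, falling back to a default value.
from vdk.api.job_input import IJobInput


def get_value(job_input: IJobInput, key: str, default_value=None):
    return job_input.get_all_properties().get(key, default_value)


# Example call shape, matching the other steps:
# storage_name = get_value(job_input, "storage_name", "confluence_data")
```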
14 changes: 14 additions & 0 deletions examples/pgvector-embedder/01_setup_huggingface.py
@@ -0,0 +1,14 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os.path
import pathlib

Codacy Static Code Analysis (examples/pgvector-embedder/01_setup_huggingface.py#L4): 'pathlib' imported but unused (F401); unused import pathlib

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # HF uses temporary directories as part of its work,
    # so make sure it only uses allowed ones.
    hf_home = job_input.get_temporary_write_directory() / "hf"
    hf_home.mkdir(parents=True, exist_ok=True)
    os.environ["HF_HOME"] = str(hf_home)
9 changes: 2 additions & 7 deletions examples/pgvector-embedder/20_embed_data.py
@@ -3,19 +3,14 @@
import json
import logging

from common.database_storage import DatabaseStorage
from config import get_value
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput
from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)


def load_documents(json_file_path):
    with open(json_file_path, encoding="utf-8") as file:
        return json.load(file)


def embed_documents_in_batches(documents):
    # the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    model = SentenceTransformer("all-mpnet-base-v2")
@@ -39,7 +34,7 @@ def run(job_input: IJobInput):
    storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
    storage_name = get_value(job_input, "storage_name", "confluence_data")

    documents = load_documents(storage.retrieve(storage_name))
    documents = storage.retrieve(storage_name)
    if documents:
        log.info(f"{len(documents)} chunks loaded and cleaned for embedding.")
        embeddings = embed_documents_in_batches(documents)
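
The body of `embed_documents_in_batches` is collapsed in this view. A minimal sketch of batch encoding with sentence-transformers, assuming each document keeps its text under a `data` key; the helper name is illustrative, not the function from the file:

```python
# Illustrative sketch, not the collapsed function above. Assumes each document
# is a dict carrying its text under "data".
from sentence_transformers import SentenceTransformer


def embed_texts_in_batches(documents, batch_size=32):
    model = SentenceTransformer("all-mpnet-base-v2")
    texts = [doc["data"] for doc in documents]
    # encode() handles batching internally; returns one embedding per input text.
    return model.encode(texts, batch_size=batch_size, show_progress_bar=True)
```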
8 changes: 7 additions & 1 deletion examples/pgvector-embedder/30_create_schema.sql
@@ -4,9 +4,15 @@ DROP TABLE IF EXISTS public.{destination_metadata_table} CASCADE;
DROP TABLE IF EXISTS public.{destination_embeddings_table} CASCADE;

-- TODO (missing vdk feature): we need to create the tables as the postgres plugin doesn't support automatic schema inference
CREATE TABLE IF NOT EXISTS public.{destination_embeddings_table}
(
    id TEXT PRIMARY KEY,
    embedding public.vector

Codacy Static Code Analysis (examples/pgvector-embedder/30_create_schema.sql#L10): Expected TSQL keyword to be capitalized
);

CREATE TABLE IF NOT EXISTS public.{destination_metadata_table}
(
    id VARCHAR PRIMARY KEY,
    id TEXT PRIMARY KEY,
    title TEXT,
    source TEXT,
    data TEXT,
5 changes: 2 additions & 3 deletions examples/pgvector-embedder/40_ingest_embeddings.py
@@ -1,13 +1,12 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import pickle

import numpy as np
from common.database_storage import DatabaseStorage
from config import get_value
from vdk.api.job_input import IJobInput
from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)

@@ -21,7 +20,7 @@ def run(job_input: IJobInput):
        embeddings = pickle.load(file)
    storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
    storage_name = get_value(job_input, "storage_name", "confluence_data")
    documents = json.loads(storage.retrieve(storage_name))
    documents = storage.retrieve(storage_name)

    # TODO: our postgres plugin doesn't support updates (upserts), so updating with the same ID fails.
90 changes: 0 additions & 90 deletions examples/pgvector-embedder/common/database_storage.py

This file was deleted.
