examples: refactor a bit the examples #3127

Merged: 4 commits, Feb 21, 2024
4 changes: 2 additions & 2 deletions examples/confluence-reader/README.md
@@ -13,9 +13,9 @@ The `ConfluenceDataSource` class is the heart of this data job. It provides a se

These methods make use of the last_modification.txt file to determine the last modification date and track changes in the Confluence space, allowing for efficient data retrieval and management.

## JSON Data Format
## Output Data Format

The resulting JSON data (confluence_data.json) is generated using the `ConfluenceDocument` class (see confluence_document.py).
The resulting data is generated using the `ConfluenceDocument` class (see confluence_document.py).
It follows this structured format:

```json
…
```
2 changes: 1 addition & 1 deletion examples/confluence-reader/confluence_document.py
@@ -3,7 +3,7 @@


class ConfluenceDocument:
def __init__(self, metadata, data, deleted=False):
def __init__(self, metadata: dict, data: str, deleted=False):
"""
Initializes a ConfluenceDocument instance.

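With the added annotations, the constructor contract is explicit: `metadata` is a dict (which the rest of the job expects to carry at least an `"id"` key) and `data` is the page content as a string. A minimal sketch of constructing a document, assuming only what is visible in this diff; any metadata keys beyond `id` and `deleted` are illustrative:

```python
from confluence_document import ConfluenceDocument

# "id" is required by the merge/flagging logic; "deleted" is optional.
doc = ConfluenceDocument(
    metadata={"id": "12345", "deleted": False},
    data="Page body text",
)

# serialize() is what the data job calls before persisting documents.
serialized = doc.serialize()
```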
83 changes: 18 additions & 65 deletions examples/confluence-reader/fetch_confluence_space.py
@@ -1,76 +1,33 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import pathlib
from datetime import datetime

from common.database_storage import DatabaseStorage
from confluence_document import ConfluenceDocument
from langchain_community.document_loaders import ConfluenceLoader
from vdk.api.job_input import IJobInput

from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)


def read_json_file(file_path):
try:
with open(file_path) as file:
return json.load(file)
except (FileNotFoundError, json.JSONDecodeError) as e:
log.error(f"Error reading JSON file: {e}")
return None


def write_json_file(file_path, data):
try:
with open(file_path, "w") as file:
json.dump(data, file, indent=4)
except OSError as e:
log.error(f"Error writing JSON file: {e}")


def update_saved_documents(file_path, new_docs):
existing_docs = read_json_file(file_path) or []

if (
isinstance(existing_docs, list)
and existing_docs
and isinstance(existing_docs[0], dict)
):
existing_docs = [
ConfluenceDocument(
doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
)
for doc in existing_docs
]

existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

for doc in new_docs:
existing_docs_dict[doc.metadata["id"]] = doc
def merge_docs(existing_docs, new_docs) -> list:
if existing_docs:
existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

updated_docs_list = list(existing_docs_dict.values())
for doc in new_docs:
existing_docs_dict[doc.metadata["id"]] = doc
return list(existing_docs_dict.values())
else:
return new_docs

serialized_docs = [doc.serialize() for doc in updated_docs_list]
write_json_file(file_path, serialized_docs)


def flag_deleted_pages(file_path, current_confluence_documents):
existing_docs = read_json_file(file_path)
def flag_deleted_pages(existing_docs, current_confluence_documents):
if existing_docs is None:
log.error("Existing documents not found. Exiting.")
return

existing_docs = [
ConfluenceDocument(
doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
)
for doc in existing_docs
]

current_page_ids = {doc.metadata["id"] for doc in current_confluence_documents}

num_deleted = 0
@@ -80,9 +37,6 @@ def flag_deleted_pages(file_path, current_confluence_documents):
num_deleted += 1
log.info(f"Found {num_deleted} deleted pages.")

serialized_docs = [doc.serialize() for doc in existing_docs]
write_json_file(file_path, serialized_docs)


class ConfluenceDataSource:
"""
@@ -170,34 +124,33 @@ def run(job_input: IJobInput):
.setdefault(parent_page_id, {})
.get("last_date", "1900-01-01 12:00")
)
data_file = os.path.join(
job_input.get_temporary_write_directory(), "confluence_data.json"
)
storage_name = get_value(job_input, "storage_name", "confluence_data")
storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
# TODO: this is not optimal. We only care about the IDs; we should not need to retrieve everything
data = storage.retrieve(storage_name)
pathlib.Path(data_file).write_text(data if data else "[]")
existing_docs = storage.retrieve(storage_name)
if existing_docs:
existing_docs = [ConfluenceDocument(**doc) for doc in existing_docs]

confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
last_date, parent_page_id
)
log.info(f"Found {len(updated_docs)} updated pages")
update_saved_documents(data_file, updated_docs)
all_docs = merge_docs(existing_docs, updated_docs)

# This is buggy: it doesn't account for the server timezone and the local timezone,
# and it also assumes that the server clock and the local clock are synchronized (which they likely are not).
# The timestamp should be the one of the latest processed page.
set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M"))

flag_deleted_pages(
data_file,
all_docs,
confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
)

# TODO: it would be better to save each page in a separate row,
# but that's a quick solution for now to pass the data to the next job.

storage.store(storage_name, pathlib.Path(data_file).read_text())
log.info(f"Store {len(all_docs)} documents in {storage_name}")
# TODO: why not use job_input.send_object_for_ingestion ... it's our ingestion interface
storage.store(storage_name, [doc.serialize() for doc in all_docs])
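The net effect of this refactor is that the JSON-file round-trip (`read_json_file`/`write_json_file`) is gone: documents are retrieved from storage, merged in memory, and stored back. A minimal sketch of the `merge_docs` semantics implied by the diff, assuming `ConfluenceDocument` exposes `.metadata` and `.data` as shown above:

```python
from confluence_document import ConfluenceDocument
from fetch_confluence_space import merge_docs

old = [
    ConfluenceDocument({"id": "1"}, "old body"),
    ConfluenceDocument({"id": "2"}, "untouched body"),
]
new = [ConfluenceDocument({"id": "1"}, "new body")]

merged = merge_docs(old, new)

# Documents are deduplicated by metadata["id"]; the new version wins,
# and unrelated existing documents are preserved.
assert {d.metadata["id"] for d in merged} == {"1", "2"}
assert next(d.data for d in merged if d.metadata["id"] == "1") == "new body"
```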
3 changes: 2 additions & 1 deletion examples/confluence-reader/requirements.txt
@@ -3,7 +3,8 @@
# The file is optional and can be deleted if no extra library dependencies are necessary.

atlassian-python-api
langchain_community
langchain-community
lxml
psycopg2-binary
sqlalchemy
vdk-storage
4 changes: 4 additions & 0 deletions examples/pgvector-embedder/00_properties.py
@@ -20,3 +20,7 @@ def run(job_input: IJobInput):
)
)
job_input.set_all_properties(properties)

hf_home = job_input.get_temporary_write_directory() / "hf"
hf_home.mkdir(parents=True, exist_ok=True)
os.environ["HF_HOME"] = str(hf_home)
17 changes: 17 additions & 0 deletions examples/pgvector-embedder/01_setup_huggingface.py
@@ -0,0 +1,17 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0

# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os.path
import pathlib

Codacy Static Code Analysis (examples/pgvector-embedder/01_setup_huggingface.py, line 7): 'pathlib' imported but unused (F401).

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
# HF uses temporary directories in the process of its work
# So make sure to use only allowed ones
hf_home = job_input.get_temporary_write_directory() / "hf"
hf_home.mkdir(parents=True, exist_ok=True)
os.environ["HF_HOME"] = str(hf_home)
9 changes: 2 additions & 7 deletions examples/pgvector-embedder/20_embed_data.py
@@ -5,19 +5,14 @@
import json
import logging

from common.database_storage import DatabaseStorage
from config import get_value
from sentence_transformers import SentenceTransformer
from vdk.api.job_input import IJobInput
from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)


def load_documents(json_file_path):
with open(json_file_path, encoding="utf-8") as file:
return json.load(file)


def embed_documents_in_batches(documents):
# the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
model = SentenceTransformer("all-mpnet-base-v2")
@@ -41,7 +36,7 @@ def run(job_input: IJobInput):
storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
storage_name = get_value(job_input, "storage_name", "confluence_data")

documents = load_documents(storage.retrieve(storage_name))
documents = storage.retrieve(storage_name)
if documents:
log.info(f"{len(documents)} chunks loaded and cleaned for embedding.")
embeddings = embed_documents_in_batches(documents)
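The body of `embed_documents_in_batches` is mostly collapsed in this view. A sketch of the batching idea it names, assuming each document is a dict with a `"data"` text field (the field name is an assumption, not something this diff shows):

```python
from sentence_transformers import SentenceTransformer


def embed_documents_in_batches(documents, batch_size=32):
    # Model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
    model = SentenceTransformer("all-mpnet-base-v2")
    texts = [doc["data"] for doc in documents]
    # encode() batches internally; batch_size bounds peak memory usage.
    return model.encode(texts, batch_size=batch_size, show_progress_bar=True)
```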
8 changes: 7 additions & 1 deletion examples/pgvector-embedder/30_create_schema.sql
@@ -4,9 +4,15 @@
DROP TABLE IF EXISTS public.{destination_embeddings_table} CASCADE;

-- TODO (missing vdk feature): we need to create the tables as the postgres plugin doesn't support automatic schema inference
CREATE TABLE IF NOT EXISTS public.{destination_embeddings_table}
(
id TEXT PRIMARY KEY,
embedding public.vector

Codacy Static Code Analysis (examples/pgvector-embedder/30_create_schema.sql, line 10): Expected TSQL Keyword to be capitalized.
);

CREATE TABLE IF NOT EXISTS public.{destination_metadata_table}
(
id VARCHAR PRIMARY KEY,
id TEXT PRIMARY KEY,
title TEXT,
source TEXT,
data TEXT,
5 changes: 2 additions & 3 deletions examples/pgvector-embedder/40_ingest_embeddings.py
@@ -1,13 +1,12 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import pickle

import numpy as np
from common.database_storage import DatabaseStorage
from config import get_value
from vdk.api.job_input import IJobInput
from vdk.plugin.storage.database_storage import DatabaseStorage

log = logging.getLogger(__name__)

@@ -21,7 +20,7 @@ def run(job_input: IJobInput):
embeddings = pickle.load(file)
storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
storage_name = get_value(job_input, "storage_name", "confluence_data")
documents = json.loads(storage.retrieve(storage_name))
documents = storage.retrieve(storage_name)

# TODO: our postgres plugin doesn't support updates (upserts) so updating with same ID fails.

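Elsewhere in this PR a TODO suggests `job_input.send_object_for_ingestion` as the intended ingestion interface. A hypothetical sketch of pairing the pickled embeddings with the retrieved documents through that interface; the payload keys and destination table name are assumptions, not what this job currently does:

```python
# Pair each document with its embedding and send the rows for ingestion.
for doc, embedding in zip(documents, embeddings):
    payload = {
        "id": doc["metadata"]["id"],
        "embedding": embedding.tolist(),  # numpy array -> plain list
    }
    job_input.send_object_for_ingestion(
        payload=payload,
        destination_table="vdk_confluence_embeddings",  # hypothetical name
    )
```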
2 changes: 2 additions & 0 deletions examples/pgvector-embedder/requirements.txt
@@ -4,3 +4,5 @@ psycopg2-binary
sentence-transformers
sqlalchemy
vdk-postgres

vdk-storage
25 changes: 25 additions & 0 deletions projects/vdk-plugins/vdk-storage/.plugin-ci.yml
@@ -0,0 +1,25 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0

# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-storage:
variables:
PLUGIN_NAME: vdk-storage
extends: .build-plugin

build-py37-vdk-storage:
extends: .build-vdk-storage
image: "python:3.7"

build-py311-vdk-storage:
extends: .build-vdk-storage
image: "python:3.11"

release-vdk-storage:
variables:
PLUGIN_NAME: vdk-storage
extends: .release-plugin
43 changes: 43 additions & 0 deletions projects/vdk-plugins/vdk-storage/README.md
@@ -0,0 +1,43 @@
# storage

Library for access to different managed storages


## Usage

```
pip install vdk-storage
```

### Configuration

(`vdk config-help` is a useful command to browse all config options of your VDK installation)

| Name | Description | (example) Value |
|---|---|---|
| dummy_config_key | Dummy configuration | "Dummy" |

### Example

TODO

### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In the VDK repo, the [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can also be used.


#### Note about the CI/CD:

.plugin-ci.yaml is needed only for plugins that are part of the [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated into two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all of which inherit from the same
job configuration and differ only in the Python version they use (3.7, 3.8, 3.9 and 3.10).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger that plugin's CI, but changes to a different plugin do not.
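The Example section above is still a TODO. Based only on the calls this PR makes elsewhere (`store` and `retrieve` on `DatabaseStorage`), here is a hypothetical usage sketch; the constructor argument and return shapes are inferred from the diff, not from documentation:

```python
from vdk.plugin.storage.database_storage import DatabaseStorage

# Connection string format is an assumption (a SQLAlchemy-style URL,
# since setup.py lists sqlalchemy as a dependency).
storage = DatabaseStorage("postgresql://user:password@localhost:5432/db")

# store() persists a JSON-serializable payload under a name ...
storage.store("confluence_data", [{"metadata": {"id": "1"}, "data": "page body"}])

# ... and retrieve() returns it back.
documents = storage.retrieve("confluence_data")
```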
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-storage/requirements.txt
@@ -0,0 +1,7 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin, see (and update) the setup.py install_requires section


pytest
vdk-core
vdk-test-utils
41 changes: 41 additions & 0 deletions projects/vdk-plugins/vdk-storage/setup.py
@@ -0,0 +1,41 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0

# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""

Check warning on line 10 in projects/vdk-plugins/vdk-storage/setup.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-storage/setup.py#L10

String statement has no effect
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.1"

setuptools.setup(
name="vdk-storage",
version=__version__,
url="https://github.com/vmware/versatile-data-kit",
description="Library for access to different managed storages",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["vdk-core", "sqlalchemy"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
entry_points={"vdk.plugin.run": ["vdk-storage = vdk.plugin.storage.plugin_entry"]},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
project_urls={
"Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-storage",
"Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-storage",
"Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
},
)