From d7436bdb4ec5298609639573467013c22b696212 Mon Sep 17 00:00:00 2001
From: Antoni Ivanov
Date: Mon, 5 Feb 2024 21:58:45 +0200
Subject: [PATCH 1/4] examples: confluence-reader: fixes to be more demoable

- the recursive method for finding pages was crashing, so it was replaced with a CQL-based query
- added passing a parent page ID so we can take only a few pages for demo purposes
- noted bugs and issues in the code and added TODOs

---
 .../fetch_confluence_space.py | 65 +++++++++++--------
 1 file changed, 37 insertions(+), 28 deletions(-)

diff --git a/examples/confluence-reader/fetch_confluence_space.py b/examples/confluence-reader/fetch_confluence_space.py
index e37a5b0e09..7b712fd6f6 100644
--- a/examples/confluence-reader/fetch_confluence_space.py
+++ b/examples/confluence-reader/fetch_confluence_space.py
@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 import json
 import logging
+import os
 from datetime import datetime

 from confluence_document import ConfluenceDocument
@@ -72,9 +73,12 @@ def flag_deleted_pages(file_path, current_confluence_documents):

     current_page_ids = {doc.metadata["id"] for doc in current_confluence_documents}

+    num_deleted = 0
     for doc in existing_docs:
         if doc.metadata["id"] not in current_page_ids:
             doc.metadata["deleted"] = True
+            num_deleted += 1
+    log.info(f"Found {num_deleted} deleted pages.")

     serialized_docs = [doc.serialize() for doc in existing_docs]
     write_json_file(file_path, serialized_docs)
@@ -92,6 +96,9 @@ def read_last_modification_date():

 def update_last_modification_date():
     try:
         with open(LAST_MODIFICATION_FILE, "w") as file:
+            # This is buggy , it doesn't account for server timezone and local timezone
+            # But also assumes that server clock and local clock are synchronized (which they are likely not)
+            # The ts should be the one of the latest processed page.
             formatted_date = datetime.now().strftime("%Y-%m-%d %H:%M")
             file.write(formatted_date)
     except OSError as e:
@@ -114,7 +121,6 @@ class ConfluenceDataSource:
     Methods:
         fetch_updated_pages_in_confluence_space(): Fetches updated pages in the Confluence space based on the last modification date.
         fetch_all_pages_in_confluence_space(): Retrieves all pages in the Confluence space.
-        fetch_updated_documents_by_parent_id(parent_page_id): Recursively fetches updated documents based on a parent page ID.
         flag_deleted_pages(): Flags deleted pages based on the current Confluence data.
         update_saved_documents(): Updates the saved documents in the JSON file with the latest data.

@@ -128,7 +134,9 @@ def __init__(self, confluence_url, token, space_key):

     def fetch_confluence_documents(self, cql_query):
         try:
-            raw_documents = self.loader.load(cql=cql_query, limit=10, max_pages=10)
+            # TODO: think about configurable limits or some streaming solution.
+            # How do we fit all documents in memory?
+            raw_documents = self.loader.load(cql=cql_query, limit=50, max_pages=200)
             return [
                 ConfluenceDocument(doc.metadata, doc.page_content)
                 for doc in raw_documents
@@ -137,51 +145,52 @@ def fetch_confluence_documents(self, cql_query):
             log.error(f"Error fetching documents from Confluence: {e}")
             return []

-    def fetch_updated_pages_in_confluence_space(self):
+    def fetch_updated_pages_in_confluence_space(self, parent_page_id=None):
         last_date = read_last_modification_date()
+        # TODO: this really should be called not when page is read but after it's successfully processed.
         update_last_modification_date()
         cql_query = (
             f"lastModified > '{last_date}' and type = page and space = {self.space_key}"
         )
+        if parent_page_id:
+            # https://developer.atlassian.com/server/confluence/cql-field-reference/#ancestor
+            cql_query += f" and ancestor = {parent_page_id}"
+
         return self.fetch_confluence_documents(cql_query)

-    def fetch_all_pages_in_confluence_space(self):
+    def fetch_all_pages_in_confluence_space(self, parent_page_id=None):
+        # TODO: this is very inefficient, as we are actually downloading everything.
+        # The REST API offers an expand query parameter for that, but the langchain loader always expands and returns the body.
+        # See https://docs.atlassian.com/atlassian-confluence/REST/5.5/
+        # We could hack around this by subclassing the ContentFormat enum, and try to convince the library devs to add a metadata-only response to the loader.
         cql_query = f"type = page and space = {self.space_key}"
+        if parent_page_id:
+            cql_query += f" and ancestor = {parent_page_id}"
         return self.fetch_confluence_documents(cql_query)

-    def fetch_updated_documents_by_parent_id(self, parent_page_id):
-        last_modified_date = read_last_modification_date()
-        update_last_modification_date()
-
-        def fetch_updated_recursive(page_id, last_modified_date):
-            updated_documents = []
-            cql_query = f"type = page and parent = {page_id} and lastModified > '{last_modified_date}'"
-            child_documents = self.fetch_confluence_documents(cql_query)
-
-            for doc in child_documents:
-                updated_documents.append(doc)
-                updated_documents.extend(
-                    fetch_updated_recursive(doc["id"], last_modified_date)
-                )
-
-            return updated_documents
-
-        return fetch_updated_recursive(parent_page_id, last_modified_date)
-

 def run(job_input: IJobInput):
     log.info(f"Starting job step {__name__}")

-    confluence_url = job_input.get_property("confluence_url", "YOUR_CONFLUENCE_URL")
-    token = job_input.get_property("confluence_token", "YOUR_CONFLUENCE_TOKEN")
-    space_key = job_input.get_property("confluence_space_key", "YOUR_SPACE_KEY")
+    confluence_url = job_input.get_property(
+        "confluence_url", "http://confluence.eng.vmware.com/"
+    )
+    token = job_input.get_property(
+        "confluence_token", os.environ.get("VDK_CONFLUENCE_TOKEN")
+    )
+    space_key = job_input.get_property("confluence_space_key", "TAURUS")
+    parent_page_id = job_input.get_property("confluence_parent_page_id", "1105807412")

     confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

-    updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space()
+    updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
+        parent_page_id
+    )
+    log.info(f"Found {len(updated_docs)} updated pages")
     update_saved_documents(CONFLUENCE_DATA_FILE, updated_docs)
     flag_deleted_pages(
-        CONFLUENCE_DATA_FILE, confluence_reader.fetch_all_pages_in_confluence_space()
+        CONFLUENCE_DATA_FILE,
+        confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
     )

From 6fc7cd8c9b5cc15fd28aa423023a5e712e25fe63 Mon Sep 17 00:00:00 2001
From: Antoni Ivanov
Date: Tue, 6 Feb 2024 13:09:57 +0200
Subject: [PATCH 2/4] embed-ingest-job-example: make more demoable

- parameterize the table names used in the job
- add cleanup of deleted rows (though I realised it's redundant for now, since we need to drop the table first because the postgres ingestion does not support upserts (updates))
- since the embedding job is written in such a generic way, there's actually no need to tie it to Confluence at all. It would work for any dataset.
- Added multiple TODOs for missing features.
The job could be even further generalized if our ingestion framework improves.

---
 .../embed-ingest-job-example/00_properties.py | 14 ++++++++++++++
 .../30_create_schema.sql                      | 12 +++++++-----
 .../40_ingest_embeddings.py                   |  6 ++++--
 .../50_cleanup_deleted_rows.sql               | 11 +++++++++++
 examples/embed-ingest-job-example/README.md   | 17 ++++++++++++++++-
 5 files changed, 52 insertions(+), 8 deletions(-)
 create mode 100644 examples/embed-ingest-job-example/00_properties.py
 create mode 100644 examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql

diff --git a/examples/embed-ingest-job-example/00_properties.py b/examples/embed-ingest-job-example/00_properties.py
new file mode 100644
index 0000000000..61f0861a23
--- /dev/null
+++ b/examples/embed-ingest-job-example/00_properties.py
@@ -0,0 +1,14 @@
+# Copyright 2021-2024 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+from vdk.api.job_input import IJobInput
+
+
+def run(job_input: IJobInput):
+    properties = job_input.get_all_properties()
+    properties.update(
+        dict(
+            destination_embeddings_table="vdk_doc_embeddings_ai",
+            destination_metadata_table="vdk_doc_metadata_ai",
+        )
+    )
+    job_input.set_all_properties(properties)
diff --git a/examples/embed-ingest-job-example/30_create_schema.sql b/examples/embed-ingest-job-example/30_create_schema.sql
index c5dd066610..bbef0480f0 100644
--- a/examples/embed-ingest-job-example/30_create_schema.sql
+++ b/examples/embed-ingest-job-example/30_create_schema.sql
@@ -1,19 +1,21 @@
-DROP TABLE IF EXISTS public.vdk_confluence_doc_embeddings_example CASCADE;
-DROP TABLE IF EXISTS public.vdk_confluence_doc_metadata_example CASCADE;
+-- TODO (missing vdk feature): we need to drop the tables, as the postgres plugin doesn't support upserts (updates)
+DROP TABLE IF EXISTS public.{destination_embeddings_table} CASCADE;
+DROP TABLE IF EXISTS public.{destination_metadata_table} CASCADE;

-CREATE TABLE IF NOT EXISTS public.vdk_confluence_doc_embeddings_example
+-- TODO (missing vdk feature): we need to create the tables, as the postgres plugin doesn't support automatic schema inference
+CREATE TABLE IF NOT EXISTS public.{destination_embeddings_table}
 (
     id SERIAL PRIMARY KEY,
     embedding public.vector
 );

-CREATE TABLE IF NOT EXISTS public.vdk_confluence_doc_metadata_example
+CREATE TABLE IF NOT EXISTS public.{destination_metadata_table}
 (
     id INTEGER PRIMARY KEY,
     title TEXT,
     source TEXT,
     data TEXT,
     deleted BOOLEAN,
-    CONSTRAINT fk_metadata_embeddings FOREIGN KEY (id) REFERENCES public.vdk_confluence_doc_embeddings_example(id)
+    CONSTRAINT fk_metadata_embeddings FOREIGN KEY (id) REFERENCES public.{destination_embeddings_table}(id)
 );
diff --git a/examples/embed-ingest-job-example/40_ingest_embeddings.py b/examples/embed-ingest-job-example/40_ingest_embeddings.py
index 5613d29ded..56b5f644bb 100644
--- a/examples/embed-ingest-job-example/40_ingest_embeddings.py
+++ b/examples/embed-ingest-job-example/40_ingest_embeddings.py
@@ -27,6 +27,8 @@ def run(job_input: IJobInput):

     print(len(documents), len(embeddings))

+    # TODO: our postgres plugin doesn't support updates (upserts), so updating with the same ID fails.
+
     for i, embedding in enumerate(embeddings):
         embedding_list = (
             embedding.tolist() if isinstance(embedding, np.ndarray) else embedding
         )
@@ -37,7 +39,7 @@ def run(job_input: IJobInput):
         }
         job_input.send_object_for_ingestion(
             payload=embedding_payload,
-            destination_table="vdk_confluence_doc_embeddings_example",
+            destination_table=job_input.get_property("destination_embeddings_table"),
         )

     for document in documents:
@@ -50,5 +52,5 @@ def run(job_input: IJobInput):
         }
         job_input.send_object_for_ingestion(
             payload=metadata_payload,
-            destination_table="vdk_confluence_doc_metadata_example",
+            destination_table=job_input.get_property("destination_metadata_table"),
         )
diff --git a/examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql b/examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql
new file mode 100644
index 0000000000..ebf4945c1e
--- /dev/null
+++ b/examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql
@@ -0,0 +1,11 @@
+-- TODO (missing vdk feature): this may not be necessary if our Ingestion framework supports deletion
+
+-- Step 1: Delete from the metadata table where deleted is true
+DELETE FROM public.{destination_metadata_table}
+WHERE deleted = TRUE;
+
+-- Step 2: Delete from the embeddings table where the id is not present in the metadata table
+DELETE FROM public.{destination_embeddings_table}
+WHERE id NOT IN (
+    SELECT id FROM public.{destination_metadata_table}
+);
diff --git a/examples/embed-ingest-job-example/README.md b/examples/embed-ingest-job-example/README.md
index 8ff618c165..3e5eb90ae4 100644
--- a/examples/embed-ingest-job-example/README.md
+++ b/examples/embed-ingest-job-example/README.md
@@ -1,8 +1,23 @@
 # Embed And Ingest Data Job Example

-The following Versatile Data Kit example allows you to embed your Confluence JSON data
+The following Versatile Data Kit example allows you to embed document data and metadata (in a certain format)
 and ingest it into Postgres instance with pgvector.

+# Expected input format
+
+```python
+[
+    {
+        "metadata": {
+            "title": "Page (or chunk) title",
+            "id": "Content page ID",
+            "source": "Source URL",
+            "deleted": false
+        },
+        "data": "Content Text"
+    },
+]
+```
+
 # Create embeddings for the data

 The fetched data from the previous step is read, cleaned and embedded using the [all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) HuggingFace SentenceTransformer Embedding model.
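As a rough illustration of the embedding step the README above refers to, here is a minimal sketch (not part of the patch): it skips the nltk-based cleaning that 20_clean_and_embed_json_data.py performs and simply encodes the `data` field of each record in the expected input format, using the example file names from this job.

```python
import json
import pickle

from sentence_transformers import SentenceTransformer

# Read documents in the expected input format described in the README above.
with open("documents_example.json") as f:
    documents = json.load(f)

# all-mpnet-base-v2 is the model named in the README.
model = SentenceTransformer("all-mpnet-base-v2")

# Encode the "data" field of every document into a dense vector (a numpy array per document).
embeddings = model.encode([doc["data"] for doc in documents])

# Persist the vectors so the ingestion step (40_ingest_embeddings.py) can load and send them.
with open("embeddings_example.pkl", "wb") as f:
    pickle.dump(embeddings, f)
```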
From 182771c620e6d83a053518403806f8f57788c350 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Tue, 6 Feb 2024 13:32:33 +0200 Subject: [PATCH 3/4] renamed embed-ingest-job-example to pgvector-embedder --- .../00_properties.py | 0 .../20_clean_and_embed_json_data.py | 0 .../30_create_schema.sql | 0 .../40_ingest_embeddings.py | 0 .../50_cleanup_deleted_rows.sql | 0 .../{embed-ingest-job-example => pgvector-embedder}/README.md | 0 .../{embed-ingest-job-example => pgvector-embedder}/config.ini | 0 .../{embed-ingest-job-example => pgvector-embedder}/config.py | 0 .../documents_example.json | 0 .../requirements.txt | 0 10 files changed, 0 insertions(+), 0 deletions(-) rename examples/{embed-ingest-job-example => pgvector-embedder}/00_properties.py (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/20_clean_and_embed_json_data.py (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/30_create_schema.sql (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/40_ingest_embeddings.py (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/50_cleanup_deleted_rows.sql (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/README.md (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/config.ini (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/config.py (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/documents_example.json (100%) rename examples/{embed-ingest-job-example => pgvector-embedder}/requirements.txt (100%) diff --git a/examples/embed-ingest-job-example/00_properties.py b/examples/pgvector-embedder/00_properties.py similarity index 100% rename from examples/embed-ingest-job-example/00_properties.py rename to examples/pgvector-embedder/00_properties.py diff --git a/examples/embed-ingest-job-example/20_clean_and_embed_json_data.py b/examples/pgvector-embedder/20_clean_and_embed_json_data.py similarity index 100% rename from examples/embed-ingest-job-example/20_clean_and_embed_json_data.py rename to examples/pgvector-embedder/20_clean_and_embed_json_data.py diff --git a/examples/embed-ingest-job-example/30_create_schema.sql b/examples/pgvector-embedder/30_create_schema.sql similarity index 100% rename from examples/embed-ingest-job-example/30_create_schema.sql rename to examples/pgvector-embedder/30_create_schema.sql diff --git a/examples/embed-ingest-job-example/40_ingest_embeddings.py b/examples/pgvector-embedder/40_ingest_embeddings.py similarity index 100% rename from examples/embed-ingest-job-example/40_ingest_embeddings.py rename to examples/pgvector-embedder/40_ingest_embeddings.py diff --git a/examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql b/examples/pgvector-embedder/50_cleanup_deleted_rows.sql similarity index 100% rename from examples/embed-ingest-job-example/50_cleanup_deleted_rows.sql rename to examples/pgvector-embedder/50_cleanup_deleted_rows.sql diff --git a/examples/embed-ingest-job-example/README.md b/examples/pgvector-embedder/README.md similarity index 100% rename from examples/embed-ingest-job-example/README.md rename to examples/pgvector-embedder/README.md diff --git a/examples/embed-ingest-job-example/config.ini b/examples/pgvector-embedder/config.ini similarity index 100% rename from examples/embed-ingest-job-example/config.ini rename to examples/pgvector-embedder/config.ini diff --git a/examples/embed-ingest-job-example/config.py b/examples/pgvector-embedder/config.py similarity index 100% rename from 
examples/embed-ingest-job-example/config.py rename to examples/pgvector-embedder/config.py diff --git a/examples/embed-ingest-job-example/documents_example.json b/examples/pgvector-embedder/documents_example.json similarity index 100% rename from examples/embed-ingest-job-example/documents_example.json rename to examples/pgvector-embedder/documents_example.json diff --git a/examples/embed-ingest-job-example/requirements.txt b/examples/pgvector-embedder/requirements.txt similarity index 100% rename from examples/embed-ingest-job-example/requirements.txt rename to examples/pgvector-embedder/requirements.txt From cceb5da6acdf295e867691276261439c0f748db7 Mon Sep 17 00:00:00 2001 From: Antoni Ivanov Date: Tue, 6 Feb 2024 15:24:44 +0200 Subject: [PATCH 4/4] a few more fixes --- examples/confluence-reader/README.md | 1 - .../fetch_confluence_space.py | 71 +++++++++---------- .../confluence-reader/last_modification.txt | 1 - examples/pgvector-embedder/00_properties.py | 14 +++- .../20_clean_and_embed_json_data.py | 9 +-- .../pgvector-embedder/40_ingest_embeddings.py | 13 ++-- examples/pgvector-embedder/config.py | 8 ++- examples/rag-dag-pipeline/README.md | 21 ++++++ examples/rag-dag-pipeline/config.ini | 15 ++++ examples/rag-dag-pipeline/pipeline.py | 30 ++++++++ examples/rag-dag-pipeline/requirements.txt | 1 + 11 files changed, 126 insertions(+), 58 deletions(-) delete mode 100644 examples/confluence-reader/last_modification.txt create mode 100644 examples/rag-dag-pipeline/README.md create mode 100644 examples/rag-dag-pipeline/config.ini create mode 100644 examples/rag-dag-pipeline/pipeline.py create mode 100644 examples/rag-dag-pipeline/requirements.txt diff --git a/examples/confluence-reader/README.md b/examples/confluence-reader/README.md index b6316ab962..0e1d405815 100644 --- a/examples/confluence-reader/README.md +++ b/examples/confluence-reader/README.md @@ -8,7 +8,6 @@ The `ConfluenceDataSource` class is the heart of this data job. It provides a se - `fetch_updated_pages_in_confluence_space()`: Fetches updated pages in the Confluence space based on the last modification date. - `fetch_all_pages_in_confluence_space()`: Retrieves all pages in the Confluence space. -- `fetch_updated_documents_by_parent_id(parent_page_id)`: Recursively fetches updated documents based on a parent page ID, ensuring that nested pages are also captured. - `flag_deleted_pages()`: Flags deleted pages based on the current Confluence data. - `update_saved_documents()`: Updates the saved documents in the JSON file with the latest data. diff --git a/examples/confluence-reader/fetch_confluence_space.py b/examples/confluence-reader/fetch_confluence_space.py index 7b712fd6f6..4ceff4ba8a 100644 --- a/examples/confluence-reader/fetch_confluence_space.py +++ b/examples/confluence-reader/fetch_confluence_space.py @@ -11,9 +11,6 @@ log = logging.getLogger(__name__) -CONFLUENCE_DATA_FILE = "confluence_data.json" -LAST_MODIFICATION_FILE = "last_modification.txt" - def read_json_file(file_path): try: @@ -84,27 +81,6 @@ def flag_deleted_pages(file_path, current_confluence_documents): write_json_file(file_path, serialized_docs) -def read_last_modification_date(): - try: - with open(LAST_MODIFICATION_FILE) as file: - return file.read().strip() - except FileNotFoundError: - log.error(f"{LAST_MODIFICATION_FILE} not found. 
Using default date.") - return datetime.min.strftime("%Y-%m-%d %H:%M") - - -def update_last_modification_date(): - try: - with open(LAST_MODIFICATION_FILE, "w") as file: - # This is buggy , it doesn't account for server timezone and local timezone - # But also assumes that server clock and local clock are synchronized (which they are likely not) - # The ts should be the one of the latest processed page. - formatted_date = datetime.now().strftime("%Y-%m-%d %H:%M") - file.write(formatted_date) - except OSError as e: - log.error(f"Error writing to file: {e}") - - class ConfluenceDataSource: """ A class for retrieving and managing data from a Confluence space. @@ -143,12 +119,12 @@ def fetch_confluence_documents(self, cql_query): ] except Exception as e: log.error(f"Error fetching documents from Confluence: {e}") - return [] + raise e - def fetch_updated_pages_in_confluence_space(self, parent_page_id=None): - last_date = read_last_modification_date() + def fetch_updated_pages_in_confluence_space( + self, last_date="1900-02-06 17:54", parent_page_id=None + ): # TODO: this really should be called not when page is read but after it's successfully processed. - update_last_modification_date() cql_query = ( f"lastModified > '{last_date}' and type = page and space = {self.space_key}" ) @@ -170,27 +146,46 @@ def fetch_all_pages_in_confluence_space(self, parent_page_id=None): return self.fetch_confluence_documents(cql_query) +def get_value(job_input, key: str, default_value=None): + return job_input.get_arguments().get( + key, job_input.get_property(key, os.environ.get(key.upper(), default_value)) + ) + + +def set_property(job_input: IJobInput, key, value): + props = job_input.get_all_properties() + props[key] = value + job_input.set_all_properties(props) + + def run(job_input: IJobInput): log.info(f"Starting job step {__name__}") - confluence_url = job_input.get_property( - "confluence_url", "http://confluence.eng.vmware.com/" + confluence_url = get_value(job_input, "confluence_url") + token = get_value(job_input, "confluence_token") + space_key = get_value(job_input, "confluence_space_key") + parent_page_id = get_value(job_input, "confluence_parent_page_id") + last_date = get_value(job_input, "last_date", "1900-01-01 12:00") + data_file = get_value( + job_input, + "data_file", + os.path.join(job_input.get_temporary_write_directory(), "confluence_data.json"), ) - token = job_input.get_property( - "confluence_token", os.environ.get("VDK_CONFLUENCE_TOKEN") - ) - space_key = job_input.get_property("confluence_space_key", "TAURUS") - parent_page_id = job_input.get_property("confluence_parent_page_id", "1105807412") confluence_reader = ConfluenceDataSource(confluence_url, token, space_key) updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space( - parent_page_id + last_date, parent_page_id ) log.info(f"Found {len(updated_docs)} updated pages") - update_saved_documents(CONFLUENCE_DATA_FILE, updated_docs) + update_saved_documents(data_file, updated_docs) + + # This is buggy , it doesn't account for server timezone and local timezone + # But also assumes that server clock and local clock are synchronized (which they are likely not) + # The ts should be the one of the latest processed page. 
+ set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M")) flag_deleted_pages( - CONFLUENCE_DATA_FILE, + data_file, confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id), ) diff --git a/examples/confluence-reader/last_modification.txt b/examples/confluence-reader/last_modification.txt deleted file mode 100644 index 7e95439e1e..0000000000 --- a/examples/confluence-reader/last_modification.txt +++ /dev/null @@ -1 +0,0 @@ -2000-01-01 00:00 diff --git a/examples/pgvector-embedder/00_properties.py b/examples/pgvector-embedder/00_properties.py index 61f0861a23..79331acf68 100644 --- a/examples/pgvector-embedder/00_properties.py +++ b/examples/pgvector-embedder/00_properties.py @@ -1,14 +1,24 @@ # Copyright 2021-2024 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 +import os.path +import pathlib + from vdk.api.job_input import IJobInput def run(job_input: IJobInput): properties = job_input.get_all_properties() + + data_file = os.path.join(job_input.get_job_directory(), "documents_example.json") + output_embeddings = os.path.join( + job_input.get_temporary_write_directory(), "embeddings_example.pkl" + ) properties.update( dict( - destination_embeddings_table="vdk_doc_embeddings_ai", - destination_metadata_table="vdk_doc_metadata_ai", + destination_embeddings_table="vdk_doc_embeddings", + destination_metadata_table="vdk_doc_metadata", + data_file=data_file, + output_embeddings=output_embeddings, ) ) job_input.set_all_properties(properties) diff --git a/examples/pgvector-embedder/20_clean_and_embed_json_data.py b/examples/pgvector-embedder/20_clean_and_embed_json_data.py index c765232253..120d689219 100644 --- a/examples/pgvector-embedder/20_clean_and_embed_json_data.py +++ b/examples/pgvector-embedder/20_clean_and_embed_json_data.py @@ -2,12 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 import json import logging -import pathlib import re import nltk -from config import DOCUMENTS_JSON_FILE_LOCATION -from config import EMBEDDINGS_PKL_FILE_LOCATION +from config import get_value from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from sentence_transformers import SentenceTransformer @@ -84,9 +82,8 @@ def setup_nltk(temp_dir): def run(job_input: IJobInput): log.info(f"Starting job step {__name__}") - data_job_dir = pathlib.Path(job_input.get_job_directory()) - input_json = data_job_dir / DOCUMENTS_JSON_FILE_LOCATION - output_embeddings = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION + input_json = get_value(job_input, "data_file") + output_embeddings = get_value(job_input, "output_embeddings") temp_dir = job_input.get_temporary_write_directory() setup_nltk(temp_dir) diff --git a/examples/pgvector-embedder/40_ingest_embeddings.py b/examples/pgvector-embedder/40_ingest_embeddings.py index 56b5f644bb..0abae87c27 100644 --- a/examples/pgvector-embedder/40_ingest_embeddings.py +++ b/examples/pgvector-embedder/40_ingest_embeddings.py @@ -2,12 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 import json import logging -import pathlib import pickle import numpy as np -from config import DOCUMENTS_JSON_FILE_LOCATION -from config import EMBEDDINGS_PKL_FILE_LOCATION +from config import get_value from vdk.api.job_input import IJobInput log = logging.getLogger(__name__) @@ -16,9 +14,8 @@ def run(job_input: IJobInput): log.info(f"Starting job step {__name__}") - data_job_dir = pathlib.Path(job_input.get_job_directory()) - input_embeddings_path = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION - input_documents_path = data_job_dir / DOCUMENTS_JSON_FILE_LOCATION 
+    input_embeddings_path = get_value(job_input, "output_embeddings")
+    input_documents_path = get_value(job_input, "data_file")

     with open(input_embeddings_path, "rb") as file:
         embeddings = pickle.load(file)
@@ -39,7 +36,7 @@ def run(job_input: IJobInput):
         }
         job_input.send_object_for_ingestion(
             payload=embedding_payload,
-            destination_table=job_input.get_property("destination_embeddings_table"),
+            destination_table=get_value(job_input, "destination_embeddings_table"),
         )

     for document in documents:
@@ -52,5 +49,5 @@ def run(job_input: IJobInput):
         }
         job_input.send_object_for_ingestion(
             payload=metadata_payload,
-            destination_table=job_input.get_property("destination_metadata_table"),
+            destination_table=get_value(job_input, "destination_metadata_table"),
         )
diff --git a/examples/pgvector-embedder/config.py b/examples/pgvector-embedder/config.py
index ebe713b275..cb0c75555b 100644
--- a/examples/pgvector-embedder/config.py
+++ b/examples/pgvector-embedder/config.py
@@ -1,5 +1,9 @@
 # Copyright 2021-2024 VMware, Inc.
 # SPDX-License-Identifier: Apache-2.0
+import os

-DOCUMENTS_JSON_FILE_LOCATION = "documents_example.json"
-EMBEDDINGS_PKL_FILE_LOCATION = "embeddings_example.pkl"
+
+def get_value(job_input, key: str, default_value=None):
+    return job_input.get_arguments().get(
+        key, job_input.get_property(key, os.environ.get(key.upper(), default_value))
+    )
diff --git a/examples/rag-dag-pipeline/README.md b/examples/rag-dag-pipeline/README.md
new file mode 100644
index 0000000000..d01d9f804d
--- /dev/null
+++ b/examples/rag-dag-pipeline/README.md
@@ -0,0 +1,21 @@
+This job contains an ELT (Extract, Load, Transform) pipeline designed for processing data from Confluence, embedding it, and storing the embeddings in Postgres with pgvector.
+
+The jobs are orchestrated using the vdk-dag plugin to run in a defined sequence.
+
+# Job structure
+
+Here are the two main jobs:
+
+- confluence-reader: Extracts raw data from Confluence and loads it into a specified location (table, file, etc.).
+- pgvector-embedder: Transforms the extracted data by embedding it and stores the metadata and embeddings in the specified tables (vdk_confluence_metadata and vdk_confluence_embeddings).
+
+TODO (missing vdk feature): as the idea is for this to be used as a template, we need to allow VDK to somehow handle the jobs specified in the DAG automatically.
+Currently the jobs specified (e.g. confluence-reader) must already be deployed, and deployed VDK jobs can only run one execution at a time.
+What can we do to solve that?
+
+A) Create a separate deployment automatically
+B) Run the job with the arguments provided as a separate job instance
+   - what about job properties - maybe it should inherit the parent job's properties? Or ignore them and only accept arguments?
+C) ...
+
+TODO (missing vdk feature): how do I pick between different jobs to compose them?
diff --git a/examples/rag-dag-pipeline/config.ini b/examples/rag-dag-pipeline/config.ini
new file mode 100644
index 0000000000..d8aa8462e3
--- /dev/null
+++ b/examples/rag-dag-pipeline/config.ini
@@ -0,0 +1,15 @@
+; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure
+
+; This is the only file required to deploy a Data Job.
+; Read more to understand what each option means:
+
+; Information about the owner of the Data Job
+[owner]
+
+; Team is a way to group Data Jobs that belong to the same team.
+team = my-team + +[vdk] +dags_max_concurrent_running_jobs = 2 +dags_delayed_jobs_min_delay_seconds = 1 +dags_delayed_jobs_randomized_added_delay_seconds = 1 diff --git a/examples/rag-dag-pipeline/pipeline.py b/examples/rag-dag-pipeline/pipeline.py new file mode 100644 index 0000000000..3e864b120a --- /dev/null +++ b/examples/rag-dag-pipeline/pipeline.py @@ -0,0 +1,30 @@ +# Copyright 2021-2024 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.plugin.dag.dag_runner import DagInput + +# ELT + +jobs = [ + dict( + job_name="confluence-reader", + team_name="my-team", + fail_dag_on_error=True, + arguments=dict(data_file=f"/tmp/confluence.json"), + depends_on=[], + ), + dict( + job_name="pgvector-embedder", + team_name="my-team", + fail_dag_on_error=True, + arguments=dict( + data_file=f"/tmp/confluence.json", + destination_metadata_table="vdk_confluence_metadata", + destination_embeddings_table="vdk_confluence_embeddings", + ), + depends_on=["confluence-reader"], + ), +] + + +def run(job_input) -> None: + DagInput().run_dag(jobs) diff --git a/examples/rag-dag-pipeline/requirements.txt b/examples/rag-dag-pipeline/requirements.txt new file mode 100644 index 0000000000..e4f62cfaca --- /dev/null +++ b/examples/rag-dag-pipeline/requirements.txt @@ -0,0 +1 @@ +vdk-dag
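For readers tracing how the DAG arguments in pipeline.py reach the two jobs: both jobs resolve their settings through the get_value() helper shown in the config.py and fetch_confluence_space.py diffs above, so DAG/job arguments take precedence over job properties, which take precedence over upper-cased environment variables. Below is a minimal, self-contained sketch of that lookup order; the FakeJobInput class and the concrete values are illustrative only and not part of the patch.

```python
import os


def get_value(job_input, key: str, default_value=None):
    # Same resolution order as the helper in config.py / fetch_confluence_space.py:
    # DAG/job arguments first, then job properties, then an upper-cased environment variable.
    return job_input.get_arguments().get(
        key, job_input.get_property(key, os.environ.get(key.upper(), default_value))
    )


class FakeJobInput:
    """Illustrative stand-in for IJobInput, only to demonstrate the lookup order."""

    def __init__(self, arguments, properties):
        self._arguments = arguments
        self._properties = properties

    def get_arguments(self):
        return self._arguments

    def get_property(self, key, default=None):
        return self._properties.get(key, default)


job_input = FakeJobInput(
    arguments={"data_file": "/tmp/confluence.json"},  # passed by the DAG in pipeline.py
    properties={"destination_metadata_table": "vdk_doc_metadata"},  # e.g. set by 00_properties.py
)

assert get_value(job_input, "data_file") == "/tmp/confluence.json"
assert get_value(job_input, "destination_metadata_table") == "vdk_doc_metadata"
# Falls back to the supplied default when neither arguments, properties, nor the environment
# provide a value (assuming CONFLUENCE_SPACE_KEY is not set in the environment).
assert get_value(job_input, "confluence_space_key", "TAURUS") == "TAURUS"
```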