
Commit

a few more fixes
antoniivanov committed Feb 7, 2024
1 parent 182771c commit cceb5da
Showing 11 changed files with 126 additions and 58 deletions.
1 change: 0 additions & 1 deletion examples/confluence-reader/README.md
@@ -8,7 +8,6 @@ The `ConfluenceDataSource` class is the heart of this data job. It provides a se

- `fetch_updated_pages_in_confluence_space()`: Fetches updated pages in the Confluence space based on the last modification date.
- `fetch_all_pages_in_confluence_space()`: Retrieves all pages in the Confluence space.
- `fetch_updated_documents_by_parent_id(parent_page_id)`: Recursively fetches updated documents based on a parent page ID, ensuring that nested pages are also captured.
- `flag_deleted_pages()`: Flags deleted pages based on the current Confluence data.
- `update_saved_documents()`: Updates the saved documents in the JSON file with the latest data.

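For orientation, here is a minimal sketch of how these pieces fit together, mirroring the `run()` step changed in this commit (configuration values such as `confluence_url`, `token`, `space_key`, `parent_page_id`, `last_date`, and `data_file` are placeholders):

```python
# Sketch only: in the real job these values come from job arguments/properties.
confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

# Fetch only the pages modified since last_date (optionally under a parent page)
# and merge them into the locally saved JSON data file.
updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
    last_date, parent_page_id
)
update_saved_documents(data_file, updated_docs)

# Compare against the full list of current pages and flag the ones that
# no longer exist in the space as deleted.
flag_deleted_pages(
    data_file,
    confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
)
```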
71 changes: 33 additions & 38 deletions examples/confluence-reader/fetch_confluence_space.py
@@ -11,9 +11,6 @@

log = logging.getLogger(__name__)

CONFLUENCE_DATA_FILE = "confluence_data.json"
LAST_MODIFICATION_FILE = "last_modification.txt"


def read_json_file(file_path):
try:
@@ -84,27 +81,6 @@ def flag_deleted_pages(file_path, current_confluence_documents):
write_json_file(file_path, serialized_docs)


def read_last_modification_date():
try:
with open(LAST_MODIFICATION_FILE) as file:
return file.read().strip()
except FileNotFoundError:
log.error(f"{LAST_MODIFICATION_FILE} not found. Using default date.")
return datetime.min.strftime("%Y-%m-%d %H:%M")


def update_last_modification_date():
try:
with open(LAST_MODIFICATION_FILE, "w") as file:
# This is buggy: it doesn't account for the difference between the server timezone and the local timezone,
# and it also assumes that the server clock and the local clock are synchronized (which they likely are not).
# The timestamp should be the one of the latest processed page.
formatted_date = datetime.now().strftime("%Y-%m-%d %H:%M")
file.write(formatted_date)
except OSError as e:
log.error(f"Error writing to file: {e}")


class ConfluenceDataSource:
"""
A class for retrieving and managing data from a Confluence space.
@@ -143,12 +119,12 @@ def fetch_confluence_documents(self, cql_query):
]
except Exception as e:
log.error(f"Error fetching documents from Confluence: {e}")
return []
raise e

def fetch_updated_pages_in_confluence_space(self, parent_page_id=None):
last_date = read_last_modification_date()
def fetch_updated_pages_in_confluence_space(
self, last_date="1900-02-06 17:54", parent_page_id=None
):
# TODO: this really should be called not when page is read but after it's successfully processed.
update_last_modification_date()
cql_query = (
f"lastModified > '{last_date}' and type = page and space = {self.space_key}"
)
@@ -170,27 +146,46 @@ def fetch_all_pages_in_confluence_space(self, parent_page_id=None):
return self.fetch_confluence_documents(cql_query)


def get_value(job_input, key: str, default_value=None):
return job_input.get_arguments().get(
key, job_input.get_property(key, os.environ.get(key.upper(), default_value))
)


def set_property(job_input: IJobInput, key, value):
props = job_input.get_all_properties()
props[key] = value
job_input.set_all_properties(props)


def run(job_input: IJobInput):
log.info(f"Starting job step {__name__}")

confluence_url = job_input.get_property(
"confluence_url", "http://confluence.eng.vmware.com/"
confluence_url = get_value(job_input, "confluence_url")
token = get_value(job_input, "confluence_token")
space_key = get_value(job_input, "confluence_space_key")
parent_page_id = get_value(job_input, "confluence_parent_page_id")
last_date = get_value(job_input, "last_date", "1900-01-01 12:00")
data_file = get_value(
job_input,
"data_file",
os.path.join(job_input.get_temporary_write_directory(), "confluence_data.json"),
)
token = job_input.get_property(
"confluence_token", os.environ.get("VDK_CONFLUENCE_TOKEN")
)
space_key = job_input.get_property("confluence_space_key", "TAURUS")
parent_page_id = job_input.get_property("confluence_parent_page_id", "1105807412")

confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)

updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
parent_page_id
last_date, parent_page_id
)
log.info(f"Found {len(updated_docs)} updated pages")
update_saved_documents(CONFLUENCE_DATA_FILE, updated_docs)
update_saved_documents(data_file, updated_docs)

# This is buggy: it doesn't account for the difference between the server timezone and the local timezone,
# and it also assumes that the server clock and the local clock are synchronized (which they likely are not).
# The timestamp should be the one of the latest processed page.
set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M"))

flag_deleted_pages(
CONFLUENCE_DATA_FILE,
data_file,
confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
)
1 change: 0 additions & 1 deletion examples/confluence-reader/last_modification.txt

This file was deleted.

14 changes: 12 additions & 2 deletions examples/pgvector-embedder/00_properties.py
@@ -1,14 +1,24 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os.path
import pathlib

[Codacy Static Code Analysis, line 4: 'pathlib' imported but unused (F401)]

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
properties = job_input.get_all_properties()

data_file = os.path.join(job_input.get_job_directory(), "documents_example.json")
output_embeddings = os.path.join(
job_input.get_temporary_write_directory(), "embeddings_example.pkl"
)
properties.update(
dict(
destination_embeddings_table="vdk_doc_embeddings_ai",
destination_metadata_table="vdk_doc_metadata_ai",
destination_embeddings_table="vdk_doc_embeddings",
destination_metadata_table="vdk_doc_metadata",
data_file=data_file,
output_embeddings=output_embeddings,
)
)
job_input.set_all_properties(properties)
9 changes: 3 additions & 6 deletions examples/pgvector-embedder/20_clean_and_embed_json_data.py
@@ -2,12 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import pathlib
import re

import nltk
from config import DOCUMENTS_JSON_FILE_LOCATION
from config import EMBEDDINGS_PKL_FILE_LOCATION
from config import get_value
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
@@ -84,9 +82,8 @@ def setup_nltk(temp_dir):
def run(job_input: IJobInput):
log.info(f"Starting job step {__name__}")

data_job_dir = pathlib.Path(job_input.get_job_directory())
input_json = data_job_dir / DOCUMENTS_JSON_FILE_LOCATION
output_embeddings = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION
input_json = get_value(job_input, "data_file")
output_embeddings = get_value(job_input, "output_embeddings")

temp_dir = job_input.get_temporary_write_directory()
setup_nltk(temp_dir)
13 changes: 5 additions & 8 deletions examples/pgvector-embedder/40_ingest_embeddings.py
@@ -2,12 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import pathlib
import pickle

import numpy as np
from config import DOCUMENTS_JSON_FILE_LOCATION
from config import EMBEDDINGS_PKL_FILE_LOCATION
from config import get_value
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)
@@ -16,9 +14,8 @@
def run(job_input: IJobInput):
log.info(f"Starting job step {__name__}")

data_job_dir = pathlib.Path(job_input.get_job_directory())
input_embeddings_path = data_job_dir / EMBEDDINGS_PKL_FILE_LOCATION
input_documents_path = data_job_dir / DOCUMENTS_JSON_FILE_LOCATION
input_embeddings_path = get_value(job_input, "output_embeddings")
input_documents_path = get_value(job_input, "data_file")

with open(input_embeddings_path, "rb") as file:
embeddings = pickle.load(file)
@@ -39,7 +36,7 @@ def run(job_input: IJobInput):
}
job_input.send_object_for_ingestion(
payload=embedding_payload,
destination_table=job_input.get_property("destination_embeddings_table"),
destination_table=get_value(job_input, "destination_embeddings_table"),
)

for document in documents:
@@ -52,5 +49,5 @@
}
job_input.send_object_for_ingestion(
payload=metadata_payload,
destination_table=job_input.get_property("destination_metadata_table"),
destination_table=get_value(job_input, "destination_metadata_table"),
)
8 changes: 6 additions & 2 deletions examples/pgvector-embedder/config.py
@@ -1,5 +1,9 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os

DOCUMENTS_JSON_FILE_LOCATION = "documents_example.json"
EMBEDDINGS_PKL_FILE_LOCATION = "embeddings_example.pkl"

def get_value(job_input, key: str, default_value=None):
return job_input.get_arguments().get(
key, job_input.get_property(key, os.environ.get(key.upper(), default_value))
)
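
The helper resolves a configuration value in this order: job arguments, then job properties, then an upper-cased environment variable, and finally the supplied default. A small illustrative sketch of that precedence (the `_FakeJobInput` stub below is hypothetical and exists only to exercise `get_value` from above; a real job receives an `IJobInput` instance in `run()`):

```python
import os

# Hypothetical stand-in for IJobInput, only to demonstrate the lookup order.
class _FakeJobInput:
    def __init__(self, arguments=None, properties=None):
        self._arguments = arguments or {}
        self._properties = properties or {}

    def get_arguments(self):
        return self._arguments

    def get_property(self, key, default_value=None):
        return self._properties.get(key, default_value)


os.environ["DATA_FILE"] = "/tmp/from_env.json"

# A property is set: it wins over the environment variable and the default.
with_property = _FakeJobInput(properties={"data_file": "/tmp/from_props.json"})
print(get_value(with_property, "data_file", "documents_example.json"))
# -> /tmp/from_props.json

# Nothing set as argument or property: the DATA_FILE environment variable wins.
empty = _FakeJobInput()
print(get_value(empty, "data_file", "documents_example.json"))
# -> /tmp/from_env.json
```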
21 changes: 21 additions & 0 deletions examples/rag-dag-pipeline/README.md
@@ -0,0 +1,21 @@
This job contains an ELT (Extract, Load, Transform) pipeline designed for processing data from Confluence and embedding it using pgvector.

The jobs are orchestrated using the vdk-dag plugin to run in a defined sequence.

# Job structure

Here are the two main jobs:

- confluence-reader: Extracts raw data from Confluence and loads it into a specified location (table, file, etc.).
- pgvector-embedder: Transforms the extracted data by embedding it using pgvector and stores the metadata and embeddings in specified tables (vdk_confluence_metadata and vdk_confluence_embeddings).

TODO (missing vdk feature): since the idea is for this to be used as a template, we need a way for VDK to automatically handle the jobs specified in the DAG.
Currently each job specified (e.g. confluence-reader) must already be deployed, and deployed VDK jobs can only run one execution at a time.
What can we do to solve that?

A) Create a separate deployment automatically
B) Run the job with the arguments provided as a separate job instance
- What about job properties? Maybe it should inherit the parent job's properties, or ignore them and only accept arguments?
C) ...

TODO (missing vdk feature): how do I pick between different jobs to compose them?
15 changes: 15 additions & 0 deletions examples/rag-dag-pipeline/config.ini
@@ -0,0 +1,15 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belong to the same team.
team = my-team

[vdk]
dags_max_concurrent_running_jobs = 2
dags_delayed_jobs_min_delay_seconds = 1
dags_delayed_jobs_randomized_added_delay_seconds = 1
30 changes: 30 additions & 0 deletions examples/rag-dag-pipeline/pipeline.py
@@ -0,0 +1,30 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.plugin.dag.dag_runner import DagInput

# ELT

jobs = [
dict(
job_name="confluence-reader",
team_name="my-team",
fail_dag_on_error=True,
arguments=dict(data_file=f"/tmp/confluence.json"),

[Codacy Static Code Analysis, line 12: f-string is missing placeholders (F541)]
depends_on=[],
),
dict(
job_name="pgvector-embedder",
team_name="my-team",
fail_dag_on_error=True,
arguments=dict(
data_file=f"/tmp/confluence.json",

[Codacy Static Code Analysis, line 20: probable insecure usage of temp file/directory; f-string is missing placeholders (F541)]
destination_metadata_table="vdk_confluence_metadata",
destination_embeddings_table="vdk_confluence_embeddings",
),
depends_on=["confluence-reader"],
),
]


def run(job_input) -> None:
DagInput().run_dag(jobs)
1 change: 1 addition & 0 deletions examples/rag-dag-pipeline/requirements.txt
@@ -0,0 +1 @@
vdk-dag
