examples: add chunker job to support configurable chunking (#3093)
What: Extend the pgvector-embedder example by adding a configurable chunking
mechanism.

Why: Until now, whole documents were embedded and ingested into the
database, but their size sometimes exceeded the token limit imposed by the
LLM used for inference. This change introduces a configurable document
chunking mechanism to overcome this problem.

Testing Done: Ran the pipeline jobs locally.

Closes #3084 

Signed-off-by: Yoan Salambashev <[email protected]>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Yoan Salambashev <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
3 people authored Feb 19, 2024
1 parent b566d4f commit e1e76ba
Showing 11 changed files with 337 additions and 52 deletions.
22 changes: 22 additions & 0 deletions examples/chunker/00_properties.py
@@ -0,0 +1,22 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os

from config import CHUNKS_JSON_FILE
from config import DOCUMENTS_JSON_FILE
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    properties = job_input.get_all_properties()

    data_file = os.path.join(job_input.get_job_directory(), DOCUMENTS_JSON_FILE)
    chunks_file = os.path.join(job_input.get_job_directory(), CHUNKS_JSON_FILE)
    properties.update(
        dict(
            data_file=data_file,
            chunks_file=chunks_file,
            chunking_strategy="fixed",
        )
    )
    job_input.set_all_properties(properties)
156 changes: 156 additions & 0 deletions examples/chunker/10_chunk_data.py
@@ -0,0 +1,156 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import pathlib
import re
import string

from config import CHUNK_OVERLAP
from config import CHUNK_SIZE
from config import CHUNKS_JSON_FILE
from config import DOCUMENTS_JSON_FILE
from nltk.tokenize import word_tokenize
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def custom_join(tokens):
    """
    Joins a list of tokens into a string, adding a space between words
    but not between a word and following punctuation.
    """
    result = ""
    for i, token in enumerate(tokens):
        if i == 0:
            result += token
        elif token in string.punctuation:
            result += token
        else:
            result += " " + token
    return result


class ChunkerFactory:
    @staticmethod
    def get_chunker(strategy_name: str, **kwargs):
        # FixedSizeChunker is configured via kwargs (chunk_size, chunk_overlap);
        # WikiSectionChunker takes no arguments.
        chunkers = {
            "fixed": FixedSizeChunker,
            "wiki": WikiSectionChunker,
        }
        if strategy_name in chunkers:
            return (
                chunkers[strategy_name](**kwargs)
                if strategy_name == "fixed"
                else chunkers[strategy_name]()
            )
        else:
            raise ValueError(
                f"Unknown chunking strategy: {strategy_name}. "
                f"Supported strategies: {list(chunkers.keys())}"
            )


class Chunker:
    """
    Base class for chunking strategies. One of the concrete strategies below must be chosen.
    """

    def chunk(self, documents: list):
        raise NotImplementedError("The chunking strategy is not supported.")


class FixedSizeChunker(Chunker):
    """
    Splits text into chunks of fixed size with overlap between neighbouring ones.
    """

    def __init__(self, chunk_size, chunk_overlap):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk(self, documents):
        chunked_documents = []
        for doc in documents:
            tokens = word_tokenize(doc["data"])
            # Slide a window of chunk_size tokens, stepping by chunk_size - chunk_overlap
            # so that neighbouring chunks share chunk_overlap tokens.
            for i in range(0, len(tokens), self.chunk_size - self.chunk_overlap):
                chunk_id = f"{doc['metadata']['id']}_{i // (self.chunk_size - self.chunk_overlap)}"
                chunk_metadata = doc["metadata"].copy()
                chunk_metadata["id"] = chunk_id
                chunked_documents.append(
                    {
                        "metadata": chunk_metadata,
                        "data": custom_join(tokens[i : i + self.chunk_size]),
                    }
                )
        return chunked_documents


class WikiSectionChunker(Chunker):
    """
    Splits Wiki articles into chunks.
    """

    def __init__(self):
        pass

    def chunk(self, documents):
        chunked_documents = []
        for doc in documents:
            sections = re.split(
                r"==+ [^=]+ ==", doc["data"]
            )  # Wiki section headers are identified by ==
            for i, section in enumerate(sections):
                chunk_id = f"{doc['metadata']['id']}_{i}"
                chunk_metadata = doc["metadata"].copy()
                chunk_metadata["id"] = chunk_id
                chunked_documents.append(
                    {
                        "metadata": chunk_metadata,
                        "data": section.strip(),
                    }
                )
        return chunked_documents


def load_documents(json_file_path: str):
    """
    Loads documents from JSON file.
    :param json_file_path: Path to the JSON file containing documents.
    :return: List of documents.
    """
    with open(json_file_path, encoding="utf-8") as file:
        return json.load(file)


def store(name, content):
    json_data = json.dumps(content, indent=4)
    with open(name, "w") as file:
        file.write(json_data)


def run(job_input: IJobInput):
    log.info(f"Starting job step {__name__}")

    data_job_dir = pathlib.Path(job_input.get_job_directory())
    input_json = job_input.get_property("data_file", data_job_dir / DOCUMENTS_JSON_FILE)
    output_json = job_input.get_property("chunks_file", data_job_dir / CHUNKS_JSON_FILE)
    chunking_strategy = job_input.get_property("chunking_strategy", "fixed")
    chunk_size = CHUNK_SIZE
    chunk_overlap = CHUNK_OVERLAP

    documents = load_documents(input_json)
    log.debug(documents)
    chunker = ChunkerFactory.get_chunker(
        chunking_strategy, chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    chunked_documents = chunker.chunk(documents)
    log.debug(chunked_documents)
    if chunked_documents:
        log.info(
            f"{len(chunked_documents)} document chunks created using the {chunking_strategy} chunking strategy."
        )
        store(output_json, chunked_documents)
        log.info(f"Chunks saved to {output_json}")
39 changes: 39 additions & 0 deletions examples/chunker/README.md
@@ -0,0 +1,39 @@
# Chunker Data Job Example

The following Versatile Data Kit example shows how to chunk document data provided in the format described below.

# Expected input format

```json
[
    {
        "metadata": {
            "title": "Page (or chunk) title",
            "id": "Content page ID",
            "source": "Source URL",
            "deleted": <whether the content has been deleted in the source>
        },
        "data": "Content Text"
    },
]
```

# Output format

The output format is the same as the input one. The "data" field is the only difference: it now contains a chunk
of a document instead of the whole document.
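
For illustration, here is a minimal sketch of how a chunk relates to its source document. The sample document below is hypothetical; the id-suffix convention follows the one used in 10_chunk_data.py.

```python
# Hypothetical input document in the format shown above.
doc = {
    "metadata": {"title": "Getting Started", "id": "123", "source": "https://example.org", "deleted": False},
    "data": "A long document that does not fit into a single chunk ...",
}

# Chunks keep a copy of the metadata; only "id" (suffixed per chunk) and "data" change.
chunk_0 = {"metadata": {**doc["metadata"], "id": "123_0"}, "data": "first part of the document ..."}
chunk_1 = {"metadata": {**doc["metadata"], "id": "123_1"}, "data": "second part of the document ..."}
```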

# Chunking the data

There is a chunking_strategy property that selects how the documents are split.
It defaults to "fixed", which applies fixed-size chunking with overlap.
For fixed-size chunking, CHUNK_SIZE and CHUNK_OVERLAP are configured in config.py;
they set the chunk size (in tokens) and the overlap between neighbouring chunks, as sketched below.
The other supported strategy is "wiki", which splits Wikipedia articles into their individual sections.
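
To make the fixed-size strategy concrete, here is a minimal sketch, assuming the default CHUNK_SIZE of 2048 and CHUNK_OVERLAP of 64 from config.py, of how the overlapping token windows are laid out:

```python
CHUNK_SIZE = 2048  # tokens per chunk
CHUNK_OVERLAP = 64  # tokens shared by neighbouring chunks


def window_starts(num_tokens: int) -> list:
    """Start offsets of the chunks, mirroring the stride used by the fixed-size chunker."""
    step = CHUNK_SIZE - CHUNK_OVERLAP  # 1984 tokens between chunk starts
    return list(range(0, num_tokens, step))


# A 5000-token document yields chunks starting at tokens 0, 1984 and 3968;
# each chunk spans up to CHUNK_SIZE tokens, so consecutive chunks share 64 tokens.
print(window_starts(5000))  # [0, 1984, 3968]
```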

# Run the example

To run the data job locally:
```bash
vdk run chunker
```
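
Note: the fixed-size strategy tokenizes documents with NLTK's word_tokenize, which relies on the "punkt" tokenizer data. If it is not already available locally, it can typically be downloaded once, for example:

```python
import nltk

nltk.download("punkt")  # tokenizer models used by word_tokenize
# On newer NLTK releases the "punkt_tab" package may be needed as well:
# nltk.download("punkt_tab")
```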
2 changes: 2 additions & 0 deletions examples/chunker/config.ini
@@ -0,0 +1,2 @@
[owner]
team = my-team
7 changes: 7 additions & 0 deletions examples/chunker/config.py
@@ -0,0 +1,7 @@
# Copyright 2021-2024 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

DOCUMENTS_JSON_FILE = "fixed_size_example.json"
CHUNKS_JSON_FILE = "chunks_example.json"
CHUNK_SIZE = 2048
CHUNK_OVERLAP = 64
56 changes: 56 additions & 0 deletions examples/chunker/fixed_size_example.json
@@ -0,0 +1,56 @@
[
{
"metadata": {
"title": "Getting Started",
"id": "123213312",
"source": "https://github.com/vmware/versatile-data-kit/wiki/Getting-Started",
"deleted": false
},
"data": "VDK Getting Started guide"
},
{
"metadata": {
"title": "VDK Wiki",
"id": "747124724",
"source": "https://github.com/vmware/versatile-data-kit/wiki",
"deleted": false
},
"data": "VDK Wiki"
},
{
"metadata": {
"title": "VDK Issues",
"id": "721295269",
"source": "https://github.com/vmware/versatile-data-kit/issues",
"deleted": false
},
"data": "VDK Issues"
},
{
"metadata": {
"title": "VDK PRs",
"id": "1323122133",
"source": "https://github.com/vmware/versatile-data-kit/pulls",
"deleted": false
},
"data": "VDK Pull Requests"
},
{
"metadata": {
"title": "VDK Main Page",
"id": "312343243",
"source": "https://github.com/vmware/versatile-data-kit/tree/main",
"deleted": false
},
"data": "VDK: One framework to develop, deploy and operate data workflows with Python and SQL."
},
{
"metadata": {
"title": "VDK VEP",
"id": "747124725",
"source": "https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-milestone-25-vector-database-ingestion",
"deleted": false
},
"data": "VDK VEP milestone 25 vector database ingestion. Summary: With the rise in popularity of LLMs and RAG we see VDK as a core component to getting the data where we need it to be. VDK's strengths are ETL tasks. We see that its very well suited to populating the databases needed for RAG."
}
]
3 changes: 3 additions & 0 deletions examples/chunker/requirements.txt
@@ -0,0 +1,3 @@
nltk
numpy
sentence-transformers
20 changes: 20 additions & 0 deletions examples/chunker/wiki_example.json
@@ -0,0 +1,20 @@
[
{
"metadata": {
"title": "Ajvar",
"id": "123",
"source": "https://en.wikipedia.org/wiki/Ajvar#:~:text=Ajvar%20(pronounced%3A%20%2F%CB%88a%C9%AA,is%20popular%20in%20Southeast%20Europe.&text=Homemade%20ajvar%20is%20made%20of%20roasted%20peppers.",
"deleted": false
},
"data": "Ajvar is a condiment made principally from sweet bell peppers and eggplants. The relish became a popular side dish throughout Yugoslavia after World War II and is popular in Southeast Europe. Homemade ajvar is made of roasted peppers. Depending on the capsaicin content in bell peppers and the amount of added chili peppers, it can be sweet , piquant , or very hot. Ajvar can be consumed as a bread spread or as a side dish. Ajvar has a few variations. One variation contains tomato and eggplant. Another is made with green bell peppers and oregano. Homemade Leskovac Ajvar and Macedonian Ajvar are registered with the World Intellectual Property Organization in order to protect their brand names. == Etymology and origin == The name ajvar comes from the Turkish word havyar, which means salted roe, caviar and shares an etymology with caviar, coming from the Persian word xaviyar. Before the 20th century, significant local production of caviar occurred on the Danube, with sturgeon swimming from the Black Sea up to Belgrade. Domestic ajvar, meaning caviar,” used to be a very popular dish in Belgrade homes and restaurants, but the domestic production of caviar became unsteady in the 1890s because of labor disputes. Eventually a special pepper salad was offered as a substitute in Belgrade restaurants under the name red ajvar or Serbian ajvar . == Preparation == Homemade ajvar is made of roasted, minced, and then cooked peppers, while some industrial producers use fresh minced peppers, cooked with sunflower oil afterwards, which leads to a lower quality. Ajvar preparation is somewhat difficult, because it requires considerable manual labour, particularly for peeling the roasted peppers. Traditionally, people prepare it in mid-autumn, when bell peppers are most abundant, and preserve it in glass jars for consumption throughout the year. Anecdotally, most households' stocks do not last until the spring, when fresh vegetables become available, so it is usually enjoyed as a winter food. Often, the whole family or neighbours gather to prepare the bell peppers. The traditional cultivar of pepper used is called roga . Roga is large, red, horn-shaped, with thick flesh and relatively easy to peel. It typically ripens in late September.To produce ajvar, bell peppers are roasted whole on a plate on an open fire, a plate of wood in a stove, or in an oven. The baked peppers must briefly cool to allow the flesh to separate from the skin. Next, the skin is carefully peeled off and the seeds are removed. The peppers are then ground in a mill or chopped into tiny pieces . Finally, the resulting mash is stewed for several hours in large pots. Sunflower oil is added at this stage to condense and reduce the water, and to enhance later preservation. Salt is added at the end and the hot mush is poured directly into sterilized glass jars, which are sealed immediately. == Production == Ajvar is produced in most Balkan countries, including Albania, Bosnia, Croatia, North Macedonia, Slovenia and Serbia. Serbia's reported annual production is 640 tons.Ajvar is one of the so-called zimnica , which include pickled chili peppers, pickled tomatoes, and anything else that can be preserved in a jar just before winter. 
== See also == Ljutenica – dishPages displaying wikidata descriptions as a fallbackPages displaying short descriptions with no spaces, a similar relish in Bulgarian, Macedonian, and Serbian cuisines Pindjur – relish formPages displaying wikidata descriptions as a fallback, a similar relish in Bosnian, Macedonian, and Serbian cuisines Zacuscă – Romanian-Moldovan dish, a similar relish in Romanian cuisine Kyopolou – Bulgarian-Turkish dish, an eggplant-based relish in Bulgarian and Turkish cuisines Malidzano, a similar relish in Macedonian cuisine Biber salçası – Paste made from peppers or tomato and salt, originating in TurkeyPages displaying short descriptions of redirect targets, a Turkish paste made from red peppers alone Lecso – Hungarian dishPages displaying short descriptions of redirect targets, a similar Hungarian stewed red pepper, onion, and garlic dish List of spreads Achar – Pickled varieties of vegetable and fruit, a similar relish of Indo-European origin in South Asian cuisines == References == == External links == Fall Brings Red Peppers and Ajvar, 'Serbian Salsa'. NPR. 8 November 2006. Ajvar srpski kavijar . Novosti. 2013. Leskovčanka po čijem receptu je brendiran srpski ajvar ušla u biznis kad je ostala bez posla. Blic . 2012. Ajvar - Top-notch gastronomic delight, vegan soul food, recipes and origin. Ajvar.com. 2017."
},
{
"metadata": {
"title": "Bob chorba",
"id": "124",
"source": "https://en.wikipedia.org/wiki/Bob_chorba",
"deleted": false
},
"data": "Bob chorba is a chorba, a Bulgarian soup. It is made from dry beans, onions, tomatoes, chubritza or dzhodzhen and carrots.Local variations may also exclude the carrots or include paprika, potatoes or even some kind of meat. Historically, it has been a common soup and staple food at Bulgarian monasteries. == See also == Bulgarian cuisine List of bean soups List of soups == References =="
}
]
@@ -2,7 +2,6 @@
 # SPDX-License-Identifier: Apache-2.0
 import json
 import logging
-import re

 from common.database_storage import DatabaseStorage
 from config import get_value
@@ -12,26 +11,9 @@
 log = logging.getLogger(__name__)


-def clean_text(text):
-    """
-    Prepares text for NLP tasks (embedding and RAG) by standardizing its form.
-    :param text: A string containing the text to be processed.
-    :return: The processed text as a string.
-    """
-    return text
-
-
-def load_and_clean_documents(content):
-    cleaned_documents = []
-    documents = json.loads(content)
-
-    for doc in documents:
-        if "data" in doc:
-            cleaned_text = clean_text(doc["data"])
-            cleaned_documents.append([cleaned_text])
-
-    print(len(cleaned_documents))
-    return cleaned_documents
+def load_documents(json_file_path):
+    with open(json_file_path, encoding="utf-8") as file:
+        return json.load(file)


 def embed_documents_in_batches(documents):
@@ -42,7 +24,7 @@ def embed_documents_in_batches(documents):
     embeddings = []
     for start_index in range(0, total):
         # the resources are not enough to batch 2 documents at a time, so the batch = 1 doc
-        batch = documents[start_index]
+        batch = [documents[start_index]]
         log.info(f"BATCH: {len(batch)}.")
         embeddings.extend(model.encode(batch, show_progress_bar=True))

@@ -54,16 +36,13 @@ def run(job_input: IJobInput):
     log.info(f"Starting job step {__name__}")

     output_embeddings = get_value(job_input, "output_embeddings")
-
     storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
     storage_name = get_value(job_input, "storage_name", "confluence_data")

-    cleaned_documents = load_and_clean_documents(storage.retrieve(storage_name))
-    if cleaned_documents:
-        log.info(
-            f"{len(cleaned_documents)} documents loaded and cleaned for embedding."
-        )
-        embeddings = embed_documents_in_batches(cleaned_documents)
+    documents = load_documents(storage.retrieve(storage_name))
+    if documents:
+        log.info(f"{len(documents)} chunks loaded and cleaned for embedding.")
+        embeddings = embed_documents_in_batches(documents)
         with open(output_embeddings, "wb") as file:
             import pickle
