Commit

upmerge
jdye64 committed Oct 28, 2024
2 parents 44d668e + b0fdc68 commit 00cb3ea
Showing 76 changed files with 4,397 additions and 1,371 deletions.
2 changes: 1 addition & 1 deletion CITATION.md
@@ -14,7 +14,7 @@ If you use NVIDIA Ingest in a publication, please use citations in the following

## Sample Citations:

Using [RAPIDS](rapids.ai) citations for reference.
Using [RAPIDS](https://rapids.ai/) citations for reference.

### Bringing UMAP Closer to the Speed of Light <br> with GPU Acceleration
```tex
...
```
8 changes: 4 additions & 4 deletions CONTRIBUTING.md
@@ -11,10 +11,10 @@ External contributions will be welcome soon, and they are greatly appreciated! E
- [Seasoned Developers](#seasoned-developers)
- [Workflow](#workflow)
- [Common Processing Patterns](#common-processing-patterns)
- [traceable](#traceable)
- [nv_ingest_node_failure_context_manager](#nv_ingest_node_failure_context_manager)
- [filter_by_task](#filter_by_task)
- [cm_skip_processing_if_failed](#cm_skip_processing_if_failed)
- [traceable](#traceable---srcnv_ingestutiltracingtaggingpy)
- [nv_ingest_node_failure_context_manager](#nv_ingest_node_failure_context_manager---srcnv_ingestutilexception_handlersdecoratorspy)
- [filter_by_task](#filter_by_task---srcnv_ingestutilflow_controlfilter_by_taskpy)
- [cm_skip_processing_if_failed](#cm_skip_processing_if_failed---morpheusutilscontrol_message_utilspy)
- [Adding a New Stage or Module](#adding-a-new-stage-or-module)
- [Common Practices for Writing Unit Tests](#common-practices-for-writing-unit-tests)
- [General Guidelines](#general-guidelines)
106 changes: 78 additions & 28 deletions Dockerfile
@@ -3,8 +3,8 @@
# SPDX-License-Identifier: Apache-2.0
# syntax=docker/dockerfile:1.3

ARG BASE_IMG=nvcr.io/nvidia/morpheus/morpheus
ARG BASE_IMG_TAG=v24.06.01-runtime
ARG BASE_IMG=nvcr.io/nvidia/cuda
ARG BASE_IMG_TAG=12.2.2-base-ubuntu22.04

# Use the NVIDIA CUDA image as the base
FROM $BASE_IMG:$BASE_IMG_TAG AS base
@@ -13,27 +13,63 @@ ARG RELEASE_TYPE="dev"
ARG VERSION=""
ARG VERSION_REV="0"

# We require Python 3.10.15 but base image currently comes with 3.10.14, update here.
RUN source activate morpheus \
&& conda install python=3.10.15
# Install necessary dependencies using apt-get
RUN apt-get update && apt-get install -y \
wget \
bzip2 \
ca-certificates \
curl \
&& apt-get clean

# Install miniconda
RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /tmp/miniconda.sh \
&& bash /tmp/miniconda.sh -b -p /opt/conda \
&& rm /tmp/miniconda.sh

# Add conda to the PATH
ENV PATH=/opt/conda/bin:$PATH

# Install Mamba, a faster alternative to conda, within the base environment
RUN conda install -y mamba -n base -c conda-forge

# Create nv_ingest base environment
RUN conda create -y --name nv_ingest python=3.10.15

# Activate the environment (make it default for subsequent commands)
RUN echo "source activate nv_ingest" >> ~/.bashrc

# Set default shell to bash
SHELL ["/bin/bash", "-c"]

# Install Tini via conda from the conda-forge channel
RUN source activate nv_ingest \
&& mamba install -y -c conda-forge tini

# Install Morpheus dependencies
RUN source activate nv_ingest \
&& mamba install -y \
nvidia/label/dev::morpheus-core \
nvidia/label/dev::morpheus-llm \
-c rapidsai -c pytorch -c nvidia -c conda-forge

# Install additional dependencies using apt-get
RUN apt-get update && apt-get install -y \
libgl1-mesa-glx \
&& apt-get clean

# Set the working directory in the container
WORKDIR /workspace

RUN apt-get update \
&& apt-get install --yes \
libgl1-mesa-glx
# Copy custom entrypoint script
COPY ./docker/scripts/entrypoint.sh /workspace/docker/entrypoint.sh

FROM base AS nv_ingest_install
# Copy the module code
COPY setup.py setup.py
# Don't copy the full source here; pipelines aren't installed via setup anyway, and this lets us rebuild more quickly when only the pipeline changes

COPY ci ci
COPY requirements.txt extra-requirements.txt test-requirements.txt util-requirements.txt ./

SHELL ["/bin/bash", "-c"]

# Prevent haystack from ending telemetry data
# Prevent haystack from sending telemetry data
ENV HAYSTACK_TELEMETRY_ENABLED=False

# Ensure the NV_INGEST_VERSION is PEP 440 compatible
@@ -53,8 +89,10 @@ ENV NV_INGEST_RELEASE_TYPE=${RELEASE_TYPE}
ENV NV_INGEST_VERSION_OVERRIDE=${NV_INGEST_VERSION_OVERRIDE}
ENV NV_INGEST_CLIENT_VERSION_OVERRIDE=${NV_INGEST_VERSION_OVERRIDE}

SHELL ["/bin/bash", "-c"]

# Cache the requirements and install them before uploading source code changes
RUN source activate morpheus \
RUN source activate nv_ingest \
&& pip install -r requirements.txt

COPY tests tests
@@ -63,8 +101,8 @@ COPY client client
COPY src/nv_ingest src/nv_ingest
RUN rm -rf ./src/nv_ingest/dist ./client/dist

# Build the client and install it in the conda cache so that the later nv-ingest build can locate it
RUN source activate morpheus \
# Build the client and install it in the conda cache
RUN source activate nv_ingest \
&& pip install -e client \
&& pip install -r extra-requirements.txt

@@ -73,36 +111,48 @@ RUN chmod +x ./ci/scripts/build_pip_packages.sh \
&& ./ci/scripts/build_pip_packages.sh --type ${RELEASE_TYPE} --lib client \
&& ./ci/scripts/build_pip_packages.sh --type ${RELEASE_TYPE} --lib service

RUN source activate morpheus \
RUN source activate nv_ingest \
&& pip install ./dist/*.whl

RUN source activate morpheus \
RUN source activate nv_ingest \
&& rm -rf src requirements.txt test-requirements.txt util-requirements.txt

# Interim pyarrow backport until folded into upstream dependency tree
RUN source activate morpheus \
&& conda install https://anaconda.org/conda-forge/pyarrow/14.0.2/download/linux-64/pyarrow-14.0.2-py310h188ebfb_19_cuda.conda

# Upgrade setuptools to mitigate https://github.com/advisories/GHSA-cx63-2mw6-8hw5
RUN source activate base \
&& conda install setuptools==70.0.0

FROM base AS runtime

RUN source activate morpheus \
RUN source activate nv_ingest \
&& pip install ./client/dist/*.whl \
## Installations below can be removed after the next Morpheus release
&& pip install --no-input milvus==2.3.5 \
&& pip install --no-input pymilvus==2.3.6 \
&& pip install --no-input langchain==0.1.16 \
&& pip install --no-input langchain-nvidia-ai-endpoints==0.0.11 \
&& pip install --no-input faiss-gpu==1.7.* \
&& pip install --no-input google-search-results==2.4 \
&& pip install --no-input nemollm==0.3.5 \
&& rm -rf client/dist

# Install patched MRC version to circumvent NUMA node issue -- remove after Morpheus 24.10 release
RUN source activate nv_ingest \
&& conda install -y -c nvidia/label/dev mrc=24.10.00a=cuda_12.5_py310_h5ae46af_10

FROM nv_ingest_install AS runtime

COPY src/pipeline.py ./
COPY pyproject.toml ./
COPY ./docker/scripts/entrypoint_source_ext.sh /opt/docker/bin/entrypoint_source

RUN chmod +x /workspace/docker/entrypoint.sh

# Set entrypoint to tini with a custom entrypoint script
ENTRYPOINT ["/opt/conda/envs/nv_ingest/bin/tini", "--", "/workspace/docker/entrypoint.sh"]

# Start both the core nv-ingest pipeline service and the FastAPI microservice in parallel
CMD ["sh", "-c", "python /workspace/pipeline.py & uvicorn nv_ingest.main:app --workers 32 --host 0.0.0.0 --port 7670 & wait"]

FROM base AS development
FROM nv_ingest_install AS development

RUN source activate morpheus && \
RUN source activate nv_ingest && \
pip install -e ./client

CMD ["/bin/bash"]
3 changes: 3 additions & 0 deletions README.md
@@ -86,6 +86,9 @@ Username: $oauthtoken
Password: <Your Key>
```

> [!NOTE]
> During the early access (EA) phase, your API key must be created as a member of `nemo-microservice / ea-participants`, which you can join by applying for early access here: https://developer.nvidia.com/nemo-microservices-early-access/join. Once approved, switch your profile to this org/team; the key you then generate will have access to the resources outlined below.
4. Create a .env file containing your NGC API key and the following paths:
```
# Container images must access resources from NGC.
...
```
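
As a purely hypothetical sketch of such a `.env` file — the comment line is taken from the diff above, but every variable name and path below is an illustrative placeholder, not something shown in this commit:

```
# Container images must access resources from NGC.
NGC_API_KEY=<your NGC API key>

# Placeholder path entries; see the full README for the actual names.
DATASET_ROOT=/path/to/this/repo/data
NV_INGEST_ROOT=/path/to/this/repo
```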
82 changes: 33 additions & 49 deletions client/src/nv_ingest_client/cli/util/click.py
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: Apache-2.0


import glob
import json
import logging
import os
@@ -13,6 +12,13 @@

import click
from nv_ingest_client.cli.util.processing import check_schema
from nv_ingest_client.primitives.tasks.caption import CaptionTaskSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionTask
from nv_ingest_client.primitives.tasks.dedup import DedupTaskSchema
from nv_ingest_client.primitives.tasks.embed import EmbedTaskSchema
from nv_ingest_client.primitives.tasks.extract import ExtractTaskSchema
from nv_ingest_client.primitives.tasks.filter import FilterTaskSchema
from nv_ingest_client.primitives.tasks import CaptionTask
from nv_ingest_client.primitives.tasks import DedupTask
from nv_ingest_client.primitives.tasks import EmbedTask
@@ -21,14 +27,12 @@
from nv_ingest_client.primitives.tasks import SplitTask
from nv_ingest_client.primitives.tasks import StoreTask
from nv_ingest_client.primitives.tasks import VdbUploadTask
from nv_ingest_client.primitives.tasks.caption import CaptionTaskSchema
from nv_ingest_client.primitives.tasks.dedup import DedupTaskSchema
from nv_ingest_client.primitives.tasks.embed import EmbedTaskSchema
from nv_ingest_client.primitives.tasks.extract import ExtractTaskSchema
from nv_ingest_client.primitives.tasks.filter import FilterTaskSchema
from nv_ingest_client.primitives.tasks.split import SplitTaskSchema
from nv_ingest_client.primitives.tasks.store import StoreTaskSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionTask
from nv_ingest_client.primitives.tasks.vdb_upload import VdbUploadTaskSchema
from nv_ingest_client.util.util import generate_matching_files

logger = logging.getLogger(__name__)

@@ -104,48 +108,59 @@ def click_validate_task(ctx, param, value):
if task_id == "split":
task_options = check_schema(SplitTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = SplitTask(**task_options.dict())
new_task = [(new_task_id, SplitTask(**task_options.dict()))]
elif task_id == "extract":
task_options = check_schema(ExtractTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}_{task_options.document_type}"
new_task = ExtractTask(**task_options.dict())
new_task = [(new_task_id, ExtractTask(**task_options.dict()))]

if task_options.extract_tables:
subtask_options = check_schema(TableExtractionSchema, {}, "table_data_extract", "{}")
new_task.append(("table_data_extract", TableExtractionTask(**subtask_options.dict())))

if task_options.extract_charts:
subtask_options = check_schema(ChartExtractionSchema, {}, "chart_data_extract", "{}")
new_task.append(("chart_data_extract", ChartExtractionTask(**subtask_options.dict())))

elif task_id == "store":
task_options = check_schema(StoreTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = StoreTask(**task_options.dict())
new_task = [(new_task_id, StoreTask(**task_options.dict()))]
elif task_id == "caption":
task_options = check_schema(CaptionTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = CaptionTask(**task_options.dict())
new_task = [(new_task_id, CaptionTask(**task_options.dict()))]
elif task_id == "dedup":
task_options = check_schema(DedupTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = DedupTask(**task_options.dict())
new_task = [(new_task_id, DedupTask(**task_options.dict()))]
elif task_id == "filter":
task_options = check_schema(FilterTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = FilterTask(**task_options.dict())
new_task = [(new_task_id, FilterTask(**task_options.dict()))]
elif task_id == "embed":
task_options = check_schema(EmbedTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = EmbedTask(**task_options.dict())
new_task = [(new_task_id, EmbedTask(**task_options.dict()))]
elif task_id == "vdb_upload":
task_options = check_schema(VdbUploadTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = VdbUploadTask(**task_options.dict())

new_task = [(new_task_id, VdbUploadTask(**task_options.dict()))]
else:
raise ValueError(f"Unsupported task type: {task_id}")

if new_task_id in validated_tasks:
raise ValueError(f"Duplicate task detected: {new_task_id}")

logger.debug("Adding task: %s", new_task_id)
validated_tasks[new_task_id] = new_task
for task_tuple in new_task:
validated_tasks[task_tuple[0]] = task_tuple[1]
except ValueError as e:
validation_errors.append(str(e))

if validation_errors:
# Aggregate error messages with original values highlighted
error_message = "\n".join(validation_errors)
# logger.error(error_message)
raise click.BadParameter(error_message)

return validated_tasks
@@ -190,37 +205,6 @@ def pre_process_dataset(dataset_json: str, shuffle_dataset: bool):
return file_source


def _generate_matching_files(file_sources):
"""
Generates a list of file paths that match the given patterns specified in file_sources.
Parameters
----------
file_sources : list of str
A list containing the file source patterns to match against.
Returns
-------
generator
A generator yielding paths to files that match the specified patterns.
Notes
-----
This function utilizes glob pattern matching to find files that match the specified patterns.
It yields each matching file path, allowing for efficient processing of potentially large
sets of files.
"""

files = [
file_path
for pattern in file_sources
for file_path in glob.glob(pattern, recursive=True)
if os.path.isfile(file_path)
]
for file_path in files:
yield file_path


def click_match_and_validate_files(ctx, param, value):
"""
Matches and validates files based on the provided file source patterns.
@@ -239,7 +223,7 @@
if not value:
return []

matching_files = list(_generate_matching_files(value))
matching_files = list(generate_matching_files(value))
if not matching_files:
logger.warning("No files found matching the specified patterns.")
return []
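
To illustrate the fan-out this change introduces: a single `extract` task string now yields up to three registered tasks when `extract_tables` or `extract_charts` is enabled. Below is a hedged sketch of a matching CLI invocation — the `nv-ingest-cli` entry point and flag names are assumptions about this client package, not shown in the diff:

```
# Each --task value has the form "<task_id>:<json_options>", parsed by
# click_validate_task. With tables and charts enabled, the extract task
# also registers the table_data_extract and chart_data_extract subtasks.
nv-ingest-cli \
  --doc ./data/sample.pdf \
  --task 'extract:{"document_type": "pdf", "extract_tables": true, "extract_charts": true}' \
  --task 'dedup:{}' \
  --client_host localhost \
  --client_port 7670
```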