Enable pgvector support for Postgres provider #34891

Closed
wants to merge 9 commits into from

Collaborator

@sunank200 commented Oct 12, 2023

This PR is part of our larger effort, presented at the Airflow Summit, to add first-class integrations that support LLMOps. Specifically, it adds pgvector support to the Postgres provider. pgvector is a popular open-source vector similarity search extension for Postgres. In this iteration, we are integrating with its embeddings model.

The primary objective of this provider is to offer users an alternative embedding model. This lets them generate vectors for their proprietary data, a key step towards integrating with LLMs such as ChatGPT.

Example DAG:
The PgVectorIngestOperator can accept either a list of strings or a callable returning a list of strings.
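The "list of strings or callable" contract described above can be sketched in plain Python. This is not the operator's code from the PR. The parameter names `input_callable`, `input_callable_args`, and `input_callable_kwargs` come from the diff under review, `input_data` is borrowed from the review snippet, and the helper name `resolve_input` is hypothetical.

```python
from __future__ import annotations

from collections.abc import Callable, Collection, Mapping
from typing import Any


def resolve_input(
    input_data: list[str] | None = None,
    input_callable: Callable[..., list[str]] | None = None,
    input_callable_args: Collection[Any] | None = None,
    input_callable_kwargs: Mapping[str, Any] | None = None,
) -> list[str]:
    """Return the texts to embed from exactly one of the two input methods."""
    # Assumption: supplying both (or neither) is treated as a usage error.
    if (input_data is None) == (input_callable is None):
        raise ValueError("Provide exactly one of input_data or input_callable")
    if input_callable is not None:
        # The callable runs inside the task, so nothing passes through XCom.
        args = input_callable_args or ()
        kwargs = input_callable_kwargs or {}
        return list(input_callable(*args, **kwargs))
    return list(input_data)
```

With this shape, a static list and a data-fetching function go through the same code path once resolved.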




Comment on lines +109 to +111
input_callable: Callable[[Any], Any] | None = None,
input_callable_args: Collection[Any] | None = None,
input_callable_kwargs: Mapping[str, Any] | None = None,
Contributor

I want to raise the same question I asked in #34921 (comment)

What benefit do these arguments provide? To me, they could be replaced by other approaches:

  1. Make the input data a templated field and provide it through XCom from an upstream task.
  2. Combine TaskFlow (or PythonOperator) with PostgresHook:
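from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def awesome_task(conn_id: str):
    hook = PostgresHook(postgres_conn_id=conn_id)
    ...
    hook.ingest_embedding(
        table="foo",
        input_data=...,
        vector_size=42,
    )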

Member

One of the approaches we would like to support within the operator is getting data without using XComs. If I am right, XComs are not cleaned up automatically, so users might not want to fill the database with a large number of XCom entries. Hence, we thought of supporting the callable approach within the operator.

cc: @jlaneve

Contributor

yes, purpose here is to give a workaround to storing things in XComs. depending on how much data you're working with, passing a large number of vectors through XComs can be unideal (especially if you don't have a custom XCom backend). instead, giving the user the ability to execute the same data fetching code within the task means we don't pollute XComs.

this has disadvantages though, particularly around retries. hence why there are two different input methods to let the user decide which is right for them: (1) from XComs and (2) with a callable

Contributor

> (2) with a callable

That is exactly what PythonOperator (and the TaskFlow decorators) already do, and they provide greater flexibility, e.g. access to the task context.

On the other hand, the current implementation is a combination of (1) a classic operator and (2) a restricted PythonOperator. The second part could be replaced by adding examples to the docs showing how to use the hook within TaskFlow.
That would reduce the complexity of the code and the number of required tests.

Copy link

Why not just have an @task.pgvector_import() decorator which provides a cleaner UX for instantiating the hook and passing params? Conceptually like https://github.com/astronomer/ask-astro/blob/00cfbd7a48aafe5603b1ef49342f1dd68c148156/airflow/dags/ingestion/ask-astro-load-blogs.py#L167
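As a rough illustration of the decorator idea, here is a plain-Python sketch. It does not use Airflow's decorator machinery, which registers task decorators differently. The name `pgvector_import` is taken from the comment above, and the `ingest` parameter is a hypothetical stand-in for a hook call.

```python
from __future__ import annotations

import functools
from collections.abc import Callable


def pgvector_import(ingest: Callable[[list[str]], None]):
    """Hypothetical decorator factory: run the wrapped function, then ingest its result."""

    def decorator(fetch: Callable[..., list[str]]) -> Callable[..., int]:
        @functools.wraps(fetch)
        def wrapper(*args, **kwargs) -> int:
            rows = fetch(*args, **kwargs)  # data is fetched inside the task itself
            ingest(rows)                   # stand-in for a call on the hook
            return len(rows)               # return a small summary, not the data
        return wrapper

    return decorator
```

Returning only a row count keeps the fetched data out of the task's return value, which is the concern raised earlier in this thread about storing large payloads in XCom.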

@pankajkoti mentioned this pull request Oct 18, 2023
@mpgreg mentioned this pull request Oct 19, 2023
from pgvector.psycopg import register_vector
from psycopg2 import sql

self.conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
Contributor

Suggested change
self.conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

AFAIK, before Postgres 13, CREATE EXTENSION required superuser permissions. In PG 13+, trusted extensions can be installed by anyone with the CREATE privilege on the database; however, pgvector is not one of them.

So let's leave creating extensions to DBAs or to someone with appropriate permissions. We should not force users to use a superuser in their databases.
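One possible alternative to creating the extension from the hook is to check for it and fail with a clear message. This is a minimal sketch, not code from the PR. It assumes a DB-API cursor, and the names `ensure_vector_extension` and `MissingExtensionError` are hypothetical. `pg_extension` is the Postgres system catalog that lists installed extensions.

```python
class MissingExtensionError(RuntimeError):
    """Raised when the pgvector extension is not installed in the database."""


def ensure_vector_extension(cursor) -> None:
    """Verify that the `vector` extension exists instead of trying to create it."""
    # Reading pg_extension needs no elevated privileges, unlike CREATE EXTENSION.
    cursor.execute("SELECT 1 FROM pg_extension WHERE extname = 'vector'")
    if cursor.fetchone() is None:
        raise MissingExtensionError(
            "pgvector is not installed in this database; ask a DBA to run "
            "CREATE EXTENSION vector with suitable privileges."
        )
```

A hook could call such a check before registering vector types, so a missing extension surfaces as an actionable error rather than a permissions failure.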

@pankajkoti
Member

Closing this PR in favor of a separate provider PR: #35399
