Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pinecone Provider #35094

Merged
merged 30 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
eb103ec
Add Pinecone Provider
utkarsharma2 Oct 20, 2023
ec0b911
Update docs/apache-airflow-providers-pinecone/index.rst
utkarsharma2 Oct 20, 2023
da24ad5
Add Pinecone Provider
utkarsharma2 Oct 20, 2023
5e95d79
Fix docs and interface
utkarsharma2 Oct 27, 2023
7efde04
Update airflow/providers/pinecone/hooks/pinecone_hook.py
utkarsharma2 Oct 30, 2023
c9ee020
Update airflow/providers/pinecone/hooks/pinecone_hook.py
utkarsharma2 Oct 30, 2023
b21878e
Update airflow/providers/pinecone/operators/pinecone_operator.py
utkarsharma2 Oct 30, 2023
8bf3976
Update airflow/providers/pinecone/operators/pinecone_operator.py
utkarsharma2 Oct 30, 2023
33467c9
Update airflow/providers/pinecone/operators/pinecone_operator.py
utkarsharma2 Oct 30, 2023
ea0105e
Update airflow/providers/pinecone/operators/pinecone_operator.py
utkarsharma2 Oct 30, 2023
2bfb8a5
Update docs/apache-airflow-providers-pinecone/operators/pinecone.rst
utkarsharma2 Oct 30, 2023
03022a4
Update docs/apache-airflow-providers-pinecone/operators/pinecone.rst
utkarsharma2 Oct 30, 2023
1c8ea50
Update tests/system/providers/pinecone/example_dag_pinecone.py
utkarsharma2 Oct 30, 2023
8fdf439
Update airflow/providers/pinecone/hooks/pinecone_hook.py
utkarsharma2 Oct 31, 2023
2a0fbfa
Rename pinecone_hook to pinecone as per AIP-21
utkarsharma2 Oct 31, 2023
3f6e943
Remove placeholder from connection form
utkarsharma2 Oct 31, 2023
1d55b38
Address PR comments
utkarsharma2 Oct 31, 2023
21ee990
Fix Connection fields issue
utkarsharma2 Oct 31, 2023
17c7fdd
Add upsert_kwargs as param
utkarsharma2 Nov 1, 2023
3960051
Fix example
utkarsharma2 Nov 2, 2023
f4a8dbe
Update changelog
utkarsharma2 Nov 3, 2023
75092fc
Add security.rst to pinecone docs
utkarsharma2 Nov 3, 2023
8cc9ab1
Fix test issue
utkarsharma2 Nov 3, 2023
3bc5d53
Update doc strings
utkarsharma2 Nov 3, 2023
85a6adf
Update docs/apache-airflow-providers-pinecone/index.rst
utkarsharma2 Nov 3, 2023
cc0402f
Add ref to sequrity.rst
pankajastro Nov 4, 2023
6ad3d02
Add changelog.rst
pankajastro Nov 4, 2023
1192e55
Fix malformed table
utkarsharma2 Nov 6, 2023
6de02a5
Resolve conflict
utkarsharma2 Nov 6, 2023
749215e
Fix static checks
utkarsharma2 Nov 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ body:
- oracle
- pagerduty
- papermill
- pinecone
- plexus
- postgres
- presto
Expand Down
6 changes: 3 additions & 3 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -677,9 +677,9 @@ doc, doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
oracle, otel, pagerduty, pandas, papermill, password, pinecone, pinot, plexus, postgres, presto,
rabbitmq, redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
weaviate, webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE

Expand Down
6 changes: 3 additions & 3 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ doc, doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
oracle, otel, pagerduty, pandas, papermill, password, pinecone, pinot, plexus, postgres, presto,
rabbitmq, redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
weaviate, webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE

Expand Down
26 changes: 26 additions & 0 deletions airflow/providers/pinecone/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

``apache-airflow-providers-pinecone``

Changelog
---------

1.0.0
.....

Initial version of the provider.
16 changes: 16 additions & 0 deletions airflow/providers/pinecone/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/pinecone/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
128 changes: 128 additions & 0 deletions airflow/providers/pinecone/hooks/pinecone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Hook for Pinecone."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pinecone

from airflow.hooks.base import BaseHook

if TYPE_CHECKING:
from pinecone.core.client.models import UpsertResponse


class PineconeHook(BaseHook):
"""
Interact with Pinecone. This hook uses the Pinecone conn_id.

:param conn_id: Optional, default connection id is `pinecone_default`. The connection id to use when
connecting to Pinecone.
"""

conn_name_attr = "conn_id"
default_conn_name = "pinecone_default"
conn_type = "pinecone"
hook_name = "Pinecone"

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField

return {
"log_level": StringField(lazy_gettext("Log Level"), widget=BS3TextFieldWidget(), default=None),
"project_name": StringField(
lazy_gettext("Project Name"),
widget=BS3TextFieldWidget(),
),
}

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Returns custom field behaviour."""
return {
"hidden_fields": ["port", "schema"],
"relabeling": {"login": "Pinecone Environment", "password": "Pinecone API key"},
}

def __init__(self, conn_id: str = default_conn_name) -> None:
self.conn_id = conn_id
self.get_conn()

def get_conn(self) -> None:
pinecone_connection = self.get_connection(self.conn_id)
api_key = pinecone_connection.password
pinecone_environment = pinecone_connection.login
pinecone_host = pinecone_connection.host
extras = pinecone_connection.extra_dejson
pinecone_project_name = extras.get("project_name")
log_level = extras.get("log_level", None)
pinecone.init(
api_key=api_key,
environment=pinecone_environment,
host=pinecone_host,
project_name=pinecone_project_name,
log_level=log_level,
)

def test_connection(self) -> tuple[bool, str]:
try:
pinecone.list_indexes()
return True, "Connection established"
except Exception as e:
return False, str(e)

@staticmethod
def upsert(
index_name: str,
vectors: list[Any],
namespace: str = "",
batch_size: int | None = None,
show_progress: bool = True,
**kwargs: Any,
) -> UpsertResponse:
"""
The upsert operation writes vectors into a namespace.

If a new value is upserted for an existing vector id, it will overwrite the previous value.

.. seealso:: https://docs.pinecone.io/reference/upsert

To upsert in parallel follow

.. seealso:: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel

:param index_name: The name of the index to describe.
:param vectors: A list of vectors to upsert.
:param namespace: The namespace to write to. If not specified, the default namespace - "" is used.
:param batch_size: The number of vectors to upsert in each batch.
:param show_progress: Whether to show a progress bar using tqdm. Applied only
if batch_size is provided.
"""
index = pinecone.Index(index_name)
return index.upsert(
vectors=vectors,
namespace=namespace,
batch_size=batch_size,
show_progress=show_progress,
**kwargs,
)
16 changes: 16 additions & 0 deletions airflow/providers/pinecone/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
83 changes: 83 additions & 0 deletions airflow/providers/pinecone/operators/pinecone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow.models import BaseOperator
from airflow.providers.pinecone.hooks.pinecone import PineconeHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class PineconeIngestOperator(BaseOperator):
"""
Ingest vector embeddings into Pinecone.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PineconeIngestOperator`

:param conn_id: The connection id to use when connecting to Pinecone.
:param index_name: Name of the Pinecone index.
:param input_vectors: Data to be ingested, in the form of a list of tuples where each tuple
contains (id, vector_embedding, metadata).
:param namespace: The namespace to write to. If not specified, the default namespace is used.
:param batch_size: The number of vectors to upsert in each batch.
:param upsert_kwargs: .. seealso:: https://docs.pinecone.io/reference/upsert
"""

template_fields: Sequence[str] = ("index_name", "input_vectors", "namespace")

def __init__(
self,
*,
conn_id: str = PineconeHook.default_conn_name,
index_name: str,
input_vectors: list[tuple],
namespace: str = "",
batch_size: int | None = None,
upsert_kwargs: dict | None = None,
**kwargs: Any,
) -> None:
self.upsert_kwargs = upsert_kwargs or {}
super().__init__(**kwargs)
self.conn_id = conn_id
self.index_name = index_name
self.namespace = namespace
self.batch_size = batch_size
self.input_vectors = input_vectors

@cached_property
def hook(self) -> PineconeHook:
"""Return an instance of the PineconeHook."""
return PineconeHook(conn_id=self.conn_id)

def execute(self, context: Context) -> None:
"""Ingest data into Pinecone using the PineconeHook."""
self.hook.upsert(
index_name=self.index_name,
vectors=self.input_vectors,
namespace=self.namespace,
batch_size=self.batch_size,
**self.upsert_kwargs,
)

self.log.info("Successfully ingested data into Pinecone index %s.", self.index_name)
54 changes: 54 additions & 0 deletions airflow/providers/pinecone/provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
package-name: apache-airflow-providers-pinecone

name: Pinecone

description: |
`Pinecone <https://docs.pinecone.io/docs/overview>`__

suspended: false

versions:
- 1.0.0

integrations:
- integration-name: Pinecone
external-doc-url: https://docs.pinecone.io/docs/overview
how-to-guide:
- /docs/apache-airflow-providers-pinecone/operators/pinecone.rst
tags: [software]

dependencies:
- apache-airflow>=2.5.0
- pinecone-client>=2.2.4

hooks:
- integration-name: Pinecone
python-modules:
- airflow.providers.pinecone.hooks.pinecone

connection-types:
- hook-class-name: airflow.providers.pinecone.hooks.pinecone.PineconeHook
connection-type: pinecone

operators:
- integration-name: Pinecone
python-modules:
- airflow.providers.pinecone.operators.pinecone
19 changes: 19 additions & 0 deletions docs/apache-airflow-providers-pinecone/changelog.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. include:: ../../airflow/providers/pinecone/CHANGELOG.rst
Loading