Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resolve TimeoutError and add context manager to Connector #309

Merged
merged 4 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,92 @@ with pool.connect() as db_conn:

**Note for SQL Server users**: If your SQL Server instance requires SSL, you need to download the CA certificate for your instance and include `cafile={path to downloaded certificate}` and `validate_host=False`. This is a workaround for a [known issue](https://issuetracker.google.com/184867147).

### Custom Connector Object

If you need to customize something about the connector, or want to specify
defaults for each connection to make, you can initialize a custom
`Connector` object directly:
jackwotherspoon marked this conversation as resolved.
Show resolved Hide resolved

```python
from google.cloud.sql.connector import Connector

# Note: all parameters below are optional
connector = Connector(
ip_type=IPTypes.PUBLIC,
enable_iam_auth=False,
timeout=30,
credentials=custom_creds # google.auth.credentials.Credentials
)
```

You can then call the Connector object's `connect` method as you
would the default `connector.connect`:

```python
def getconn() -> pymysql.connections.Connection:
conn = connector.connect(
"project:region:instance",
"pymysql",
user="root",
password="shhh",
db="your-db-name"
)
return conn
```

To close the `Connector` object's background resources, call it's `close()` method as follows:

```python
connector.close()
```

### Using Connector as a Context Manager

The `Connector` object can also be used as a context manager in order to
automatically close and cleanup resources, removing the need for explicit
calls to `connector.close()`.

Connector as a context manager:

```python
from google.cloud.sql.connector import Connector

# build connection
def getconn() -> pymysql.connections.Connection:
with Connector() as connector:
conn = connector.connect(
"project:region:instance",
"pymysql",
user="root",
password="shhh",
db="your-db-name"
)
return conn

# create connection pool
pool = sqlalchemy.create_engine(
"mysql+pymysql://",
creator=getconn,
)

# insert statement
insert_stmt = sqlalchemy.text(
"INSERT INTO my_table (id, title) VALUES (:id, :title)",
)

# interact with Cloud SQL database using connection pool
with pool.connect() as db_conn:
# insert into database
db_conn.execute(insert_stmt, id="book1", title="Book One")

# query database
result = db_conn.execute("SELECT * from my_table").fetchall()

# Do something with the results
for row in result:
print(row)
```

### Specifying Public or Private IP
The Cloud SQL Connector for Python can be used to connect to Cloud SQL instances using both public and private IP addresses. To specify which IP address to use to connect, set the `ip_type` keyword argument Possible values are `IPTypes.PUBLIC` and `IPTypes.PRIVATE`.
Example:
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/sql/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

from typing import List

from .connector import connect
from .connector import connect, Connector
from .instance_connection_manager import IPTypes


__ALL__ = [connect, IPTypes]
__ALL__ = [connect, Connector, IPTypes]

try:
import pkg_resources
Expand Down
36 changes: 22 additions & 14 deletions google/cloud/sql/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import asyncio
import concurrent
import logging
from types import TracebackType
from google.cloud.sql.connector.instance_connection_manager import (
InstanceConnectionManager,
IPTypes,
)
from google.cloud.sql.connector.utils import generate_keys
from google.auth.credentials import Credentials
from threading import Thread
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Type

logger = logging.getLogger(name=__name__)

Expand Down Expand Up @@ -182,22 +183,29 @@ async def connect_async(
icm.force_refresh()
raise (e)

async def _close(self) -> None:
"""Helper function to close InstanceConnectionManagers' tasks."""
await asyncio.gather(*[icm.close() for icm in self._instances.values()])
def __enter__(self) -> Any:
kurtisvg marked this conversation as resolved.
Show resolved Hide resolved
"""Enter context manager by returning Connector object"""
return self

def __del__(self) -> None:
"""Deconstructor to make sure InstanceConnectionManagers are closed
and tasks have finished to have a graceful exit.
"""
logger.debug("Entering deconstructor")
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Exit context manager by closing Connector"""
self.close()

deconstruct_future = asyncio.run_coroutine_threadsafe(
self._close(), loop=self._loop
)
def close(self) -> None:
"""Close Connector by stopping tasks and releasing resources."""
close_future = asyncio.run_coroutine_threadsafe(self._close(), loop=self._loop)
# Will attempt to safely shut down tasks for 5s
deconstruct_future.result(timeout=5)
logger.debug("Finished deconstructing")
close_future.result(timeout=5)

async def _close(self) -> None:
"""Helper function to cancel InstanceConnectionManagers' tasks
and close aiohttp.ClientSession."""
await asyncio.gather(*[icm.close() for icm in self._instances.values()])
kurtisvg marked this conversation as resolved.
Show resolved Hide resolved


def connect(instance_connection_string: str, driver: str, **kwargs: Any) -> Any:
Expand Down
30 changes: 24 additions & 6 deletions tests/system/test_connector_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import sqlalchemy
import logging
import google.auth
from google.cloud.sql.connector import connector
from google.cloud.sql.connector import Connector
import datetime
import concurrent.futures


def init_connection_engine(
custom_connector: connector.Connector,
custom_connector: Connector,
) -> sqlalchemy.engine.Engine:
def getconn() -> pymysql.connections.Connection:
conn = custom_connector.connect(
Expand All @@ -48,7 +48,7 @@ def test_connector_with_credentials() -> None:
credentials, project = google.auth.load_credentials_from_file(
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
)
custom_connector = connector.Connector(credentials=credentials)
custom_connector = Connector(credentials=credentials)
try:
pool = init_connection_engine(custom_connector)

Expand All @@ -57,12 +57,14 @@ def test_connector_with_credentials() -> None:

except Exception as e:
logging.exception("Failed to connect with credentials from file!", e)
# close connector
custom_connector.close()


def test_multiple_connectors() -> None:
"""Test that same Cloud SQL instance can connect with two Connector objects."""
first_connector = connector.Connector()
second_connector = connector.Connector()
first_connector = Connector()
second_connector = Connector()
try:
pool = init_connection_engine(first_connector)
pool2 = init_connection_engine(second_connector)
Expand All @@ -83,6 +85,10 @@ def test_multiple_connectors() -> None:
except Exception as e:
logging.exception("Failed to connect with multiple Connector objects!", e)

# close connectors
first_connector.close()
second_connector.close()


def test_connector_in_ThreadPoolExecutor() -> None:
"""Test that Connector can connect from ThreadPoolExecutor thread.
Expand All @@ -91,16 +97,28 @@ def test_connector_in_ThreadPoolExecutor() -> None:

def get_time() -> datetime.datetime:
"""Helper method for getting current time from database."""
default_connector = connector.Connector()
default_connector = Connector()
pool = init_connection_engine(default_connector)

# connect to database and get current time
with pool.connect() as conn:
current_time = conn.execute("SELECT NOW()").fetchone()

# close connector
default_connector.close()
return current_time[0]

# try running connector in ThreadPoolExecutor as Cloud Run does
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(get_time)
return_value = future.result()
assert isinstance(return_value, datetime.datetime)


def test_connector_as_context_manager() -> None:
"""Test that Connector can be used as a context manager."""
with Connector() as connector:
pool = init_connection_engine(connector)

with pool.connect() as conn:
conn.execute("SELECT 1")