diff --git a/README.md b/README.md index 70c88d23..5b94d1f9 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,92 @@ with pool.connect() as db_conn: **Note for SQL Server users**: If your SQL Server instance requires SSL, you need to download the CA certificate for your instance and include `cafile={path to downloaded certificate}` and `validate_host=False`. This is a workaround for a [known issue](https://issuetracker.google.com/184867147). +### Custom Connector Object + +If you need to customize something about the connector, or want to specify +defaults for each connection to make, you can initialize a custom +`Connector` object directly: + +```python +from google.cloud.sql.connector import Connector + +# Note: all parameters below are optional +connector = Connector( + ip_type=IPTypes.PUBLIC, + enable_iam_auth=False, + timeout=30, + credentials=custom_creds # google.auth.credentials.Credentials +) +``` + +You can then call the Connector object's `connect` method as you +would the default `connector.connect`: + +```python +def getconn() -> pymysql.connections.Connection: + conn = connector.connect( + "project:region:instance", + "pymysql", + user="root", + password="shhh", + db="your-db-name" + ) + return conn +``` + +To close the `Connector` object's background resources, call it's `close()` method as follows: + +```python +connector.close() +``` + +### Using Connector as a Context Manager + +The `Connector` object can also be used as a context manager in order to +automatically close and cleanup resources, removing the need for explicit +calls to `connector.close()`. + +Connector as a context manager: + +```python +from google.cloud.sql.connector import Connector + +# build connection +def getconn() -> pymysql.connections.Connection: + with Connector() as connector: + conn = connector.connect( + "project:region:instance", + "pymysql", + user="root", + password="shhh", + db="your-db-name" + ) + return conn + +# create connection pool +pool = sqlalchemy.create_engine( + "mysql+pymysql://", + creator=getconn, +) + +# insert statement +insert_stmt = sqlalchemy.text( + "INSERT INTO my_table (id, title) VALUES (:id, :title)", +) + +# interact with Cloud SQL database using connection pool +with pool.connect() as db_conn: + # insert into database + db_conn.execute(insert_stmt, id="book1", title="Book One") + + # query database + result = db_conn.execute("SELECT * from my_table").fetchall() + + # Do something with the results + for row in result: + print(row) +``` + ### Specifying Public or Private IP The Cloud SQL Connector for Python can be used to connect to Cloud SQL instances using both public and private IP addresses. To specify which IP address to use to connect, set the `ip_type` keyword argument Possible values are `IPTypes.PUBLIC` and `IPTypes.PRIVATE`. Example: diff --git a/google/cloud/sql/connector/__init__.py b/google/cloud/sql/connector/__init__.py index ce78734c..216b8f60 100644 --- a/google/cloud/sql/connector/__init__.py +++ b/google/cloud/sql/connector/__init__.py @@ -16,11 +16,11 @@ from typing import List -from .connector import connect +from .connector import connect, Connector from .instance_connection_manager import IPTypes -__ALL__ = [connect, IPTypes] +__ALL__ = [connect, Connector, IPTypes] try: import pkg_resources diff --git a/google/cloud/sql/connector/connector.py b/google/cloud/sql/connector/connector.py index fff3847b..3feb3f6d 100755 --- a/google/cloud/sql/connector/connector.py +++ b/google/cloud/sql/connector/connector.py @@ -16,6 +16,7 @@ import asyncio import concurrent import logging +from types import TracebackType from google.cloud.sql.connector.instance_connection_manager import ( InstanceConnectionManager, IPTypes, @@ -23,7 +24,7 @@ from google.cloud.sql.connector.utils import generate_keys from google.auth.credentials import Credentials from threading import Thread -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Type logger = logging.getLogger(name=__name__) @@ -182,22 +183,29 @@ async def connect_async( icm.force_refresh() raise (e) - async def _close(self) -> None: - """Helper function to close InstanceConnectionManagers' tasks.""" - await asyncio.gather(*[icm.close() for icm in self._instances.values()]) + def __enter__(self) -> Any: + """Enter context manager by returning Connector object""" + return self - def __del__(self) -> None: - """Deconstructor to make sure InstanceConnectionManagers are closed - and tasks have finished to have a graceful exit. - """ - logger.debug("Entering deconstructor") + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """Exit context manager by closing Connector""" + self.close() - deconstruct_future = asyncio.run_coroutine_threadsafe( - self._close(), loop=self._loop - ) + def close(self) -> None: + """Close Connector by stopping tasks and releasing resources.""" + close_future = asyncio.run_coroutine_threadsafe(self._close(), loop=self._loop) # Will attempt to safely shut down tasks for 5s - deconstruct_future.result(timeout=5) - logger.debug("Finished deconstructing") + close_future.result(timeout=5) + + async def _close(self) -> None: + """Helper function to cancel InstanceConnectionManagers' tasks + and close aiohttp.ClientSession.""" + await asyncio.gather(*[icm.close() for icm in self._instances.values()]) def connect(instance_connection_string: str, driver: str, **kwargs: Any) -> Any: diff --git a/tests/system/test_connector_object.py b/tests/system/test_connector_object.py index 123a3552..33926d1e 100644 --- a/tests/system/test_connector_object.py +++ b/tests/system/test_connector_object.py @@ -18,13 +18,13 @@ import sqlalchemy import logging import google.auth -from google.cloud.sql.connector import connector +from google.cloud.sql.connector import Connector import datetime import concurrent.futures def init_connection_engine( - custom_connector: connector.Connector, + custom_connector: Connector, ) -> sqlalchemy.engine.Engine: def getconn() -> pymysql.connections.Connection: conn = custom_connector.connect( @@ -48,7 +48,7 @@ def test_connector_with_credentials() -> None: credentials, project = google.auth.load_credentials_from_file( os.environ["GOOGLE_APPLICATION_CREDENTIALS"] ) - custom_connector = connector.Connector(credentials=credentials) + custom_connector = Connector(credentials=credentials) try: pool = init_connection_engine(custom_connector) @@ -57,12 +57,14 @@ def test_connector_with_credentials() -> None: except Exception as e: logging.exception("Failed to connect with credentials from file!", e) + # close connector + custom_connector.close() def test_multiple_connectors() -> None: """Test that same Cloud SQL instance can connect with two Connector objects.""" - first_connector = connector.Connector() - second_connector = connector.Connector() + first_connector = Connector() + second_connector = Connector() try: pool = init_connection_engine(first_connector) pool2 = init_connection_engine(second_connector) @@ -83,6 +85,10 @@ def test_multiple_connectors() -> None: except Exception as e: logging.exception("Failed to connect with multiple Connector objects!", e) + # close connectors + first_connector.close() + second_connector.close() + def test_connector_in_ThreadPoolExecutor() -> None: """Test that Connector can connect from ThreadPoolExecutor thread. @@ -91,12 +97,15 @@ def test_connector_in_ThreadPoolExecutor() -> None: def get_time() -> datetime.datetime: """Helper method for getting current time from database.""" - default_connector = connector.Connector() + default_connector = Connector() pool = init_connection_engine(default_connector) # connect to database and get current time with pool.connect() as conn: current_time = conn.execute("SELECT NOW()").fetchone() + + # close connector + default_connector.close() return current_time[0] # try running connector in ThreadPoolExecutor as Cloud Run does @@ -104,3 +113,12 @@ def get_time() -> datetime.datetime: future = executor.submit(get_time) return_value = future.result() assert isinstance(return_value, datetime.datetime) + + +def test_connector_as_context_manager() -> None: + """Test that Connector can be used as a context manager.""" + with Connector() as connector: + pool = init_connection_engine(connector) + + with pool.connect() as conn: + conn.execute("SELECT 1")