Skip to content

Commit

Permalink
Dispatch Timeouts
Browse files Browse the repository at this point in the history
Signed-off-by: Elliot Scribner <[email protected]>
  • Loading branch information
ejscribner committed Feb 7, 2025
1 parent 440a0c3 commit 95c45c7
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.common.result import BlockingQueryResult
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import QueryOptions
from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions
from jinja2 import BaseLoader, Environment
from pydantic import StrictFloat, StrictStr

Expand Down Expand Up @@ -348,7 +348,10 @@ def _get_columnar_cluster(config: CouchbaseColumnarOfflineStoreConfig) -> Cluste
assert config.password is not None

cred = Credential.from_username_and_password(config.user, config.password)
return Cluster.create_instance(config.connection_string, cred)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
return Cluster.create_instance(
config.connection_string, cred, ClusterOptions(timeout_options=timeout_opts)
)


def _execute_query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from couchbase_columnar.cluster import Cluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import QueryOptions
from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions
from typeguard import typechecked

from feast.data_source import DataSource
Expand Down Expand Up @@ -203,7 +203,12 @@ def get_table_column_names_and_types(
cred = Credential.from_username_and_password(
config.offline_store.user, config.offline_store.password
)
cluster = Cluster.create_instance(config.offline_store.connection_string, cred)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
config.offline_store.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

query_context = self.get_table_query_string()
query = f"""
Expand All @@ -218,7 +223,7 @@ def get_table_column_names_and_types(
"""

result = cluster.execute_query(
query, QueryOptions(timeout=timedelta(seconds=500))
query, QueryOptions(timeout=timedelta(seconds=config.offline_store.timeout))
)
if not result:
raise ZeroColumnQueryResult(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import QueryOptions
from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions

from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(self, project_name: str, *args, **kwargs):
connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
user=os.environ["COUCHBASE_COLUMNAR_USER"],
password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
timeout=480,
timeout=120,
)

def create_data_source(
Expand Down Expand Up @@ -67,8 +67,11 @@ def format_row(row):
cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string, cred
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
Expand Down Expand Up @@ -127,8 +130,12 @@ def teardown(self):
cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)

timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string, cred
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

for collection in self.collections:
Expand Down
11 changes: 8 additions & 3 deletions sdk/python/feast/templates/couchbase/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import QueryOptions
from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions

from feast.file_utils import replace_str_in_file
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import (
Expand Down Expand Up @@ -61,13 +61,18 @@ def bootstrap():
cred = Credential.from_username_and_password(
columnar_user, columnar_password
)
cluster = Cluster.create_instance(columnar_connection_string, cred)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
columnar_connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

table_name = "Default.Default.feast_driver_hourly_stats"
try:
cluster.execute_query(
f"DROP COLLECTION {table_name} IF EXISTS",
QueryOptions(timeout=timedelta(seconds=500)),
QueryOptions(timeout=timedelta(seconds=columnar_timeout)),
)
except TimeoutError:
# FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful
Expand Down

0 comments on commit 95c45c7

Please sign in to comment.