Skip to content

Commit

Permalink
Merge pull request #399 from lsst-sqre/tickets/DM-48719/tap-async
Browse files Browse the repository at this point in the history
Change TAP async to use run_async from pyvo instead of submit_job and polling
  • Loading branch information
stvoutsin authored Feb 5, 2025
2 parents 3aab64c + 761a9e1 commit 480af79
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 26 deletions.
5 changes: 5 additions & 0 deletions changelog.d/20250203_145642_steliosvoutsinas_tap_async.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<!-- Delete the sections that don't apply -->

### Other changes

- Modify TAPBusiness to use pyvo run_async instead of using submit_job and polling
38 changes: 12 additions & 26 deletions src/mobu/services/business/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ async def execute(self) -> None:

success = False
try:
if self.options.sync:
await self.run_sync_query(query)
else:
await self.run_async_query(query)
await self.run_query(query)
success = True
finally:
await self.events.tap_query.publish(
Expand All @@ -119,8 +116,8 @@ async def execute(self) -> None:

self.logger.info(f"Query finished after {elapsed} seconds")

async def run_async_query(self, query: str) -> None:
"""Run the query asynchronously.
async def run_query(self, query: str) -> None:
"""Run a TAP query either synchronously or asynchronously.
Parameters
----------
Expand All @@ -129,28 +126,17 @@ async def run_async_query(self, query: str) -> None:
"""
if not self._client:
raise RuntimeError("TAPBusiness startup never ran")
self.logger.info(f"Running (async): {query}")
job = self._client.submit_job(query)
try:
job.run()
while job.phase not in ("COMPLETED", "ERROR"):
await asyncio.sleep(30)
finally:
job.delete()

async def run_sync_query(self, query: str) -> None:
"""Run the query synchronously.

Parameters
----------
query
Query string to execute.
"""
if not self._client:
raise RuntimeError("TAPBusiness startup never ran")
self.logger.info(f"Running (sync): {query}")
if self.options.sync:
mode = "(sync)"
method = self._client.search
else:
mode = "(async)"
method = self._client.run_async

self.logger.info(f"Running {mode}: {query}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(self._pool, self._client.search, query)
await loop.run_in_executor(self._pool, method, query)

def dump(self) -> TAPBusinessData:
return TAPBusinessData(
Expand Down

0 comments on commit 480af79

Please sign in to comment.