Skip to content

Commit

Permalink
async test working
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex committed Nov 22, 2024
1 parent 8dd0cec commit 30f4928
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 55 deletions.
23 changes: 3 additions & 20 deletions python/nistoar/midas/dbio/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,33 +828,15 @@ class DBClient(ABC):
"""

def __init__(self, config: Mapping, projcoll: str,websocket_server: WebSocketServer, nativeclient=None, foruser: str = ANONYMOUS):
print("Arguments passed to __init__:", locals())
print("\n")
print("websocket_server == ", websocket_server)
self._cfg = config
self._native = nativeclient
self._projcoll = projcoll
self._who = foruser
self._whogrps = None
# Get the current stack frame
stack = inspect.stack()

# Print the caller function
caller = stack[1]
print(f"__init__ called from {caller.function} in {caller.filename} at line {caller.lineno}")
print("----- \n")
# Print the caller function
caller = stack[2]
print(f"__init__ called from {caller.function} in {caller.filename} at line {caller.lineno}")

caller = stack[3]
print(f"__init__ called from {caller.function} in {caller.filename} at line {caller.lineno}")

self._dbgroups = DBGroups(self)

self.websocket_server = websocket_server
message = f"Record created: {self._projcoll} for user {foruser}"
asyncio.run(self.websocket_server.send_message_to_clients(message))


@property
Expand Down Expand Up @@ -918,8 +900,9 @@ def create_record(self, name: str, shoulder: str = None, foruser: str = None) ->
rec = ProjectRecord(self._projcoll, rec, self)
rec.save()
# Send a message through the WebSocket
message = f"Record created: {name} for user {foruser}"
#asyncio.run(self.websocket_server.send_message_to_clients(message))
message = f"{name}"
asyncio.run(self.websocket_server.send_message_to_clients(message))
#print(message)
return rec

def _default_shoulder(self):
Expand Down
7 changes: 0 additions & 7 deletions python/nistoar/midas/dbio/inmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def select_records(self, perm: base.Permissions=base.ACLs.OWN, **cnsts) -> Itera
for p in perm:
if rec.authorized(p):
yield deepcopy(rec)
break

def adv_select_records(self, filter:dict,
perm: base.Permissions=base.ACLs.OWN,) -> Iterator[base.ProjectRecord]:
Expand Down Expand Up @@ -167,11 +166,5 @@ def create_client(self, servicetype: str, config: Mapping={}, foruser: str = bas
cfg = merge_config(config, deepcopy(self._cfg))
if servicetype not in self._db:
self._db[servicetype] = {}
print("\n AZZ \n")
print(self._db)
print(cfg)
print(servicetype)
print(self.websocket_server)
print(foruser)
return InMemoryDBClient(self._db, cfg, servicetype,self.websocket_server, foruser)

33 changes: 18 additions & 15 deletions python/nistoar/midas/dbio/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import websockets
from concurrent.futures import ThreadPoolExecutor
import copy

class WebSocketServer:
def __init__(self, host="localhost", port=8765):
Expand All @@ -12,10 +13,10 @@ def __init__(self, host="localhost", port=8765):

async def start(self):
self.server = await websockets.serve(self.websocket_handler, self.host, self.port)
print(f"WebSocket server started on ws://{self.host}:{self.port}")
#print(f"WebSocket server started on ws://{self.host}:{self.port}")


async def websocket_handler(self, websocket, path):
async def websocket_handler(self, websocket):
# Add the new client to the set
self.clients.add(websocket)
try:
Expand All @@ -26,20 +27,11 @@ async def websocket_handler(self, websocket, path):
self.clients.remove(websocket)

async def send_message_to_clients(self, message):
for client in self.clients:
print(client)
if self.clients:
await asyncio.wait([client.send(message) for client in self.clients])

def is_running(self):
return self.server is not None and self.server.is_serving()

def start_in_thread(self):
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(self._start_event_loop)

def _start_event_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.start())
for client in self.clients:
asyncio.create_task(client.send(message))

async def stop(self):
if self.server:
Expand All @@ -51,3 +43,14 @@ async def wait_closed(self):
if self.server:
await self.server.wait_closed()

def __deepcopy__(self, memo):
# Create a shallow copy of the object
new_copy = copy.copy(self)
# Deep copy the attributes that are not problematic
new_copy.host = copy.deepcopy(self.host, memo)
new_copy.port = copy.deepcopy(self.port, memo)
new_copy.clients = copy.deepcopy(self.clients, memo)
# Do not copy the problematic attribute
new_copy.server = self.server
return new_copy

67 changes: 56 additions & 11 deletions python/tests/nistoar/midas/dbio/test_inmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
dmp = json.load(file)





class TestInMemoryDBClientFactory(test.TestCase):

@classmethod
Expand All @@ -45,7 +42,7 @@ def initialize_websocket_server(cls):
cls.loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls.loop)
cls.loop.run_until_complete(websocket_server.start())
print("WebSocketServer initialized:", websocket_server)
#print("WebSocketServer initialized:", websocket_server)
return websocket_server

@classmethod
Expand All @@ -57,7 +54,6 @@ def tearDownClass(cls):
# Ensure the WebSocket server is properly closed
cls.loop.run_until_complete(cls.websocket_server.stop())
cls.loop.run_until_complete(cls.websocket_server.wait_closed())
print("WebSocketServer closed")

# Cancel all lingering tasks
asyncio.set_event_loop(cls.loop) # Set the event loop as the current event loop
Expand All @@ -68,7 +64,6 @@ def tearDownClass(cls):

# Close the event loop
cls.loop.close()
print("Event loop closed")



Expand Down Expand Up @@ -108,7 +103,7 @@ def initialize_websocket_server(cls):
cls.loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls.loop)
cls.loop.run_until_complete(websocket_server.start())
print("WebSocketServer initialized:", websocket_server)
#print("WebSocketServer initialized:", websocket_server)
return websocket_server

@classmethod
Expand All @@ -120,7 +115,6 @@ def tearDownClass(cls):
# Ensure the WebSocket server is properly closed
cls.loop.run_until_complete(cls.websocket_server.stop())
cls.loop.run_until_complete(cls.websocket_server.wait_closed())
print("WebSocketServer closed")

# Cancel all lingering tasks
asyncio.set_event_loop(cls.loop)
Expand All @@ -131,7 +125,6 @@ def tearDownClass(cls):

# Close the event loop
cls.loop.close()
print("Event loop closed")

def setUp(self):
self.cfg = {"default_shoulder": "mds3"}
Expand Down Expand Up @@ -349,8 +342,6 @@ def test_adv_select_records(self):
self.cli._db[base.DMP_PROJECTS][id] = rec.to_dict()




id = "pdr0:0006"
rec = base.ProjectRecord(
base.DMP_PROJECTS, {"id": id, "name": "test 2", "status": {
Expand Down Expand Up @@ -522,5 +513,59 @@ def test_record_action(self):
self.assertEqual(acts[1]['type'], Action.COMMENT)


class TestWebSocketServer(test.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.websocket_server = WebSocketServer()
self.loop = asyncio.get_event_loop()
await self.websocket_server.start()

# Initialize the InMemoryDBClientFactory with the websocket_server
self.cfg = {"default_shoulder": "mds3"}
self.user = "nist0:ava1"
self.cli = inmem.InMemoryDBClientFactory({},self.websocket_server).create_client(
base.DMP_PROJECTS, self.cfg, self.user)

async def asyncTearDown(self):
await self.websocket_server.stop()
await self.websocket_server.wait_closed()


async def test_create_records_websocket(self):
messages = []

async def receive_messages(uri):
try:
async with websockets.connect(uri) as websocket:
while True:
message = await websocket.recv()
#print(f"Received message: {message}")
messages.append(message)
#print(f"Messages: {messages}")
# Break the loop after receiving the first message for this test
break
except Exception as e:
print(f"Failed to connect to WebSocket server: {e}")

# Start the WebSocket client to receive messages
uri = 'ws://localhost:8765'
receive_task = asyncio.create_task(receive_messages(uri))
await asyncio.sleep(2)

#await self.websocket_server.send_message_to_clients("Connection established")
# Inject some data into the database
await self.websocket_server.send_message_to_clients("test1")
await asyncio.sleep(2)
id = "pdr0:0002"
rec = base.ProjectRecord(
base.DMP_PROJECTS, {"id": id, "name": "test 1"}, self.cli)
self.cli._db[base.DMP_PROJECTS][id] = rec.to_dict()


# Assert the number of messages received
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0], "test1")



if __name__ == '__main__':
test.main()
16 changes: 14 additions & 2 deletions python/tests/nistoar/midas/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import websockets
from concurrent.futures import ThreadPoolExecutor
import copy

class WebSocketServer:
def __init__(self, host="localhost", port=8765):
Expand All @@ -12,10 +13,10 @@ def __init__(self, host="localhost", port=8765):

async def start(self):
self.server = await websockets.serve(self.websocket_handler, self.host, self.port)
print(f"WebSocket server started on ws://{self.host}:{self.port}")
#print(f"WebSocket server started on ws://{self.host}:{self.port}")


async def websocket_handler(self, websocket, path):
async def websocket_handler(self, websocket):
# Add the new client to the set
self.clients.add(websocket)
try:
Expand Down Expand Up @@ -50,4 +51,15 @@ async def stop(self):
async def wait_closed(self):
if self.server:
await self.server.wait_closed()

def __deepcopy__(self, memo):
# Create a shallow copy of the object
new_copy = copy.copy(self)
# Deep copy the attributes that are not problematic
new_copy.host = copy.deepcopy(self.host, memo)
new_copy.port = copy.deepcopy(self.port, memo)
new_copy.clients = copy.deepcopy(self.clients, memo)
# Do not copy the problematic attribute
new_copy.server = self.server
return new_copy

0 comments on commit 30f4928

Please sign in to comment.