Skip to content

Commit 11b836e

Browse files
authored
Merge pull request #104 from icanbwell/separate-bundle-resources-when-streaming
fix bug when there is empty bundle and we ask to expand it
2 parents ae52919 + 5fa9e52 commit 11b836e

34 files changed

+289
-79
lines changed

README.md

+39
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing_extensions import Optionalfrom helix_fhir_client_sdk.responses.fhir_get_response import FhirGetResponsefrom build.lib.helix_fhir_client_sdk.responses.fhir_get_response import FhirGetResponsefrom helix_fhir_client_sdk.responses.fhir_get_response import FhirGetResponse
2+
13
# helix.fhir.client.sdk
24

35
<p align="left">
@@ -40,3 +42,40 @@ https://github.com/icanbwell/fhir-server-performance
4042
* 1.x supports python 3.7+
4143
* 2.x supports python 3.10+
4244
* 3.x supports python 3.12+
45+
46+
# Asynchronous Support
47+
When communicating with FHIR servers, a lot of time is spent waiting for the server to respond.
48+
This is a good use case for using asynchronous programming.
49+
This SDK supports asynchronous programming using the `async` and `await` keywords.
50+
51+
The return types are Python AsyncGenerators. Python makes it very easy to work with AsyncGenerators.
52+
53+
For example, if the SDK provides a function like this:
54+
```python
55+
56+
async def get_resources(self) -> AsyncGenerator[FhirGetResponse, None]:
57+
...
58+
```
59+
60+
You can iterate over the results as they become available:
61+
```python
62+
response: Optional[FhirGetResponse]
63+
async for response in client.get_resources():
64+
print(response.resource)
65+
```
66+
67+
Or you can get a list of responses (which will return AFTER all the responses are received:
68+
```python
69+
70+
responses: List[FhirGetResponse] = [response async for response in client.get_resources()]
71+
```
72+
73+
Or you can aggregate the responses into one response (which will return AFTER all the responses are received:
74+
```python
75+
76+
response: Optional[FhirGetResponse] = await FhirGetResponse.from_async_generator(client.get_resources())
77+
```
78+
79+
# Data Streaming
80+
For FHIR servers that support data streaming (e.g., b.well FHIR server), you can just set the `use_data_streaming` parameter to stream the data as it i received.
81+
The data will be streamed in AsyncGenerators as described above.

helix_fhir_client_sdk/fhir_merge_mixin.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def merge(
297297
id_: Optional[str] = None,
298298
json_data_list: List[str],
299299
batch_size: Optional[int] = None,
300-
) -> FhirMergeResponse:
300+
) -> Optional[FhirMergeResponse]:
301301
"""
302302
Calls $merge function on FHIR server
303303
@@ -308,7 +308,7 @@ def merge(
308308
:return: response
309309
"""
310310

311-
result: FhirMergeResponse = AsyncRunner.run(
311+
result: Optional[FhirMergeResponse] = AsyncRunner.run(
312312
FhirMergeResponse.from_async_generator(
313313
self.merge_async(
314314
id_=id_, json_data_list=json_data_list, batch_size=batch_size

helix_fhir_client_sdk/graph/fhir_graph_mixin.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def graph(
6969
*,
7070
graph_definition: GraphDefinition,
7171
contained: bool,
72-
) -> FhirGetResponse:
72+
) -> Optional[FhirGetResponse]:
7373

7474
return AsyncRunner.run(
7575
FhirGetResponse.from_async_generator(

helix_fhir_client_sdk/graph/simulated_graph_processor_mixin.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ async def _get_resources_by_parameters_async(
494494
)
495495

496496
if cached_response:
497-
result.append([cached_response])
497+
result.append(cached_response)
498498
elif cached_response:
499499
result = cached_response
500500
assert result
@@ -540,7 +540,7 @@ async def simulate_graph_async(
540540
assert self._additional_parameters is not None
541541
self._additional_parameters.append("contained=true")
542542

543-
result: FhirGetResponse = await FhirGetResponse.from_async_generator(
543+
result: Optional[FhirGetResponse] = await FhirGetResponse.from_async_generator(
544544
self.process_simulate_graph_async(
545545
id_=id_,
546546
graph_json=graph_json,
@@ -559,6 +559,7 @@ async def simulate_graph_async(
559559
auth_scopes=self._auth_scopes,
560560
)
561561
)
562+
assert result, "No result returned from simulate_graph_async"
562563
return result
563564

564565
# noinspection PyPep8Naming

helix_fhir_client_sdk/responses/fhir_delete_response.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Optional
1+
from typing import Optional, AsyncGenerator
22

33

44
class FhirDeleteResponse:
@@ -33,3 +33,32 @@ def __init__(
3333
""" Number of resources deleted """
3434
self.count: Optional[int] = count
3535
self.resource_type: Optional[str] = resource_type
36+
37+
def append(self, other: Optional["FhirDeleteResponse"]) -> None:
38+
"""
39+
Appends another FhirDeleteResponse to this one
40+
41+
:param other: FhirDeleteResponse to append
42+
"""
43+
if other:
44+
self.responses += other.responses
45+
self.error = (self.error or "") + (other.error or "")
46+
self.count = (self.count or 0) + (other.count or 0)
47+
48+
@classmethod
49+
async def from_async_generator(
50+
cls, generator: AsyncGenerator["FhirDeleteResponse", None]
51+
) -> Optional["FhirDeleteResponse"]:
52+
"""
53+
Reads a generator of FhirDeleteResponse and returns a single FhirDeleteResponse by appending all the FhirDeleteResponse
54+
55+
:param generator: generator of FhirDeleteResponse items
56+
:return: FhirDeleteResponse
57+
"""
58+
result: FhirDeleteResponse | None = None
59+
async for value in generator:
60+
if not result:
61+
result = value
62+
else:
63+
result.append(value)
64+
return result

helix_fhir_client_sdk/responses/fhir_get_response.py

+33-5
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,35 @@ def __init__(
8787
self.chunk_number: Optional[int] = chunk_number
8888
self.cache_hits: Optional[int] = cache_hits
8989

90-
def append(self, others: List["FhirGetResponse"]) -> "FhirGetResponse":
90+
def append(self, other_response: "FhirGetResponse") -> "FhirGetResponse":
91+
"""
92+
Append the responses from other to self
93+
94+
:param other_response: FhirGetResponse object to append to current one
95+
:return: self
96+
"""
97+
bundle_entries: List[BundleEntry] = self.get_bundle_entries()
98+
if other_response.responses:
99+
other_bundle_entries: List[BundleEntry] = (
100+
other_response.get_bundle_entries()
101+
)
102+
bundle_entries.extend(other_bundle_entries)
103+
bundle = {
104+
"resourceType": "Bundle",
105+
"entry": bundle_entries,
106+
}
107+
self.responses = json.dumps(bundle, cls=FhirJSONEncoder)
108+
if other_response.chunk_number and (other_response.chunk_number or 0) > (
109+
self.chunk_number or 0
110+
):
111+
self.chunk_number = other_response.chunk_number
112+
if other_response.next_url:
113+
self.next_url = other_response.next_url
114+
self.access_token = other_response.access_token
115+
self.cache_hits = (self.cache_hits or 0) + (other_response.cache_hits or 0)
116+
return self
117+
118+
def extend(self, others: List["FhirGetResponse"]) -> "FhirGetResponse":
91119
"""
92120
Append the responses from other to self
93121
@@ -373,10 +401,10 @@ def get_resource_type_and_ids(self) -> List[str]:
373401
f"Could not get resourceType and id from resources: {json.dumps(resources, cls=FhirJSONEncoder)}"
374402
) from e
375403

376-
@staticmethod
404+
@classmethod
377405
async def from_async_generator(
378-
generator: AsyncGenerator["FhirGetResponse", None]
379-
) -> "FhirGetResponse":
406+
cls, generator: AsyncGenerator["FhirGetResponse", None]
407+
) -> Optional["FhirGetResponse"]:
380408
"""
381409
Reads a generator of FhirGetResponse and returns a single FhirGetResponse by appending all the FhirGetResponse
382410
@@ -388,7 +416,7 @@ async def from_async_generator(
388416
if not result:
389417
result = value
390418
else:
391-
result.append([value])
419+
result.append(value)
392420

393421
assert result
394422
return result

helix_fhir_client_sdk/responses/fhir_merge_response.py

+8-9
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,21 @@ def __init__(
3030
self.data: str = json_data
3131
self.successful: bool = status != 200
3232

33-
def append(self, other: Optional[List["FhirMergeResponse"]]) -> None:
33+
def append(self, other: Optional["FhirMergeResponse"]) -> None:
3434
"""
3535
Appends another FhirMergeResponse to this one
3636
3737
:param other: FhirMergeResponse to append
3838
"""
3939
if other:
40-
for r in other:
41-
self.responses.extend(r.responses)
42-
self.error = (self.error or "") + (r.error or "")
43-
self.successful = self.successful and r.successful
40+
self.responses.extend(other.responses)
41+
self.error = (self.error or "") + (other.error or "")
42+
self.successful = self.successful and other.successful
4443

45-
@staticmethod
44+
@classmethod
4645
async def from_async_generator(
47-
generator: AsyncGenerator["FhirMergeResponse", None]
48-
) -> "FhirMergeResponse":
46+
cls, generator: AsyncGenerator["FhirMergeResponse", None]
47+
) -> Optional["FhirMergeResponse"]:
4948
"""
5049
Reads a generator of FhirGetResponse and returns a single FhirGetResponse by appending all the FhirGetResponse
5150
@@ -57,7 +56,7 @@ async def from_async_generator(
5756
if not result:
5857
result = value
5958
else:
60-
result.append([value])
59+
result.append(value)
6160

6261
assert result
6362
return result

helix_fhir_client_sdk/responses/fhir_response_processor.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,6 @@ async def _handle_response_200_non_streaming(
387387
extra_context_to_return=extra_context_to_return,
388388
resource_or_bundle=response_json,
389389
separate_bundle_resources=separate_bundle_resources,
390-
text=text,
391390
total_count=total_count,
392391
url=url,
393392
)
@@ -434,7 +433,6 @@ async def expand_or_separate_bundle_async(
434433
extra_context_to_return: Optional[Dict[str, Any]],
435434
resource_or_bundle: Dict[str, Any],
436435
separate_bundle_resources: bool,
437-
text: str,
438436
total_count: int,
439437
url: Optional[str],
440438
) -> Tuple[str, int]:
@@ -470,14 +468,14 @@ async def expand_or_separate_bundle_async(
470468
)
471469
resources_json = json.dumps(resource_separator_result.resources_dicts)
472470
total_count = resource_separator_result.total_count
473-
elif resources:
471+
elif len(resources) > 0:
474472
total_count = len(resources)
475473
if len(resources) == 1:
476474
resources_json = json.dumps(resources[0])
477475
else:
478476
resources_json = json.dumps(resources)
479477
else:
480-
resources_json = text
478+
resources_json = json.dumps(resources)
481479

482480
return resources_json, total_count
483481

@@ -619,7 +617,6 @@ def get_chunk_iterator() -> (
619617
extra_context_to_return=extra_context_to_return,
620618
resource_or_bundle=completed_resource,
621619
separate_bundle_resources=separate_bundle_resources,
622-
text=json.dumps(completed_resource),
623620
total_count=total_count,
624621
url=url,
625622
)

helix_fhir_client_sdk/responses/fhir_update_response.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Optional
1+
from typing import Optional, AsyncGenerator
22

33

44
class FhirUpdateResponse:
@@ -29,3 +29,31 @@ def __init__(
2929
self.access_token: Optional[str] = access_token
3030
self.status: int = status
3131
self.resource_type: Optional[str] = resource_type
32+
33+
def append(self, other: Optional["FhirUpdateResponse"]) -> None:
34+
"""
35+
Appends another FhirUpdateResponse to this one
36+
37+
:param other: FhirUpdateResponse to append
38+
"""
39+
if other:
40+
self.responses += other.responses
41+
self.error = (self.error or "") + (other.error or "")
42+
43+
@classmethod
44+
async def from_async_generator(
45+
cls, generator: AsyncGenerator["FhirUpdateResponse", None]
46+
) -> Optional["FhirUpdateResponse"]:
47+
"""
48+
Reads a generator of FhirUpdateResponse and returns a single FhirUpdateResponse by appending all the FhirUpdateResponse
49+
50+
:param generator: generator of FhirUpdateResponse items
51+
:return: FhirUpdateResponse
52+
"""
53+
result: FhirUpdateResponse | None = None
54+
async for value in generator:
55+
if not result:
56+
result = value
57+
else:
58+
result.append(value)
59+
return result
+31-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,39 @@
11
import dataclasses
2-
from typing import List, Dict, Any, Optional
2+
from typing import List, Dict, Any, Optional, AsyncGenerator
33

44

55
@dataclasses.dataclass
66
class GetResult:
77
request_id: Optional[str]
88
resources: List[Dict[str, Any]]
99
response_headers: Optional[List[str]]
10+
11+
def append(self, other: Optional["GetResult"]) -> None:
12+
"""
13+
Appends another GetResult to this one
14+
15+
:param other: GetResult to append
16+
"""
17+
if other:
18+
self.resources.extend(other.resources)
19+
self.response_headers = (self.response_headers or []) + (
20+
other.response_headers or []
21+
)
22+
23+
@classmethod
24+
async def from_async_generator(
25+
cls, generator: AsyncGenerator["GetResult", None]
26+
) -> Optional["GetResult"]:
27+
"""
28+
Reads a generator of GetResult and returns a single GetResult by appending all the GetResult
29+
30+
:param generator: generator of GetResult items
31+
:return: GetResult
32+
"""
33+
result: GetResult | None = None
34+
async for value in generator:
35+
if not result:
36+
result = value
37+
else:
38+
result.append(value)
39+
return result
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import dataclasses
2-
from typing import List, Dict, Any, Optional
2+
from typing import List, Dict, Any, Optional, AsyncGenerator
33

44

55
@dataclasses.dataclass
@@ -8,3 +8,34 @@ class PagingResult:
88
resources: List[Dict[str, Any]]
99
page_number: int
1010
response_headers: Optional[List[str]]
11+
12+
def append(self, other: Optional["PagingResult"]) -> None:
13+
"""
14+
Appends another PagingResult to this one
15+
16+
:param other: PagingResult to append
17+
"""
18+
if other:
19+
self.resources.extend(other.resources)
20+
self.response_headers = (self.response_headers or []) + (
21+
other.response_headers or []
22+
)
23+
self.page_number = other.page_number
24+
25+
@classmethod
26+
async def from_async_generator(
27+
cls, generator: AsyncGenerator["PagingResult", None]
28+
) -> Optional["PagingResult"]:
29+
"""
30+
Reads a generator of PagingResult and returns a single PagingResult by appending all the PagingResult
31+
32+
:param generator: generator of PagingResult items
33+
:return: PagingResult
34+
"""
35+
result: PagingResult | None = None
36+
async for value in generator:
37+
if not result:
38+
result = value
39+
else:
40+
result.append(value)
41+
return result

0 commit comments

Comments
 (0)