-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Examples use-case - Access API subscription endpoints - Kotlin #101
Conversation
Warning Rate limit exceeded@lealobanov has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 8 minutes and 17 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThis pull request introduces new classes for subscribing to and processing events and execution data from the Flow blockchain using Kotlin coroutines. The Changes
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
SDK Examples Unit Test Results35 files 35 suites 6m 9s ⏱️ Results for commit dec4cf5. ♻️ This comment has been updated with latest results. |
Common Integration Test Results1 files 1 suites 7s ⏱️ Results for commit dec4cf5. ♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 17
🧹 Outside diff range and nitpick comments (24)
kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectTest.kt (2)
12-27
: LGTM: Class structure and setup are well-organized.The class is properly annotated for Flow emulator testing, and the necessary properties are correctly initialized. The
setup
method is appropriately annotated with@BeforeEach
.Consider adding a brief comment explaining the purpose of the
AccessAPIConnector
class, as its role might not be immediately clear to other developers.
29-55
: LGTM: Test method implementation is comprehensive, but consider some improvements.The test method effectively simulates event streaming and reconnection. It uses coroutines appropriately and checks for the expected behavior.
Consider the following improvements:
- Replace the hard-coded delay with a more robust waiting mechanism, such as a polling loop with a timeout. This will make the test more reliable across different environments.
- Add more specific assertions about the event content, if possible, to ensure the received events match the expected format and data.
- Consider extracting the event generation logic into a separate helper method for better readability and potential reuse.
Example of a more robust waiting mechanism:
val timeout = 10000L // 10 seconds val pollInterval = 500L // 0.5 seconds var elapsedTime = 0L while (receivedEvents.isEmpty() && elapsedTime < timeout) { delay(pollInterval) elapsedTime += pollInterval } assertTrue(elapsedTime < timeout, "Timed out waiting for events")kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExampleTest.kt (2)
12-21
: LGTM: Class declaration and property setup look good.The
@FlowEmulatorProjectTest
annotation correctly indicates that this test runs in a Flow emulator environment. The use oflateinit
forserviceAccount
andaccessAPI
is appropriate for test setup.Consider adding KDoc comments to describe the purpose of the test class and its properties, especially for
subscribeExecutionDataExample
andaccessAPIConnector
.
1-68
: Overall, well-structured and effective test implementation.This test class effectively demonstrates the streaming of execution data using the Flow SDK and Kotlin coroutines. The use of
@FlowEmulatorProjectTest
and appropriate setup methods shows good testing practices.To further improve the test suite:
- Consider adding more test cases to cover edge cases or error scenarios.
- Implement a helper function to reduce duplication in assertion logic for block and transaction IDs.
- If this is part of a larger test suite, consider extracting common setup logic to a base test class.
Example of a helper function for assertions:
private fun assertValidExecutionData(executionData: List<FlowBlockExecutionData>) { assertTrue(executionData.isNotEmpty(), "Should have received at least one block execution data") executionData.forEach { blockExecutionData -> assertNotNull(blockExecutionData.blockId, "Block ID should not be null") blockExecutionData.chunkExecutionData.forEach { chunkExecutionData -> chunkExecutionData.transactionResults.forEach { transaction -> assertNotNull(transaction.transactionId, "Transaction ID should not be null") } } } }This function can then be called in your test method, improving readability and maintainability.
sdk/src/main/kotlin/org/onflow/flow/sdk/FlowAccessApi.kt (3)
67-69
: LGTM: Updated return type for better coroutine managementThe change to return a
Triple
including aJob
is a good improvement. It allows for better management of the coroutine lifecycle, which is crucial for streaming operations.However, consider if a custom data class might be more appropriate and easier to use than a
Triple
. For example:data class SubscriptionResult<T>( val dataChannel: ReceiveChannel<T>, val errorChannel: ReceiveChannel<Throwable>, val job: Job )This would make the API more self-documenting and easier to use.
77-79
: LGTM: Updated return type for event streamingThe change to return a
Triple
including aJob
is consistent with the previous methods and provides better coroutine lifecycle management. This update aligns well with the PR objectives of providing examples for streaming events.The
ReceiveChannel<List<FlowEvent>>
allows for efficient streaming of multiple events, which is a good design choice for this use case.Consider applying the custom data class suggestion from the previous comments to this method as well for consistency and improved API usability.
Line range hint
1-84
: Overall LGTM: Improved subscription methods with suggestion for further enhancementThe changes made to the
FlowAccessApi
interface consistently improve the subscription methods by adding aJob
to the return types. This enhancement allows for better management of coroutine lifecycles, which is crucial for streaming operations. The changes align well with the PR objectives of providing examples for streaming events and execution data.To further improve the API's usability, consider implementing a custom data class to replace the
Triple
return type across all subscription methods. This would make the API more self-documenting and easier to use. For example:data class SubscriptionResult<T>( val dataChannel: ReceiveChannel<T>, val errorChannel: ReceiveChannel<Throwable>, val job: Job )This change would provide a more intuitive and type-safe way to handle the subscription results.
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt (4)
34-40
: Simplify cancellation checks within the data processing loopThe
for
loop overdataChannel
inherently checks for cancellation. The explicitif (!isActive) break
and theyield()
call may be redundant. Simplifying the loop can make the code cleaner without losing functionality.Consider refactoring the loop as follows:
for (events in dataChannel) { if (events.isNotEmpty()) { receivedEvents.addAll(events) } // No need for explicit cancellation check or yield() }
51-55
: Simplify cancellation checks within the error handling loopSimilar to the data processing loop, the
for
loop overerrorChannel
doesn't require explicit cancellation checks or theyield()
call. Simplifying this loop can enhance readability.Refactor the loop as follows:
for (error in errorChannel) { println("~~~ ERROR: ${error.message} ~~~") // No need for explicit cancellation check or yield() }
16-19
: Leverage coroutine scope without passing it explicitlySince
streamEvents
is asuspend
function, you can usecoroutineScope
orwithContext
to manage coroutines, eliminating the need to passCoroutineScope
explicitly. This can simplify your function signatures and improve code clarity.Refactor
streamEvents
to usecoroutineScope
:suspend fun streamEvents( receivedEvents: MutableList<FlowEvent> ) = coroutineScope { val blockId = connector.latestBlockID val (dataChannel, errorChannel, job) = accessAPI.subscribeEventsByBlockId(this, blockId) processEvents(this, dataChannel, errorChannel, receivedEvents) job.cancelAndJoin() }
64-65
: Consider the order of joining coroutinesCurrently,
dataJob
is joined beforeerrorJob
. Depending on the desired program flow, you might want to ensure that all errors are processed before completing data processing. Consider whether the order ofdataJob.join()
anderrorJob.join()
affects your application's logic.If processing errors should take precedence, adjust the order:
errorJob.join() dataJob.join()kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExampleTest.kt (1)
25-27
: Use consistentprivateKey
variable for clarityIn line 27, you use
serviceAccount.privateKey
directly, whereas in line 25, you assigned it toprivateKey
. For consistency and clarity, consider using theprivateKey
variable in both places.Apply this change:
val privateKey = serviceAccount.privateKey subscribeEventsExample = SubscribeEventsExample(privateKey, accessAPI) - accessAPIConnector = AccessAPIConnector(serviceAccount.privateKey, accessAPI) + accessAPIConnector = AccessAPIConnector(privateKey, accessAPI)kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt (1)
25-27
: Confirm proper cancellation of the subscription jobAfter processing execution data, you're checking if
job.isActive
before cancelling and joining it. Ensure this aligns with your intended behavior, and that resources are properly released.Consider adding error handling or logging to capture any issues during cancellation.
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt (2)
25-25
: Use a more specific exception typeThrowing a generic
Exception
can make error handling less precise. Consider using a more specific exception or creating a custom exception to provide clearer context when errors occur.Example:
- is FlowAccessApi.AccessApiCallResponse.Error -> throw Exception(response.message, response.throwable) + is FlowAccessApi.AccessApiCallResponse.Error -> throw FlowAccessApiException(response.message, response.throwable)Ensure that
FlowAccessApiException
is a custom exception that provides meaningful information about the API call failure.
52-52
: Use a logging framework instead ofprintln
statementsReplacing
println
with a logging framework provides better control over log levels and output formatting, which is beneficial for both development and production environments.Consider using a logging library like SLF4J with Logback:
Add the logging dependencies to your build file.
Initialize a logger in your class:
import org.slf4j.LoggerFactory private val logger = LoggerFactory.getLogger(SubscribeEventsReconnectExample::class.java)Replace
println
statements with appropriate logging methods:- println("Received events at height: $height") + logger.info("Received events at height: $height")Also applies to: 57-57, 64-64, 69-69, 77-77, 88-88
sdk/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt (9)
8-8
: Avoid using wildcard imports for better code clarityUsing wildcard imports like
import kotlinx.coroutines.*
can reduce code readability and may lead to namespace pollution. It's recommended to import only the specific classes or functions you need to enhance maintainability and clarity.
Line range hint
391-414
: Re-evaluate exposing theJob
in the API return typeReturning the
Job
in theTriple
exposes internal coroutine management details to the API client, which might not be necessary. Since the caller already provides aCoroutineScope
, they can manage coroutine lifecycles if needed. Consider encapsulating the coroutine handling within the API to provide a cleaner and more abstracted interface.
Line range hint
391-414
: Consider specifying a bounded capacity for channels to prevent memory issuesThe
responseChannel
anderrorChannel
are initialized withChannel.UNLIMITED
capacity. While this prevents message loss, it may lead to uncontrolled memory growth if the producer outpaces the consumer. Recommend using a bounded capacity (e.g.,Channel.BUFFERED
or a specific size) to ensure better resource management.
Line range hint
420-443
: Re-evaluate exposing theJob
in the API return typeSimilar to
subscribeExecutionDataByBlockId
, returning theJob
in theTriple
exposes internal coroutine details. Assess whether this is necessary or if the coroutine lifecycle can be managed internally within the API for a cleaner interface.
Line range hint
420-443
: Consider specifying a bounded capacity for channels to prevent memory issuesThe use of
Channel.UNLIMITED
forresponseChannel
anderrorChannel
may cause unbounded memory usage if consumers are slower than producers. Suggest specifying a bounded capacity to manage resources effectively.
Line range hint
449-472
: Reconsider exposing theJob
in the API return typeReturning the coroutine
Job
to the caller reveals implementation details and might not be necessary. Consider managing the coroutine internally and providing a simpler API to the client.
Line range hint
449-472
: Consider specifying a bounded capacity for channels to prevent memory issuesInitializing
responseChannel
anderrorChannel
withChannel.UNLIMITED
could lead to excessive memory consumption under heavy load. It's advisable to use a bounded capacity to ensure resource constraints are respected.
Line range hint
478-501
: Reconsider exposing theJob
in the API return typeAs with the other subscription methods, evaluate whether returning the
Job
is necessary. Encapsulating coroutine management internally could provide a cleaner and more user-friendly API.
Line range hint
478-501
: Consider specifying a bounded capacity for channels to prevent memory issuesUsing
Channel.UNLIMITED
for channel capacities may result in uncontrolled memory growth if message production exceeds consumption rates. Setting a bounded capacity helps in maintaining resource efficiency.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt (1 hunks)
- kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt (1 hunks)
- kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt (1 hunks)
- kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExampleTest.kt (1 hunks)
- kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectTest.kt (1 hunks)
- kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExampleTest.kt (1 hunks)
- sdk/src/main/kotlin/org/onflow/flow/sdk/FlowAccessApi.kt (2 hunks)
- sdk/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt (6 hunks)
🧰 Additional context used
🪛 detekt
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt
[warning] 41-41: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
[warning] 56-56: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt
[warning] 43-43: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
[warning] 58-58: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
🔇 Additional comments (9)
kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectTest.kt (1)
1-11
: LGTM: Package declaration and imports are correct and complete.The package declaration is appropriate, and all necessary imports are present. There are no unused imports, which is good for code cleanliness.
kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExampleTest.kt (3)
1-11
: LGTM: Import statements are appropriate and concise.The import statements cover all necessary dependencies for the test class, including Kotlin coroutines, JUnit assertions, Flow SDK components, and custom classes. No unused imports are present.
23-27
: LGTM: Setup method is concise and appropriate.The
@BeforeEach
annotated setup method correctly initializesaccessAPIConnector
andsubscribeExecutionDataExample
using the injectedserviceAccount
andaccessAPI
. This ensures a clean state for each test execution.
29-67
: LGTM: Test method effectively demonstrates streaming execution data.The test method successfully demonstrates the ability to stream execution data using coroutines. It covers the main functionality by launching a streaming job, triggering a sample transaction, and validating the received data.
Consider the following improvements:
Error handling: Instead of catching a general
CancellationException
, catch specific exceptions that might occur during the streaming process.Timing: The
delay(3000L)
is a fixed wait time. Consider using a more robust approach, such as polling for a condition or using a timeout with a maximum number of retries.Assertions: Add more specific assertions to validate the content of the received execution data, not just the presence of IDs.
Here's a sample refactor for improved error handling:
try { // ... existing code ... } catch (e: CancellationException) { println("Test scope cancelled: ${e.message}") } catch (e: FlowException) { fail("Flow operation failed: ${e.message}") } catch (e: Exception) { fail("Unexpected error occurred: ${e.message}") }To ensure that the
SubscribeExecutionDataExample
class is correctly implemented, run the following script:sdk/src/main/kotlin/org/onflow/flow/sdk/FlowAccessApi.kt (3)
5-5
: LGTM: Import statement added for JobThe addition of the
kotlinx.coroutines.Job
import is consistent with the changes made to the subscription methods. This import is necessary for the interface to reference theJob
type in the return values.
72-74
: LGTM: Consistent update to return typeThis change is consistent with the update made to
subscribeExecutionDataByBlockId
. The same benefits of better coroutine lifecycle management apply here.Please refer to the previous comment regarding the potential use of a custom data class instead of a
Triple
for improved API usability.
82-84
: LGTM: Consistent update to return type for event streamingThis change is consistent with the update made to
subscribeEventsByBlockId
. The same benefits of better coroutine lifecycle management and efficient event streaming apply here.Please refer to the previous comments regarding:
- The potential use of a custom data class instead of a
Triple
for improved API usability.- The alignment with PR objectives for providing examples of streaming events.
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt (2)
44-45
: Ensure channels are properly closedWhile canceling the channels in the
finally
blocks, it's important to ensure that this doesn't interfere with other consumers of the channels or lead to premature closure. Verify that canceling these channels here is appropriate in the context of your application.To confirm the proper usage of channel cancellation, you can check for other usages of
dataChannel
anderrorChannel
in the codebase:Also applies to: 59-60
✅ Verification successful
Channel cancellation confirmed appropriate
The cancellation of
dataChannel
anderrorChannel
in thefinally
block does not interfere with other consumers. All usages of these channels are properly scoped within their respective contexts, ensuring that their closure does not lead to premature termination elsewhere in the application.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for other usages of dataChannel and errorChannel. rg --type kotlin 'dataChannel|errorChannel'Length of output: 8514
20-22
: Verify correct usage ofsubscribeEventsByBlockId
with the scopeEnsure that
accessAPI.subscribeEventsByBlockId
correctly utilizes the provided scope and that there are no conflicts or unintended behaviors due to scope management.To confirm the implementation and usage of
subscribeEventsByBlockId
, run the following script:
...n/org/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectTest.kt
Show resolved
Hide resolved
.../src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt
Show resolved
Hide resolved
.../src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt
Outdated
Show resolved
Hide resolved
.../src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt
Show resolved
Hide resolved
.../test/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExampleTest.kt
Outdated
Show resolved
Hide resolved
...rg/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt
Show resolved
Hide resolved
...rg/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt
Show resolved
Hide resolved
...rg/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt
Show resolved
Hide resolved
...rg/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt
Show resolved
Hide resolved
...rg/onflow/examples/kotlin/streaming/streamEventsReconnect/SubscribeEventsReconnectExample.kt
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt (1 hunks)
- kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt (1 hunks)
- kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExampleTest.kt (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- kotlin-example/src/test/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExampleTest.kt
🧰 Additional context used
🪛 detekt
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt
[warning] 41-41: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
[warning] 56-56: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt
[warning] 42-42: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
[warning] 57-57: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
🔇 Additional comments (4)
kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamEvents/SubscribeEventsExample.kt (1)
26-31
: LGTM: Well-structured method signatureThe
processEvents
method signature is well-defined, clearly indicating its dependencies and purpose. It appropriately takes the necessary parameters for processing events asynchronously.kotlin-example/src/main/kotlin/org/onflow/examples/kotlin/streaming/streamExecutionData/SubscribeExecutionDataExample.kt (3)
14-14
: Assess the necessity of theconnector
instance.As previously suggested, consider removing the
connector
and usingaccessAPI
directly to retrievelatestBlockID
.Also applies to: 20-20
16-27
: FunctionstreamExecutionData
implementation looks good.The function correctly retrieves the latest block ID and subscribes to execution data channels.
29-67
: FunctionprocessExecutionData
implementation is correct.The concurrent processing of data and error channels is appropriately handled.
🧰 Tools
🪛 detekt
[warning] 42-42: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
[warning] 57-57: The caught exception is swallowed. The original exception could be lost.
(detekt.exceptions.SwallowedException)
Description
Streaming
For contributor use:
master
branchFiles changed
in the Github PR explorerSummary by CodeRabbit
New Features
Tests
Bug Fixes