Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples use-case - Access API subscription endpoints - Kotlin #101

Merged
merged 17 commits into from
Oct 3, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.onflow.examples.kotlin.streaming.streamEvents

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import org.onflow.examples.kotlin.AccessAPIConnector
import org.onflow.flow.sdk.*
import org.onflow.flow.sdk.crypto.PrivateKey

class SubscribeEventsExample(
privateKey: PrivateKey,
accessApiConnection: FlowAccessApi
) {
private val accessAPI = accessApiConnection
private val connector = AccessAPIConnector(privateKey, accessAPI)

suspend fun streamEvents(
scope: CoroutineScope,
receivedEvents: MutableList<FlowEvent>
) {
val blockId = connector.latestBlockID

val (dataChannel, errorChannel, job) = accessAPI.subscribeEventsByBlockId(scope, blockId)
processEvents(scope, dataChannel, errorChannel, receivedEvents)
job.cancelAndJoin()
}
private suspend fun processEvents(
scope: CoroutineScope,
dataChannel: ReceiveChannel<List<FlowEvent>>,
errorChannel: ReceiveChannel<Throwable>,
receivedEvents: MutableList<FlowEvent>
) {
val dataJob = scope.launch {
try {
for (events in dataChannel) {
if (!isActive) break
if (events.isNotEmpty()) {
receivedEvents.addAll(events)
}
yield()
}
} catch (e: CancellationException) {
println("Data channel processing cancelled")
} finally {
println("Data channel processing finished")
dataChannel.cancel()
}
}

val errorJob = scope.launch {
try {
for (error in errorChannel) {
println("~~~ ERROR: ${error.message} ~~~")
if (!isActive) break
yield()
}
} catch (e: CancellationException) {
println("Error channel processing cancelled")
} finally {
println("Error channel processing finished")
errorChannel.cancel()
}
}

dataJob.join()
errorJob.join()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package org.onflow.examples.kotlin.streaming.streamEventsReconnect

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.select
import org.onflow.flow.sdk.*

class SubscribeEventsReconnectExample(
accessApiConnection: FlowAccessApi
) {
private val accessAPI = accessApiConnection

suspend fun streamEvents(scope: CoroutineScope, receivedEvents: MutableList<FlowEvent>) {
val header: FlowBlockHeader = getLatestBlockHeader()

val (eventChannel, errorChannel, job) = accessAPI.subscribeEventsByBlockId(scope, header.id)
val lastHeight = header.height

processEventsWithReconnect(scope, eventChannel, errorChannel, lastHeight, receivedEvents, job)
}

private fun getLatestBlockHeader(): FlowBlockHeader {
return when (val response = accessAPI.getLatestBlockHeader(true)) {
is FlowAccessApi.AccessApiCallResponse.Success -> response.data
is FlowAccessApi.AccessApiCallResponse.Error -> throw Exception(response.message, response.throwable)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun processEventsWithReconnect(
scope: CoroutineScope,
initialEventChannel: ReceiveChannel<List<FlowEvent>>,
initialErrorChannel: ReceiveChannel<Throwable>,
lastHeight: Long,
receivedEvents: MutableList<FlowEvent>,
initialJob: Job
) {
var eventChannel = initialEventChannel
var errorChannel = initialErrorChannel
var job = initialJob
var height = lastHeight
var reconnectAttempts = 0
val maxReconnectAttempts = 5

val dataJob = scope.launch {
while (reconnectAttempts < maxReconnectAttempts) {
val shouldReconnect: Boolean = select {
eventChannel.onReceiveCatching { result ->
result.getOrNull()?.let { events ->
if (events.isNotEmpty()) {
receivedEvents.addAll(events)
println("Received events at height: $height")
height++
reconnectAttempts = 0 // Reset reconnect attempts on success
false
} else {
println("No events received, attempting to reconnect...")
true
}
} ?: true
}
errorChannel.onReceiveCatching { result ->
result.getOrNull()?.let { error ->
println("~~~ ERROR: ${error.message} ~~~")
true
} == true
}
onTimeout(1000L) {
println("Timeout occurred, checking channels...")
false
}
}

if (shouldReconnect) {
reconnectAttempts++
if (reconnectAttempts < maxReconnectAttempts) {
println("Reconnecting at block $height (attempt $reconnectAttempts/$maxReconnectAttempts)")

// Cancel the previous job before reconnecting
job.cancelAndJoin()

// Perform reconnection and update channels and job
val (newEventChannel, newErrorChannel, newJob) = reconnect(scope, height)
eventChannel = newEventChannel
errorChannel = newErrorChannel
job = newJob
} else {
println("Max reconnect attempts reached. Stopping.")
break
}
}
}
}

dataJob.join()
}

private fun reconnect(
scope: CoroutineScope,
height: Long
): Triple<ReceiveChannel<List<FlowEvent>>, ReceiveChannel<Throwable>, Job> {
return accessAPI.subscribeEventsByBlockHeight(scope, height)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.onflow.examples.kotlin.streaming.streamExecutionData

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import org.onflow.examples.kotlin.AccessAPIConnector
import org.onflow.flow.sdk.*
import org.onflow.flow.sdk.crypto.PrivateKey

class SubscribeExecutionDataExample(
privateKey: PrivateKey,
accessApiConnection: FlowAccessApi
) {
private val accessAPI = accessApiConnection
private val connector = AccessAPIConnector(privateKey, accessAPI)

suspend fun streamExecutionData(
scope: CoroutineScope,
receivedExecutionData: MutableList<FlowBlockExecutionData>
) {
val blockId = connector.latestBlockID

val (dataChannel, errorChannel, job) = accessAPI.subscribeExecutionDataByBlockId(scope, blockId)
processExecutionData(scope, dataChannel, errorChannel, receivedExecutionData)
if (job.isActive) {
job.cancelAndJoin()
}
}

private suspend fun processExecutionData(
scope: CoroutineScope,
dataChannel: ReceiveChannel<FlowBlockExecutionData>,
errorChannel: ReceiveChannel<Throwable>,
receivedExecutionData: MutableList<FlowBlockExecutionData>
) {
val dataJob = scope.launch {
try {
for (data in dataChannel) {
if (!isActive) break // Respond to cancellation
receivedExecutionData.add(data)
yield() // Cooperative cancellation
}
} catch (e: CancellationException) {
println("Data channel processing cancelled")
} finally {
println("Data channel processing finished")
dataChannel.cancel()
}
}

val errorJob = scope.launch {
try {
for (error in errorChannel) {
if (!isActive) break // Respond to cancellation
println("~~~ ERROR: ${error.message} ~~~")
yield() // Cooperative cancellation
}
} catch (e: CancellationException) {
println("Error channel processing cancelled")
} finally {
println("Error channel processing finished")
errorChannel.cancel()
}
}

dataJob.join()
errorJob.join()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.onflow.examples.kotlin.streaming.streamEvents

import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.onflow.examples.kotlin.AccessAPIConnector
import org.onflow.flow.common.test.*
import org.onflow.flow.sdk.*
import org.onflow.flow.sdk.crypto.Crypto

@FlowEmulatorProjectTest(flowJsonLocation = "../flow/flow.json")
internal class SubscribeEventsExampleTest {
@FlowServiceAccountCredentials
lateinit var serviceAccount: TestAccount

@FlowTestClient
lateinit var accessAPI: FlowAccessApi

private lateinit var subscribeEventsExample: SubscribeEventsExample
private lateinit var accessAPIConnector: AccessAPIConnector

@BeforeEach
fun setup() {
val privateKey = serviceAccount.privateKey
subscribeEventsExample = SubscribeEventsExample(privateKey, accessAPI)
accessAPIConnector = AccessAPIConnector(serviceAccount.privateKey, accessAPI)
}

@Test
fun `Can stream and receive block events`() = runBlocking {
val testScope = CoroutineScope(Dispatchers.IO + Job())
val receivedEvents = mutableListOf<FlowEvent>()
try {
val streamJob = launch {
withTimeoutOrNull(10_000L) {
subscribeEventsExample.streamEvents(testScope, receivedEvents)
}
}

// Trigger a sample transaction
val publicKey = Crypto.generateKeyPair(SignatureAlgorithm.ECDSA_P256).public
accessAPIConnector.sendSampleTransaction(
serviceAccount.flowAddress,
publicKey
)

delay(3000L)
testScope.cancel()
streamJob.join()
} catch (e: CancellationException) {
println("Test scope cancelled: ${e.message}")
}

// Validate that events have been received and processed
assertTrue(receivedEvents.isNotEmpty(), "Should have received at least one event")
receivedEvents.forEach { event ->
assertNotNull(event.type, "Event type should not be null")
assertNotNull(event.transactionId, "Transaction ID should not be null")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.onflow.examples.kotlin.streaming.streamEventsReconnect

import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.onflow.examples.kotlin.AccessAPIConnector
import org.onflow.flow.common.test.*
import org.onflow.flow.sdk.*
import org.onflow.flow.sdk.crypto.Crypto

@FlowEmulatorProjectTest(flowJsonLocation = "../flow/flow.json")
internal class SubscribeEventsReconnectExampleTest {
@FlowServiceAccountCredentials
lateinit var serviceAccount: TestAccount

@FlowTestClient
lateinit var accessAPI: FlowAccessApi

private lateinit var subscribeEventsReconnectExample: SubscribeEventsReconnectExample
private lateinit var accessAPIConnector: AccessAPIConnector

@BeforeEach
fun setup() {
accessAPIConnector = AccessAPIConnector(serviceAccount.privateKey, accessAPI)
subscribeEventsReconnectExample = SubscribeEventsReconnectExample(accessAPI)
}

@Test
fun `Can stream and reconnect events`() = runBlocking {
val testScope = CoroutineScope(Dispatchers.IO + Job())
val receivedEvents = mutableListOf<FlowEvent>()

val reconnectJob = launch {
subscribeEventsReconnectExample.streamEvents(testScope, receivedEvents)
}

// Trigger a sample event
val publicKey = Crypto.generateKeyPair(SignatureAlgorithm.ECDSA_P256).public
accessAPIConnector.sendSampleTransaction(
serviceAccount.flowAddress,
publicKey
)
delay(5000L)

// Check if the stream has reconnected and continued processing after the simulated disconnection
reconnectJob.cancelAndJoin()

// Validate that events have been received and processed
assertTrue(receivedEvents.isNotEmpty(), "Should have received at least one event")
receivedEvents.forEach { event ->
assertNotNull(event.type, "Event type should not be null")
assertNotNull(event.transactionId, "Transaction ID should not be null")
}
}
}
Loading
Loading