Skip to content

Commit

Permalink
WIP: update tests for non-blocking methods
Browse files Browse the repository at this point in the history
  • Loading branch information
lealobanov committed Jul 25, 2024
1 parent 099bea9 commit 92d359b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 34 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {

testApi("org.junit.jupiter:junit-jupiter:5.10.1")
testApi("org.assertj:assertj-core:3.25.1")
testApi("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.5.0")

testFixturesImplementation("org.junit.jupiter:junit-jupiter:5.10.1")
testFixturesImplementation("org.mockito:mockito-core:3.12.4")
Expand Down
7 changes: 5 additions & 2 deletions src/main/kotlin/org/onflow/flow/sdk/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.onflow.protobuf.access.AccessAPIGrpc
import org.onflow.flow.sdk.cadence.CadenceNamespace
import org.onflow.flow.sdk.cadence.Field
Expand All @@ -16,7 +19,6 @@ import kotlin.reflect.KClass

object Flow {
private const val DEFAULT_USER_AGENT = "Flow JVM SDK"

private const val DEFAULT_MAX_MESSAGE_SIZE = 16777216

private var OBJECT_MAPPER: ObjectMapper = ObjectMapper()
Expand Down Expand Up @@ -46,7 +48,8 @@ object Flow {
@JvmOverloads
fun newAccessApi(host: String, port: Int = 9000, secure: Boolean = false, userAgent: String = DEFAULT_USER_AGENT, maxMessageSize: Int = DEFAULT_MAX_MESSAGE_SIZE): FlowAccessApi {
val channel = openChannel(host, port, secure, userAgent, maxMessageSize)
return FlowAccessApiImpl(AccessAPIGrpc.newBlockingStub(channel), ExecutionDataAPIGrpc.newBlockingStub(channel))
val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
return FlowAccessApiImpl(AccessAPIGrpc.newBlockingStub(channel), ExecutionDataAPIGrpc.newBlockingStub(channel), coroutineScope)
}

@JvmStatic
Expand Down
27 changes: 14 additions & 13 deletions src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package org.onflow.flow.sdk.impl
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.launch
import com.google.protobuf.ByteString
import org.onflow.flow.sdk.*
import io.grpc.ManagedChannel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import org.onflow.protobuf.access.Access
import org.onflow.protobuf.access.AccessAPIGrpc
import org.onflow.protobuf.executiondata.ExecutionDataAPIGrpc
Expand All @@ -14,7 +17,8 @@ import java.io.Closeable

class FlowAccessApiImpl(
private val api: AccessAPIGrpc.AccessAPIBlockingStub,
private val executionDataApi: ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub
private val executionDataApi: ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub,
private val coroutineScope: CoroutineScope
) : FlowAccessApi, Closeable {
override fun close() {
val chan = api.channel
Expand Down Expand Up @@ -391,7 +395,7 @@ class FlowAccessApiImpl(
val responseChannel = Channel<FlowBlockExecutionData>(Channel.UNLIMITED)
val errorChannel = Channel<Throwable>(Channel.UNLIMITED)

return runBlocking {
coroutineScope.launch {
try {
val request = Executiondata.SubscribeExecutionDataFromStartBlockIDRequest.newBuilder()
.setStartBlockId(blockId.byteStringValue)
Expand All @@ -402,15 +406,14 @@ class FlowAccessApiImpl(
for (response in responseIterator) {
responseChannel.send(FlowBlockExecutionData.of(response.blockExecutionData))
}
FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
} catch (e: Exception) {
errorChannel.send(e)
FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe execution data by block ID", e)
} finally {
responseChannel.close()
errorChannel.close()
}
}
return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
}

override fun subscribeExecutionDataByBlockHeight(
Expand All @@ -419,7 +422,7 @@ class FlowAccessApiImpl(
val responseChannel = Channel<FlowBlockExecutionData>(Channel.UNLIMITED)
val errorChannel = Channel<Throwable>(Channel.UNLIMITED)

return runBlocking {
coroutineScope.launch {
try {
val request = Executiondata.SubscribeExecutionDataFromStartBlockHeightRequest.newBuilder()
.setStartBlockHeight(height)
Expand All @@ -430,15 +433,15 @@ class FlowAccessApiImpl(
for (response in responseIterator) {
responseChannel.send(FlowBlockExecutionData.of(response.blockExecutionData))
}
FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
} catch (e: Exception) {
errorChannel.send(e)
FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe execution data by block height", e)
} finally {
responseChannel.close()
errorChannel.close()
}
}

return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
}

override fun subscribeEventsByBlockId(
Expand All @@ -447,7 +450,7 @@ class FlowAccessApiImpl(
val responseChannel = Channel<List<FlowEvent>>(Channel.UNLIMITED)
val errorChannel = Channel<Throwable>(Channel.UNLIMITED)

return runBlocking {
coroutineScope.launch {
try {
val request = Executiondata.SubscribeEventsFromStartBlockIDRequest.newBuilder()
.setStartBlockId(blockId.byteStringValue)
Expand All @@ -458,15 +461,14 @@ class FlowAccessApiImpl(
for (response in responseIterator) {
responseChannel.send(response.eventsList.map { FlowEvent.of(it) })
}
FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
} catch (e: Exception) {
errorChannel.send(e)
FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe events by block ID", e)
} finally {
responseChannel.close()
errorChannel.close()
}
}
return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
}

override fun subscribeEventsByBlockHeight(
Expand All @@ -475,7 +477,7 @@ class FlowAccessApiImpl(
val responseChannel = Channel<List<FlowEvent>>(Channel.UNLIMITED)
val errorChannel = Channel<Throwable>(Channel.UNLIMITED)

return runBlocking {
coroutineScope.launch {
try {
val request = Executiondata.SubscribeEventsFromStartHeightRequest.newBuilder()
.setStartBlockHeight(height)
Expand All @@ -486,14 +488,13 @@ class FlowAccessApiImpl(
for (response in responseIterator) {
responseChannel.send(response.eventsList.map { FlowEvent.of(it) })
}
FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
} catch (e: Exception) {
errorChannel.send(e)
FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe events by block height", e)
} finally {
responseChannel.close()
errorChannel.close()
}
}
return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel)
}
}
94 changes: 75 additions & 19 deletions src/test/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImplTest.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.onflow.flow.sdk.impl

import com.google.protobuf.ByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.onflow.flow.sdk.*
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
Expand All @@ -25,6 +28,7 @@ import java.io.PrintStream
import java.math.BigDecimal
import java.time.LocalDateTime

@ExperimentalCoroutinesApi
class FlowAccessApiImplTest {
private lateinit var flowAccessApiImpl: FlowAccessApiImpl
private lateinit var mockApi: AccessAPIGrpc.AccessAPIBlockingStub
Expand All @@ -34,13 +38,14 @@ class FlowAccessApiImplTest {

private val api = mock(AccessAPIGrpc.AccessAPIBlockingStub::class.java)
private val executionDataApi = mock(ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub::class.java)
private val flowAccessApi = FlowAccessApiImpl(api, executionDataApi)
private val testDispatcher = TestCoroutineDispatcher()
private val testScope = TestCoroutineScope(testDispatcher)

@BeforeEach
fun setUp() {
mockApi = mock(AccessAPIGrpc.AccessAPIBlockingStub::class.java)
mockExecutionDataApi = mock(ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub::class.java)
flowAccessApiImpl = FlowAccessApiImpl(mockApi, mockExecutionDataApi)
flowAccessApiImpl = FlowAccessApiImpl(mockApi, mockExecutionDataApi, testScope)
outputStreamCaptor = ByteArrayOutputStream()
originalOut = System.out
System.setOut(PrintStream(outputStreamCaptor))
Expand Down Expand Up @@ -412,7 +417,7 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeExecutionDataByBlockHeight success case`(): Unit = runBlocking {
fun `Test subscribeExecutionDataByBlockHeight success case`() = testScope.runBlockingTest {
val blockHeight = 100L
val expectedExecutionDataProto = BlockExecutionDataOuterClass.BlockExecutionData.getDefaultInstance()
val expectedExecutionData = FlowBlockExecutionData.of(expectedExecutionDataProto)
Expand Down Expand Up @@ -443,25 +448,38 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeExecutionDataByBlockHeight error case`() = runBlocking {
fun `Test subscribeExecutionDataByBlockHeight error case`() = testScope.runBlockingTest {
val blockHeight = 100L
val exception = RuntimeException("Test exception")

`when`(mockExecutionDataApi.subscribeExecutionDataFromStartBlockHeight(any())).thenThrow(exception)
`when`(mockExecutionDataApi.subscribeExecutionDataFromStartBlockHeight(any()))
.thenAnswer { throw exception }

when (val result = flowAccessApiImpl.subscribeExecutionDataByBlockHeight(blockHeight)) {
is FlowAccessApi.AccessApiCallResponse.Success -> {
fail("Expected error but got success")
val (_, errorChannel) = result.data

// Check for errors in the errorChannel
var receivedException: Throwable? = null
val job = launch {
receivedException = errorChannel.receiveCatching().getOrNull()
}
job.join()

if (receivedException != null) {
assertEquals(exception.message, receivedException!!.message)
} else {
fail("Expected error but got success")
}
}
is FlowAccessApi.AccessApiCallResponse.Error -> {
assertEquals("Failed to subscribe execution data by block height", result.message)
assertEquals(exception, result.throwable)
fail("Expected success but got error: ${result.message}")
}
}
}

@Test
fun `Test subscribeEventsByBlockId success case`(): Unit = runBlocking {
fun `Test subscribeEventsByBlockId success case`() = testScope.runBlockingTest {
val blockId = FlowId("01")
val expectedEventsProto = EventOuterClass.Event.getDefaultInstance()
val expectedEvents = listOf(FlowEvent.of(expectedEventsProto))
Expand Down Expand Up @@ -492,15 +510,28 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeEventsByBlockId error case`() = runBlocking {
fun `Test subscribeEventsByBlockId error case`() = testScope.runBlockingTest {
val blockId = FlowId("01")
val exception = RuntimeException("Test exception")

`when`(mockExecutionDataApi.subscribeEventsFromStartBlockID(any())).thenThrow(exception)

when (val result = flowAccessApiImpl.subscribeEventsByBlockId(blockId)) {
is FlowAccessApi.AccessApiCallResponse.Success -> {
fail("Expected error but got success")
val (_, errorChannel) = result.data

// Check for errors in the errorChannel
var receivedException: Throwable? = null
val job = launch {
receivedException = errorChannel.receiveCatching().getOrNull()
}
job.join()

if (receivedException != null) {
assertEquals(exception.message, receivedException!!.message)
} else {
fail("Expected error but got success")
}
}
is FlowAccessApi.AccessApiCallResponse.Error -> {
assertEquals("Failed to subscribe events by block ID", result.message)
Expand All @@ -510,7 +541,7 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeEventsByBlockHeight success case`(): Unit = runBlocking {
fun `Test subscribeEventsByBlockHeight success case`() = testScope.runBlockingTest {
val blockHeight = 100L
val expectedEventsProto = EventOuterClass.Event.getDefaultInstance()
val expectedEvents = listOf(FlowEvent.of(expectedEventsProto))
Expand Down Expand Up @@ -541,15 +572,28 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeEventsByBlockHeight error case`() = runBlocking {
fun `Test subscribeEventsByBlockHeight error case`() = testScope.runBlockingTest {
val blockHeight = 100L
val exception = RuntimeException("Test exception")

`when`(mockExecutionDataApi.subscribeEventsFromStartHeight(any())).thenThrow(exception)

when (val result = flowAccessApiImpl.subscribeEventsByBlockHeight(blockHeight)) {
is FlowAccessApi.AccessApiCallResponse.Success -> {
fail("Expected error but got success")
val (_, errorChannel) = result.data

// Check for errors in the errorChannel
var receivedException: Throwable? = null
val job = launch {
receivedException = errorChannel.receiveCatching().getOrNull()
}
job.join()

if (receivedException != null) {
assertEquals(exception.message, receivedException!!.message)
} else {
fail("Expected error but got success")
}
}
is FlowAccessApi.AccessApiCallResponse.Error -> {
assertEquals("Failed to subscribe events by block height", result.message)
Expand All @@ -559,7 +603,7 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeExecutionDataByBlockId success case`(): Unit = runBlocking {
fun `Test subscribeExecutionDataByBlockId success case`() = testScope.runBlockingTest {
val blockId = FlowId("01")
val expectedExecutionDataProto = BlockExecutionDataOuterClass.BlockExecutionData.getDefaultInstance()
val expectedExecutionData = FlowBlockExecutionData.of(expectedExecutionDataProto)
Expand Down Expand Up @@ -590,23 +634,35 @@ class FlowAccessApiImplTest {
}

@Test
fun `Test subscribeExecutionDataByBlockId error case`() = runBlocking {
fun `Test subscribeExecutionDataByBlockId error case`() = testScope.runBlockingTest {
val blockId = FlowId("01")
val exception = RuntimeException("Test exception")

`when`(mockExecutionDataApi.subscribeExecutionDataFromStartBlockID(any())).thenThrow(exception)

when (val result = flowAccessApiImpl.subscribeExecutionDataByBlockId(blockId)) {
is FlowAccessApi.AccessApiCallResponse.Success -> {
fail("Expected error but got success")
val (_, errorChannel) = result.data

// Check for errors in the errorChannel
var receivedException: Throwable? = null
val job = launch {
receivedException = errorChannel.receiveCatching().getOrNull()
}
job.join()

if (receivedException != null) {
assertEquals(exception.message, receivedException!!.message)
} else {
fail("Expected error but got success")
}
}
is FlowAccessApi.AccessApiCallResponse.Error -> {
assertEquals("Failed to subscribe execution data by block ID", result.message)
assertEquals(exception, result.throwable)
}
}
}

private fun createMockTransaction(flowId: FlowId = FlowId("01")) = FlowTransaction(FlowScript("script"), emptyList(), flowId, 123L, FlowTransactionProposalKey(FlowAddress("02"), 1, 123L), FlowAddress("02"), emptyList())

private fun createMockAccount(flowAddress: FlowAddress) = FlowAccount(flowAddress, BigDecimal.ONE, FlowCode("code".toByteArray()), emptyList(), emptyMap())
Expand Down

0 comments on commit 92d359b

Please sign in to comment.