From 92d359b7806c2446e8f8c5effba07a4ca788e0df Mon Sep 17 00:00:00 2001 From: Lea Lobanov Date: Thu, 25 Jul 2024 19:36:27 +0700 Subject: [PATCH] WIP: update tests for non-blocking methods --- build.gradle.kts | 1 + src/main/kotlin/org/onflow/flow/sdk/Flow.kt | 7 +- .../onflow/flow/sdk/impl/FlowAccessApiImpl.kt | 27 +++--- .../flow/sdk/impl/FlowAccessApiImplTest.kt | 94 +++++++++++++++---- 4 files changed, 95 insertions(+), 34 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index d302c70..860ecb9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -63,6 +63,7 @@ dependencies { testApi("org.junit.jupiter:junit-jupiter:5.10.1") testApi("org.assertj:assertj-core:3.25.1") + testApi("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.5.0") testFixturesImplementation("org.junit.jupiter:junit-jupiter:5.10.1") testFixturesImplementation("org.mockito:mockito-core:3.12.4") diff --git a/src/main/kotlin/org/onflow/flow/sdk/Flow.kt b/src/main/kotlin/org/onflow/flow/sdk/Flow.kt index ff18266..d7e1c5c 100644 --- a/src/main/kotlin/org/onflow/flow/sdk/Flow.kt +++ b/src/main/kotlin/org/onflow/flow/sdk/Flow.kt @@ -5,6 +5,9 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob import org.onflow.protobuf.access.AccessAPIGrpc import org.onflow.flow.sdk.cadence.CadenceNamespace import org.onflow.flow.sdk.cadence.Field @@ -16,7 +19,6 @@ import kotlin.reflect.KClass object Flow { private const val DEFAULT_USER_AGENT = "Flow JVM SDK" - private const val DEFAULT_MAX_MESSAGE_SIZE = 16777216 private var OBJECT_MAPPER: ObjectMapper = ObjectMapper() @@ -46,7 +48,8 @@ object Flow { @JvmOverloads fun newAccessApi(host: String, port: Int = 9000, secure: Boolean = false, userAgent: String = DEFAULT_USER_AGENT, maxMessageSize: Int = DEFAULT_MAX_MESSAGE_SIZE): FlowAccessApi { val channel = openChannel(host, port, secure, userAgent, maxMessageSize) - return FlowAccessApiImpl(AccessAPIGrpc.newBlockingStub(channel), ExecutionDataAPIGrpc.newBlockingStub(channel)) + val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + return FlowAccessApiImpl(AccessAPIGrpc.newBlockingStub(channel), ExecutionDataAPIGrpc.newBlockingStub(channel), coroutineScope) } @JvmStatic diff --git a/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt b/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt index 5e0d4a9..ff7573d 100644 --- a/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt +++ b/src/main/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImpl.kt @@ -3,9 +3,12 @@ package org.onflow.flow.sdk.impl import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.launch import com.google.protobuf.ByteString import org.onflow.flow.sdk.* import io.grpc.ManagedChannel +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.ClosedReceiveChannelException import org.onflow.protobuf.access.Access import org.onflow.protobuf.access.AccessAPIGrpc import org.onflow.protobuf.executiondata.ExecutionDataAPIGrpc @@ -14,7 +17,8 @@ import java.io.Closeable class FlowAccessApiImpl( private val api: AccessAPIGrpc.AccessAPIBlockingStub, - private val executionDataApi: ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub + private val executionDataApi: ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub, + private val coroutineScope: CoroutineScope ) : FlowAccessApi, Closeable { override fun close() { val chan = api.channel @@ -391,7 +395,7 @@ class FlowAccessApiImpl( val responseChannel = Channel(Channel.UNLIMITED) val errorChannel = Channel(Channel.UNLIMITED) - return runBlocking { + coroutineScope.launch { try { val request = Executiondata.SubscribeExecutionDataFromStartBlockIDRequest.newBuilder() .setStartBlockId(blockId.byteStringValue) @@ -402,15 +406,14 @@ class FlowAccessApiImpl( for (response in responseIterator) { responseChannel.send(FlowBlockExecutionData.of(response.blockExecutionData)) } - FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } catch (e: Exception) { errorChannel.send(e) - FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe execution data by block ID", e) } finally { responseChannel.close() errorChannel.close() } } + return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } override fun subscribeExecutionDataByBlockHeight( @@ -419,7 +422,7 @@ class FlowAccessApiImpl( val responseChannel = Channel(Channel.UNLIMITED) val errorChannel = Channel(Channel.UNLIMITED) - return runBlocking { + coroutineScope.launch { try { val request = Executiondata.SubscribeExecutionDataFromStartBlockHeightRequest.newBuilder() .setStartBlockHeight(height) @@ -430,15 +433,15 @@ class FlowAccessApiImpl( for (response in responseIterator) { responseChannel.send(FlowBlockExecutionData.of(response.blockExecutionData)) } - FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } catch (e: Exception) { errorChannel.send(e) - FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe execution data by block height", e) } finally { responseChannel.close() errorChannel.close() } } + + return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } override fun subscribeEventsByBlockId( @@ -447,7 +450,7 @@ class FlowAccessApiImpl( val responseChannel = Channel>(Channel.UNLIMITED) val errorChannel = Channel(Channel.UNLIMITED) - return runBlocking { + coroutineScope.launch { try { val request = Executiondata.SubscribeEventsFromStartBlockIDRequest.newBuilder() .setStartBlockId(blockId.byteStringValue) @@ -458,15 +461,14 @@ class FlowAccessApiImpl( for (response in responseIterator) { responseChannel.send(response.eventsList.map { FlowEvent.of(it) }) } - FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } catch (e: Exception) { errorChannel.send(e) - FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe events by block ID", e) } finally { responseChannel.close() errorChannel.close() } } + return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } override fun subscribeEventsByBlockHeight( @@ -475,7 +477,7 @@ class FlowAccessApiImpl( val responseChannel = Channel>(Channel.UNLIMITED) val errorChannel = Channel(Channel.UNLIMITED) - return runBlocking { + coroutineScope.launch { try { val request = Executiondata.SubscribeEventsFromStartHeightRequest.newBuilder() .setStartBlockHeight(height) @@ -486,14 +488,13 @@ class FlowAccessApiImpl( for (response in responseIterator) { responseChannel.send(response.eventsList.map { FlowEvent.of(it) }) } - FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } catch (e: Exception) { errorChannel.send(e) - FlowAccessApi.AccessApiCallResponse.Error("Failed to subscribe events by block height", e) } finally { responseChannel.close() errorChannel.close() } } + return FlowAccessApi.AccessApiCallResponse.Success(responseChannel to errorChannel) } } diff --git a/src/test/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImplTest.kt b/src/test/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImplTest.kt index c7f9227..7967f4c 100644 --- a/src/test/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImplTest.kt +++ b/src/test/kotlin/org/onflow/flow/sdk/impl/FlowAccessApiImplTest.kt @@ -1,9 +1,12 @@ package org.onflow.flow.sdk.impl import com.google.protobuf.ByteString +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import org.onflow.flow.sdk.* import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals @@ -25,6 +28,7 @@ import java.io.PrintStream import java.math.BigDecimal import java.time.LocalDateTime +@ExperimentalCoroutinesApi class FlowAccessApiImplTest { private lateinit var flowAccessApiImpl: FlowAccessApiImpl private lateinit var mockApi: AccessAPIGrpc.AccessAPIBlockingStub @@ -34,13 +38,14 @@ class FlowAccessApiImplTest { private val api = mock(AccessAPIGrpc.AccessAPIBlockingStub::class.java) private val executionDataApi = mock(ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub::class.java) - private val flowAccessApi = FlowAccessApiImpl(api, executionDataApi) + private val testDispatcher = TestCoroutineDispatcher() + private val testScope = TestCoroutineScope(testDispatcher) @BeforeEach fun setUp() { mockApi = mock(AccessAPIGrpc.AccessAPIBlockingStub::class.java) mockExecutionDataApi = mock(ExecutionDataAPIGrpc.ExecutionDataAPIBlockingStub::class.java) - flowAccessApiImpl = FlowAccessApiImpl(mockApi, mockExecutionDataApi) + flowAccessApiImpl = FlowAccessApiImpl(mockApi, mockExecutionDataApi, testScope) outputStreamCaptor = ByteArrayOutputStream() originalOut = System.out System.setOut(PrintStream(outputStreamCaptor)) @@ -412,7 +417,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeExecutionDataByBlockHeight success case`(): Unit = runBlocking { + fun `Test subscribeExecutionDataByBlockHeight success case`() = testScope.runBlockingTest { val blockHeight = 100L val expectedExecutionDataProto = BlockExecutionDataOuterClass.BlockExecutionData.getDefaultInstance() val expectedExecutionData = FlowBlockExecutionData.of(expectedExecutionDataProto) @@ -443,25 +448,38 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeExecutionDataByBlockHeight error case`() = runBlocking { + fun `Test subscribeExecutionDataByBlockHeight error case`() = testScope.runBlockingTest { val blockHeight = 100L val exception = RuntimeException("Test exception") - `when`(mockExecutionDataApi.subscribeExecutionDataFromStartBlockHeight(any())).thenThrow(exception) + `when`(mockExecutionDataApi.subscribeExecutionDataFromStartBlockHeight(any())) + .thenAnswer { throw exception } when (val result = flowAccessApiImpl.subscribeExecutionDataByBlockHeight(blockHeight)) { is FlowAccessApi.AccessApiCallResponse.Success -> { - fail("Expected error but got success") + val (_, errorChannel) = result.data + + // Check for errors in the errorChannel + var receivedException: Throwable? = null + val job = launch { + receivedException = errorChannel.receiveCatching().getOrNull() + } + job.join() + + if (receivedException != null) { + assertEquals(exception.message, receivedException!!.message) + } else { + fail("Expected error but got success") + } } is FlowAccessApi.AccessApiCallResponse.Error -> { - assertEquals("Failed to subscribe execution data by block height", result.message) - assertEquals(exception, result.throwable) + fail("Expected success but got error: ${result.message}") } } } @Test - fun `Test subscribeEventsByBlockId success case`(): Unit = runBlocking { + fun `Test subscribeEventsByBlockId success case`() = testScope.runBlockingTest { val blockId = FlowId("01") val expectedEventsProto = EventOuterClass.Event.getDefaultInstance() val expectedEvents = listOf(FlowEvent.of(expectedEventsProto)) @@ -492,7 +510,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeEventsByBlockId error case`() = runBlocking { + fun `Test subscribeEventsByBlockId error case`() = testScope.runBlockingTest { val blockId = FlowId("01") val exception = RuntimeException("Test exception") @@ -500,7 +518,20 @@ class FlowAccessApiImplTest { when (val result = flowAccessApiImpl.subscribeEventsByBlockId(blockId)) { is FlowAccessApi.AccessApiCallResponse.Success -> { - fail("Expected error but got success") + val (_, errorChannel) = result.data + + // Check for errors in the errorChannel + var receivedException: Throwable? = null + val job = launch { + receivedException = errorChannel.receiveCatching().getOrNull() + } + job.join() + + if (receivedException != null) { + assertEquals(exception.message, receivedException!!.message) + } else { + fail("Expected error but got success") + } } is FlowAccessApi.AccessApiCallResponse.Error -> { assertEquals("Failed to subscribe events by block ID", result.message) @@ -510,7 +541,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeEventsByBlockHeight success case`(): Unit = runBlocking { + fun `Test subscribeEventsByBlockHeight success case`() = testScope.runBlockingTest { val blockHeight = 100L val expectedEventsProto = EventOuterClass.Event.getDefaultInstance() val expectedEvents = listOf(FlowEvent.of(expectedEventsProto)) @@ -541,7 +572,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeEventsByBlockHeight error case`() = runBlocking { + fun `Test subscribeEventsByBlockHeight error case`() = testScope.runBlockingTest { val blockHeight = 100L val exception = RuntimeException("Test exception") @@ -549,7 +580,20 @@ class FlowAccessApiImplTest { when (val result = flowAccessApiImpl.subscribeEventsByBlockHeight(blockHeight)) { is FlowAccessApi.AccessApiCallResponse.Success -> { - fail("Expected error but got success") + val (_, errorChannel) = result.data + + // Check for errors in the errorChannel + var receivedException: Throwable? = null + val job = launch { + receivedException = errorChannel.receiveCatching().getOrNull() + } + job.join() + + if (receivedException != null) { + assertEquals(exception.message, receivedException!!.message) + } else { + fail("Expected error but got success") + } } is FlowAccessApi.AccessApiCallResponse.Error -> { assertEquals("Failed to subscribe events by block height", result.message) @@ -559,7 +603,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeExecutionDataByBlockId success case`(): Unit = runBlocking { + fun `Test subscribeExecutionDataByBlockId success case`() = testScope.runBlockingTest { val blockId = FlowId("01") val expectedExecutionDataProto = BlockExecutionDataOuterClass.BlockExecutionData.getDefaultInstance() val expectedExecutionData = FlowBlockExecutionData.of(expectedExecutionDataProto) @@ -590,7 +634,7 @@ class FlowAccessApiImplTest { } @Test - fun `Test subscribeExecutionDataByBlockId error case`() = runBlocking { + fun `Test subscribeExecutionDataByBlockId error case`() = testScope.runBlockingTest { val blockId = FlowId("01") val exception = RuntimeException("Test exception") @@ -598,7 +642,20 @@ class FlowAccessApiImplTest { when (val result = flowAccessApiImpl.subscribeExecutionDataByBlockId(blockId)) { is FlowAccessApi.AccessApiCallResponse.Success -> { - fail("Expected error but got success") + val (_, errorChannel) = result.data + + // Check for errors in the errorChannel + var receivedException: Throwable? = null + val job = launch { + receivedException = errorChannel.receiveCatching().getOrNull() + } + job.join() + + if (receivedException != null) { + assertEquals(exception.message, receivedException!!.message) + } else { + fail("Expected error but got success") + } } is FlowAccessApi.AccessApiCallResponse.Error -> { assertEquals("Failed to subscribe execution data by block ID", result.message) @@ -606,7 +663,6 @@ class FlowAccessApiImplTest { } } } - private fun createMockTransaction(flowId: FlowId = FlowId("01")) = FlowTransaction(FlowScript("script"), emptyList(), flowId, 123L, FlowTransactionProposalKey(FlowAddress("02"), 1, 123L), FlowAddress("02"), emptyList()) private fun createMockAccount(flowAddress: FlowAddress) = FlowAccount(flowAddress, BigDecimal.ONE, FlowCode("code".toByteArray()), emptyList(), emptyMap())