Skip to content

Update coroutines #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fun NodeJsTcpServerTransport(port: Int, onStart: () -> Unit = {}): ServerTranspo
// nodejs TCP transport connection - may not work in all cases, not tested properly
@OptIn(ExperimentalCoroutinesApi::class, TransportApi::class)
class NodeJsTcpConnection(private val socket: Socket) : Connection {
override val job: Job = Job()
override val job: CompletableJob = Job()

private val sendChannel = Channel<ByteReadPacket>(8)
private val receiveChannel = Channel<ByteReadPacket>(8)
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ group=io.rsocket.kotlin
version=0.12.0

#Versions
kotlinVersion=1.4.20
kotlinVersion=1.4.21
ktorVersion=1.4.3
kotlinxCoroutinesVersion=1.3.9-native-mt-2
kotlinxCoroutinesVersion=1.4.2-native-mt
kotlinxAtomicfuVersion=0.14.4
kotlinxSerializationVersion=1.0.1
kotlinxBenchmarkVersion=0.2.0-dev-20
Expand Down
3 changes: 1 addition & 2 deletions playground/src/commonMain/kotlin/streams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import io.rsocket.kotlin.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

@ExperimentalStreamsApi
private suspend fun s() {
val flow = flow {
val strategy = coroutineContext[RequestStrategy]!!.provide()
val strategy = currentCoroutineContext()[RequestStrategy]!!.provide()
var i = strategy.firstRequest()
println("INIT: $i")
var r = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package io.rsocket.kotlin

import kotlinx.coroutines.*

interface Cancelable {
val job: Job
interface Cancellable {
val job: CompletableJob
}

val Cancelable.isActive: Boolean get() = job.isActive
fun Cancelable.cancel(cause: CancellationException? = null): Unit = job.cancel(cause)
fun Cancelable.cancel(message: String, cause: Throwable? = null): Unit = job.cancel(message, cause)
suspend fun Cancelable.join(): Unit = job.join()
suspend fun Cancelable.cancelAndJoin(): Unit = job.cancelAndJoin()
val Cancellable.isActive: Boolean get() = job.isActive
suspend fun Cancellable.join(): Unit = job.join()
suspend fun Cancellable.cancelAndJoin(): Unit = job.cancelAndJoin()
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.rsocket.kotlin.frame.*
* That interface isn't stable for inheritance.
*/
@TransportApi
interface Connection : Cancelable {
interface Connection : Cancellable {

@DangerousInternalIoApi
val pool: ObjectPool<ChunkBuffer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.flow.*

interface RSocket : Cancelable {
interface RSocket : Cancellable {

suspend fun metadataPush(metadata: ByteReadPacket) {
metadata.release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RSocketRequestHandlerBuilder internal constructor() {
requestChannel = block
}

internal fun build(job: Job): RSocket =
internal fun build(job: CompletableJob): RSocket =
RSocketRequestHandler(job, metadataPush, fireAndForget, requestResponse, requestStream, requestChannel)
}

Expand All @@ -65,7 +65,7 @@ fun RSocketRequestHandler(parentJob: Job? = null, configure: RSocketRequestHandl
}

private class RSocketRequestHandler(
override val job: Job,
override val job: CompletableJob,
private val metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null,
private val fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null,
private val requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public class RSocketConnectorBuilder internal constructor() {
)

private companion object {
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket }
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket() }

private object EmptyRSocket : RSocket {
override val job: Job = NonCancellable
private class EmptyRSocket : RSocket {
override val job: CompletableJob = Job()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RSocketServer internal constructor(

private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(ErrorFrame(0, error))
cancel("Setup failed", error)
job.completeExceptionally(error)
throw error
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,40 @@ import kotlinx.coroutines.flow.*
internal typealias ReconnectPredicate = suspend (cause: Throwable, attempt: Long) -> Boolean

@Suppress("FunctionName")
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
@OptIn(FlowPreview::class)
internal suspend fun ReconnectableRSocket(
logger: Logger,
connect: suspend () -> RSocket,
predicate: ReconnectPredicate,
): RSocket {
val state = MutableStateFlow<ReconnectState>(ReconnectState.Connecting)

val job =
val job = Job()
val state =
connect.asFlow()
.map<RSocket, ReconnectState> { ReconnectState.Connected(it) } //if connection established - state = connected
.onStart { emit(ReconnectState.Connecting) } //init - state = connecting
.retryWhen { cause, attempt ->
.retryWhen { cause, attempt -> //reconnection logic
logger.debug(cause) { "Connection establishment failed, attempt: $attempt. Trying to reconnect..." }
predicate(cause, attempt)
} //reconnection logic
.catch {
}
.catch { //reconnection failed - state = failed
logger.debug(it) { "Reconnection failed" }
emit(ReconnectState.Failed(it))
} //reconnection failed - state = failed
.mapNotNull {
state.value = it //set state //TODO replace with Flow.stateIn when coroutines 1.4.0-native-mt will be released
}
.transform { value ->
emit(value) //emit before any action, to pass value directly to state

when (it) {
when (value) {
is ReconnectState.Connected -> {
logger.debug { "Connection established" }
it.rSocket.join() //await for connection completion
value.rSocket.join() //await for connection completion
logger.debug { "Connection closed. Reconnecting..." }
}
is ReconnectState.Failed -> throw it.error //reconnect failed, cancel job
ReconnectState.Connecting -> null //skip, still waiting for new connection
is ReconnectState.Failed -> job.completeExceptionally(value.error) //reconnect failed, fail job
ReconnectState.Connecting -> Unit //skip, still waiting for new connection
}
}
.launchRestarting() //reconnect if old connection completed/failed
.restarting() //reconnect if old connection completed
.stateIn(CoroutineScope(Dispatchers.Unconfined + job))

//await first connection to fail fast if something
state.mapNotNull {
Expand All @@ -74,27 +74,16 @@ internal suspend fun ReconnectableRSocket(
return ReconnectableRSocket(job, state)
}

private fun Flow<*>.launchRestarting(): Job = GlobalScope.launch(Dispatchers.Unconfined) {
while (isActive) {
try {
collect()
} catch (e: Throwable) {
// KLUDGE: K/N
cancel("Reconnection failed", e)
break
}
}
}
private fun Flow<ReconnectState>.restarting(): Flow<ReconnectState> = flow { while (true) emitAll(this@restarting) }

private sealed class ReconnectState {
object Connecting : ReconnectState()
data class Failed(val error: Throwable) : ReconnectState()
data class Connected(val rSocket: RSocket) : ReconnectState()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private class ReconnectableRSocket(
override val job: Job,
override val job: CompletableJob,
private val state: StateFlow<ReconnectState>,
) : RSocket {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.native.concurrent.*

internal inline fun <T> Closeable.closeOnError(block: () -> T): T {
try {
Expand All @@ -33,9 +34,27 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let { it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) })
}

//TODO Can be removed after fix of https://github.com/Kotlin/kotlinx.coroutines/issues/2435
internal fun ReceiveChannel<Closeable>.closeReceivedElements() {
try {
while (true) poll()?.close() ?: break
} catch (e: Throwable) {
}
}

@SharedImmutable
private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close

@Suppress("FunctionName")
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)

//TODO check after fix of https://github.com/Kotlin/kotlinx.coroutines/issues/2435
// and https://github.com/Kotlin/kotlinx.coroutines/issues/974
internal fun <E : Closeable> SendChannel<E>.safeOffer(element: E) {
try {
if (!offer(element)) element.close()
} catch (cause: Throwable) {
element.close()
throw cause
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.frame.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

internal class Prioritizer {
private val priorityChannel = Channel<Frame>(Channel.UNLIMITED)
private val commonChannel = Channel<Frame>(Channel.UNLIMITED)
private val priorityChannel = SafeChannel<Frame>(Channel.UNLIMITED)
private val commonChannel = SafeChannel<Frame>(Channel.UNLIMITED)

fun send(frame: Frame) {
commonChannel.offer(frame)
commonChannel.safeOffer(frame)
}

fun sendPrioritized(frame: Frame) {
priorityChannel.offer(frame)
priorityChannel.safeOffer(frame)
}

suspend fun receive(): Frame {
Expand All @@ -41,10 +42,10 @@ internal class Prioritizer {
}
}

fun close(throwable: Throwable?) {
fun cancel(error: CancellationException) {
priorityChannel.closeReceivedElements()
commonChannel.closeReceivedElements()
priorityChannel.close(throwable)
commonChannel.close(throwable)
priorityChannel.cancel(error)
commonChannel.cancel(error)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlinx.coroutines.flow.*
internal class RSocketRequester(
private val state: RSocketState,
private val streamId: StreamId,
) : RSocket, Cancelable by state {
) : RSocket, Cancellable by state {

override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.closeOnError {
checkAvailable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kotlinx.coroutines.*
internal class RSocketResponder(
private val state: RSocketState,
private val requestHandler: RSocket,
) : Cancelable by state {
) : Cancellable by state {

fun handleMetadataPush(frame: MetadataPushFrame) {
state.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kotlinx.coroutines.flow.*
internal class RSocketState(
private val connection: Connection,
keepAlive: KeepAlive,
) : Cancelable by connection {
) : Cancellable by connection {
private val prioritizer = Prioritizer()
private val requestScope = CoroutineScope(SupervisorJob(job))
private val scope = CoroutineScope(job)
Expand All @@ -53,7 +53,7 @@ internal class RSocketState(
}

fun createReceiverFor(streamId: Int, initFrame: RequestFrame? = null): ReceiveChannel<RequestFrame> {
val receiver = Channel<RequestFrame>(Channel.UNLIMITED)
val receiver = SafeChannel<RequestFrame>(Channel.UNLIMITED)
initFrame?.let(receiver::offer) //used only in RequestChannel on responder side
receivers[streamId] = receiver
return receiver
Expand All @@ -71,7 +71,7 @@ internal class RSocketState(
if (cause != null) send(CancelFrame(streamId))
receivers.remove(streamId)?.apply {
closeReceivedElements()
close(cause)
cancelConsumed(cause)
}
}
}
Expand Down Expand Up @@ -120,7 +120,7 @@ internal class RSocketState(
when (val streamId = frame.streamId) {
0 -> when (frame) {
is ErrorFrame -> {
cancel("Zero stream error", frame.throwable)
job.completeExceptionally(frame.throwable)
frame.release() //TODO
}
is KeepAliveFrame -> keepAliveHandler.receive(frame)
Expand All @@ -146,7 +146,7 @@ internal class RSocketState(
frame.release()
}
is RequestFrame -> when (frame.type) {
FrameType.Payload -> receivers[streamId]?.offer(frame)
FrameType.Payload -> receivers[streamId]?.safeOffer(frame) ?: frame.release()
FrameType.RequestFnF -> responder.handleFireAndForget(frame)
FrameType.RequestResponse -> responder.handlerRequestResponse(frame)
FrameType.RequestStream -> responder.handleRequestStream(frame)
Expand All @@ -164,20 +164,35 @@ internal class RSocketState(
fun start(requestHandler: RSocket) {
val responder = RSocketResponder(this, requestHandler)
keepAliveHandler.startIn(scope)
requestHandler.job.invokeOnCompletion { cancel("Request handled stopped", it) }
requestHandler.job.invokeOnCompletion {
when (it) {
null -> job.complete()
is CancellationException -> job.cancel(it)
else -> job.completeExceptionally(it)
}
}
job.invokeOnCompletion { error ->
requestHandler.cancel("Connection closed", error)
when (error) {
null -> requestHandler.job.complete()
is CancellationException -> requestHandler.job.cancel(error)
else -> requestHandler.job.completeExceptionally(error)
}
val cancelError = error as? CancellationException ?: CancellationException("Connection closed", error)
receivers.values().forEach {
it.closeReceivedElements()
it.close((error as? CancellationException)?.cause ?: error)
it.cancel(cancelError)
}
senders.values().forEach { it.cancel(cancelError) }
receivers.clear()
limits.clear()
senders.clear()
prioritizer.close(error)
prioritizer.cancel(cancelError)
}
scope.launch {
while (connection.isActive) connection.sendFrame(prioritizer.receive())
while (connection.isActive) {
val frame = prioritizer.receive()
frame.closeOnError { connection.sendFrame(frame) }
}
}
scope.launch {
while (connection.isActive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class SetupRejectionTest : SuspendTest, TestWithLeakCheck {
}
val sender = sendingRSocket.await()
assertFalse(sender.isActive)
val error = expectError()
assertTrue(error is RSocketError.Setup.Rejected)
assertEquals(errorMessage, error.message)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck {

@Test
fun noKeepAliveSentAfterRSocketCanceled() = test {
requester().cancel()
requester().job.cancel()
connection.test {
expectNoEventsIn(500)
}
Expand Down
Loading