Skip to content

Commit c279c51

Browse files
authored
Rework transport and internal implementation to support multiplexed transports (#255)
* new Transport API * migrate local to new Transport API and introduce multiplexed kind * fix some tests to use port 0 to auto-assign port
1 parent 1d1b079 commit c279c51

File tree

83 files changed

+3426
-1642
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+3426
-1642
lines changed

build-logic/src/main/kotlin/rsocketbuild.multiplatform-base.gradle.kts

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ kotlin {
3838
}
3939
progressiveMode.set(true)
4040
freeCompilerArgs.add("-Xrender-internal-diagnostic-names")
41+
optIn.addAll(OptIns.ExperimentalSubclassOptIn)
4142
}
4243

4344
sourceSets.configureEach {
@@ -51,6 +52,7 @@ kotlin {
5152

5253
// rsocket related
5354
optIn(OptIns.TransportApi)
55+
optIn(OptIns.RSocketTransportApi)
5456
optIn(OptIns.ExperimentalMetadataApi)
5557
optIn(OptIns.ExperimentalStreamsApi)
5658
optIn(OptIns.RSocketLoggingApi)

build-logic/src/main/kotlin/rsocketbuild/OptIns.kt

+2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ package rsocketbuild
1818

1919
@Suppress("ConstPropertyName")
2020
object OptIns {
21+
const val ExperimentalSubclassOptIn = "kotlin.ExperimentalSubclassOptIn"
2122
const val ExperimentalStdlibApi = "kotlin.ExperimentalStdlibApi"
2223
const val ExperimentalCoroutinesApi = "kotlinx.coroutines.ExperimentalCoroutinesApi"
2324
const val DelicateCoroutinesApi = "kotlinx.coroutines.DelicateCoroutinesApi"
2425

2526
const val TransportApi = "io.rsocket.kotlin.TransportApi"
27+
const val RSocketTransportApi = "io.rsocket.kotlin.transport.RSocketTransportApi"
2628
const val ExperimentalMetadataApi = "io.rsocket.kotlin.ExperimentalMetadataApi"
2729
const val ExperimentalStreamsApi = "io.rsocket.kotlin.ExperimentalStreamsApi"
2830
const val RSocketLoggingApi = "io.rsocket.kotlin.RSocketLoggingApi"

rsocket-core/api/rsocket-core.api

+67
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso
194194

195195
public final class io/rsocket/kotlin/core/RSocketConnector {
196196
public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
197+
public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTarget;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
197198
}
198199

199200
public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
@@ -228,6 +229,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
228229
public final class io/rsocket/kotlin/core/RSocketServer {
229230
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
230231
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
232+
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
233+
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
231234
}
232235

233236
public final class io/rsocket/kotlin/core/RSocketServerBuilder {
@@ -779,7 +782,71 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
779782
public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport;
780783
}
781784

785+
public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
786+
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
787+
}
788+
789+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
790+
}
791+
792+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
793+
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
794+
}
795+
796+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
797+
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
798+
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
799+
}
800+
801+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/io/Closeable {
802+
public abstract fun close ()V
803+
public abstract fun isClosedForSend ()Z
804+
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
805+
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
806+
public abstract fun setSendPriority (I)V
807+
}
808+
809+
public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
810+
public abstract fun isClosedForSend ()Z
811+
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
812+
public abstract fun sendFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
813+
}
814+
815+
public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope {
816+
}
817+
818+
public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
819+
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
820+
}
821+
822+
public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
823+
}
824+
825+
public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation {
826+
}
827+
828+
public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder {
829+
public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransport;
830+
}
831+
832+
public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory {
833+
public fun <init> (Lkotlin/jvm/functions/Function0;)V
834+
public final fun getCreateBuilder ()Lkotlin/jvm/functions/Function0;
835+
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport;
836+
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
837+
}
838+
782839
public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
783840
public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
784841
}
785842

843+
public final class io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue {
844+
public fun <init> (I)V
845+
public final fun cancel ()V
846+
public final fun close ()V
847+
public final fun dequeueFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
848+
public final fun enqueueFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
849+
public final fun isClosedForSend ()Z
850+
public final fun tryDequeueFrame ()Lio/ktor/utils/io/core/ByteReadPacket;
851+
}
852+

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

-11
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package io.rsocket.kotlin
1818

1919
import io.ktor.utils.io.core.*
20-
import io.rsocket.kotlin.frame.*
21-
import io.rsocket.kotlin.internal.*
2220
import kotlinx.coroutines.*
2321

2422
/**
@@ -29,12 +27,3 @@ public interface Connection : CoroutineScope {
2927
public suspend fun send(packet: ByteReadPacket)
3028
public suspend fun receive(): ByteReadPacket
3129
}
32-
33-
@OptIn(TransportApi::class)
34-
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
35-
receive().readFrame(pool).closeOnError(block)
36-
37-
@OptIn(TransportApi::class)
38-
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
39-
frame.toPacket(pool).closeOnError { send(it) }
40-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.ktor.utils.io.core.*
20+
import io.rsocket.kotlin.*
21+
import io.rsocket.kotlin.frame.*
22+
import io.rsocket.kotlin.internal.*
23+
import io.rsocket.kotlin.internal.io.*
24+
import io.rsocket.kotlin.operation.*
25+
import io.rsocket.kotlin.payload.*
26+
import io.rsocket.kotlin.transport.*
27+
import kotlinx.coroutines.*
28+
import kotlinx.coroutines.flow.*
29+
import kotlin.coroutines.*
30+
31+
// TODO: rename to just `Connection` after root `Connection` will be dropped
32+
@RSocketTransportApi
33+
internal abstract class Connection2(
34+
protected val frameCodec: FrameCodec,
35+
// requestContext
36+
final override val coroutineContext: CoroutineContext,
37+
) : RSocket, Closeable {
38+
39+
// connection establishment part
40+
41+
abstract suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig
42+
43+
// setup completed, start handling requests
44+
abstract suspend fun handleConnection(inbound: ConnectionInbound)
45+
46+
// connection part
47+
48+
protected abstract suspend fun sendConnectionFrame(frame: ByteReadPacket)
49+
private suspend fun sendConnectionFrame(frame: Frame): Unit = sendConnectionFrame(frameCodec.encodeFrame(frame))
50+
51+
suspend fun sendError(cause: Throwable) {
52+
sendConnectionFrame(ErrorFrame(0, cause))
53+
}
54+
55+
private suspend fun sendMetadataPush(metadata: ByteReadPacket) {
56+
sendConnectionFrame(MetadataPushFrame(metadata))
57+
}
58+
59+
suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
60+
sendConnectionFrame(KeepAliveFrame(respond, lastPosition, data))
61+
}
62+
63+
// operations part
64+
65+
protected abstract fun launchRequest(requestPayload: Payload, operation: RequesterOperation): Job
66+
private suspend fun ensureActiveOrClose(closeable: Closeable) {
67+
currentCoroutineContext().ensureActive { closeable.close() }
68+
coroutineContext.ensureActive { closeable.close() }
69+
}
70+
71+
final override suspend fun metadataPush(metadata: ByteReadPacket) {
72+
ensureActiveOrClose(metadata)
73+
sendMetadataPush(metadata)
74+
}
75+
76+
final override suspend fun fireAndForget(payload: Payload) {
77+
ensureActiveOrClose(payload)
78+
79+
suspendCancellableCoroutine { cont ->
80+
val requestJob = launchRequest(
81+
requestPayload = payload,
82+
operation = RequesterFireAndForgetOperation(cont)
83+
)
84+
cont.invokeOnCancellation { cause ->
85+
requestJob.cancel("Request was cancelled", cause)
86+
}
87+
}
88+
}
89+
90+
final override suspend fun requestResponse(payload: Payload): Payload {
91+
ensureActiveOrClose(payload)
92+
93+
val responseDeferred = CompletableDeferred<Payload>()
94+
95+
val requestJob = launchRequest(
96+
requestPayload = payload,
97+
operation = RequesterRequestResponseOperation(responseDeferred)
98+
)
99+
100+
try {
101+
responseDeferred.join()
102+
} catch (cause: Throwable) {
103+
requestJob.cancel("Request was cancelled", cause)
104+
throw cause
105+
}
106+
return responseDeferred.await()
107+
}
108+
109+
@OptIn(ExperimentalStreamsApi::class)
110+
final override fun requestStream(
111+
payload: Payload,
112+
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
113+
ensureActiveOrClose(payload)
114+
115+
val responsePayloads = PayloadChannel()
116+
117+
val requestJob = launchRequest(
118+
requestPayload = payload,
119+
operation = RequesterRequestStreamOperation(initialRequest, responsePayloads)
120+
)
121+
122+
throw try {
123+
responsePayloads.consumeInto(this, strategy)
124+
} catch (cause: Throwable) {
125+
requestJob.cancel("Request was cancelled", cause)
126+
throw cause
127+
} ?: return@payloadFlow
128+
}
129+
130+
@OptIn(ExperimentalStreamsApi::class)
131+
final override fun requestChannel(
132+
initPayload: Payload,
133+
payloads: Flow<Payload>,
134+
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
135+
ensureActiveOrClose(initPayload)
136+
137+
val responsePayloads = PayloadChannel()
138+
139+
val requestJob = launchRequest(
140+
initPayload,
141+
RequesterRequestChannelOperation(initialRequest, payloads, responsePayloads)
142+
)
143+
144+
throw try {
145+
responsePayloads.consumeInto(this, strategy)
146+
} catch (cause: Throwable) {
147+
requestJob.cancel("Request was cancelled", cause)
148+
throw cause
149+
} ?: return@payloadFlow
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.ktor.utils.io.core.*
20+
import io.rsocket.kotlin.frame.*
21+
import io.rsocket.kotlin.frame.io.*
22+
import io.rsocket.kotlin.keepalive.*
23+
import io.rsocket.kotlin.payload.*
24+
import io.rsocket.kotlin.transport.*
25+
26+
// send/receive setup, resume, resume ok, lease, error
27+
@RSocketTransportApi
28+
internal abstract class ConnectionEstablishmentContext(
29+
private val frameCodec: FrameCodec,
30+
) {
31+
protected abstract suspend fun receiveFrameRaw(): ByteReadPacket?
32+
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
33+
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
34+
35+
// only setup|lease|resume|resume_ok|error frames
36+
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
37+
expectedStreamId = 0,
38+
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
39+
)
40+
41+
suspend fun sendSetup(
42+
version: Version,
43+
honorLease: Boolean,
44+
keepAlive: KeepAlive,
45+
resumeToken: ByteReadPacket?,
46+
payloadMimeType: PayloadMimeType,
47+
payload: Payload,
48+
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
49+
}

0 commit comments

Comments
 (0)