Skip to content

Commit c71adfb

Browse files
committed
Rework transport and internal implementation to support multiplexed transports
* new Transport API * migrate local to new Transport API and introduce multiplexed kind * fix some tests to use port 0 to auto-assign port
1 parent 6b06c63 commit c71adfb

File tree

83 files changed

+3416
-1648
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+3416
-1648
lines changed

build-logic/src/main/kotlin/rsocketbuild.multiplatform-base.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ kotlin {
4646

4747
// rsocket related
4848
optIn(OptIns.TransportApi)
49+
optIn(OptIns.RSocketTransportApi)
4950
optIn(OptIns.ExperimentalMetadataApi)
5051
optIn(OptIns.ExperimentalStreamsApi)
5152
optIn(OptIns.RSocketLoggingApi)

build-logic/src/main/kotlin/rsocketbuild/OptIns.kt

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ object OptIns {
2323
const val DelicateCoroutinesApi = "kotlinx.coroutines.DelicateCoroutinesApi"
2424

2525
const val TransportApi = "io.rsocket.kotlin.TransportApi"
26+
const val RSocketTransportApi = "io.rsocket.kotlin.transport.RSocketTransportApi"
2627
const val ExperimentalMetadataApi = "io.rsocket.kotlin.ExperimentalMetadataApi"
2728
const val ExperimentalStreamsApi = "io.rsocket.kotlin.ExperimentalStreamsApi"
2829
const val RSocketLoggingApi = "io.rsocket.kotlin.RSocketLoggingApi"

rsocket-core/api/rsocket-core.api

+67
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso
194194

195195
public final class io/rsocket/kotlin/core/RSocketConnector {
196196
public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
197+
public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTarget;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
197198
}
198199

199200
public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
@@ -228,6 +229,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
228229
public final class io/rsocket/kotlin/core/RSocketServer {
229230
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
230231
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
232+
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
233+
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
231234
}
232235

233236
public final class io/rsocket/kotlin/core/RSocketServerBuilder {
@@ -760,7 +763,71 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
760763
public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport;
761764
}
762765

766+
public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
767+
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
768+
}
769+
770+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
771+
}
772+
773+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
774+
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
775+
}
776+
777+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
778+
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
779+
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
780+
}
781+
782+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/io/Closeable {
783+
public abstract fun close ()V
784+
public abstract fun isClosedForSend ()Z
785+
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
786+
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
787+
public abstract fun setSendPriority (I)V
788+
}
789+
790+
public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
791+
public abstract fun isClosedForSend ()Z
792+
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
793+
public abstract fun sendFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
794+
}
795+
796+
public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope {
797+
}
798+
799+
public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
800+
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
801+
}
802+
803+
public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
804+
}
805+
806+
public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation {
807+
}
808+
809+
public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder {
810+
public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransport;
811+
}
812+
813+
public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory {
814+
public fun <init> (Lkotlin/jvm/functions/Function0;)V
815+
public final fun getCreateBuilder ()Lkotlin/jvm/functions/Function0;
816+
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport;
817+
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
818+
}
819+
763820
public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
764821
public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
765822
}
766823

824+
public final class io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue {
825+
public fun <init> (I)V
826+
public final fun cancel ()V
827+
public final fun close ()V
828+
public final fun dequeueFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
829+
public final fun enqueueFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
830+
public final fun isClosedForSend ()Z
831+
public final fun tryDequeueFrame ()Lio/ktor/utils/io/core/ByteReadPacket;
832+
}
833+

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

-12
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
package io.rsocket.kotlin
1818

1919
import io.ktor.utils.io.core.*
20-
import io.rsocket.kotlin.frame.*
21-
import io.rsocket.kotlin.internal.*
22-
import io.rsocket.kotlin.internal.io.*
2320
import kotlinx.coroutines.*
2421

2522
/**
@@ -30,12 +27,3 @@ public interface Connection : CoroutineScope {
3027
public suspend fun send(packet: ByteReadPacket)
3128
public suspend fun receive(): ByteReadPacket
3229
}
33-
34-
@OptIn(TransportApi::class)
35-
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
36-
receive().readFrame(pool).closeOnError(block)
37-
38-
@OptIn(TransportApi::class)
39-
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
40-
frame.toPacket(pool).closeOnError { send(it) }
41-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.ktor.utils.io.core.*
20+
import io.rsocket.kotlin.*
21+
import io.rsocket.kotlin.frame.*
22+
import io.rsocket.kotlin.internal.*
23+
import io.rsocket.kotlin.internal.io.*
24+
import io.rsocket.kotlin.operation.*
25+
import io.rsocket.kotlin.payload.*
26+
import io.rsocket.kotlin.transport.*
27+
import kotlinx.coroutines.*
28+
import kotlinx.coroutines.flow.*
29+
import kotlin.coroutines.*
30+
31+
// TODO: rename to just `Connection` after root `Connection` will be dropped
32+
@RSocketTransportApi
33+
internal abstract class Connection2(
34+
protected val frameCodec: FrameCodec,
35+
// requestContext
36+
final override val coroutineContext: CoroutineContext,
37+
) : RSocket, Closeable {
38+
39+
// connection establishment part
40+
41+
abstract suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig
42+
43+
// setup completed, start handling requests
44+
abstract suspend fun handleConnection(inbound: ConnectionInbound)
45+
46+
// connection part
47+
48+
protected abstract suspend fun sendConnectionFrame(frame: ByteReadPacket)
49+
private suspend fun sendConnectionFrame(frame: Frame): Unit = sendConnectionFrame(frameCodec.encodeFrame(frame))
50+
51+
suspend fun sendError(cause: Throwable) {
52+
sendConnectionFrame(ErrorFrame(0, cause))
53+
}
54+
55+
private suspend fun sendMetadataPush(metadata: ByteReadPacket) {
56+
sendConnectionFrame(MetadataPushFrame(metadata))
57+
}
58+
59+
suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
60+
sendConnectionFrame(KeepAliveFrame(respond, lastPosition, data))
61+
}
62+
63+
// operations part
64+
65+
protected abstract fun launchRequest(requestPayload: Payload, operation: RequesterOperation): Job
66+
private suspend fun ensureActiveOrClose(closeable: Closeable) {
67+
currentCoroutineContext().ensureActive { closeable.close() }
68+
coroutineContext.ensureActive { closeable.close() }
69+
}
70+
71+
final override suspend fun metadataPush(metadata: ByteReadPacket) {
72+
ensureActiveOrClose(metadata)
73+
sendMetadataPush(metadata)
74+
}
75+
76+
final override suspend fun fireAndForget(payload: Payload) {
77+
ensureActiveOrClose(payload)
78+
79+
suspendCancellableCoroutine { cont ->
80+
val requestJob = launchRequest(
81+
requestPayload = payload,
82+
operation = RequesterFireAndForgetOperation(cont)
83+
)
84+
cont.invokeOnCancellation { cause ->
85+
requestJob.cancel("Request was cancelled", cause)
86+
}
87+
}
88+
}
89+
90+
final override suspend fun requestResponse(payload: Payload): Payload {
91+
ensureActiveOrClose(payload)
92+
93+
val responseDeferred = CompletableDeferred<Payload>()
94+
95+
val requestJob = launchRequest(
96+
requestPayload = payload,
97+
operation = RequesterRequestResponseOperation(responseDeferred)
98+
)
99+
100+
try {
101+
responseDeferred.join()
102+
} catch (cause: Throwable) {
103+
requestJob.cancel("Request was cancelled", cause)
104+
throw cause
105+
}
106+
return responseDeferred.await()
107+
}
108+
109+
@OptIn(ExperimentalStreamsApi::class)
110+
final override fun requestStream(
111+
payload: Payload,
112+
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
113+
ensureActiveOrClose(payload)
114+
115+
val responsePayloads = PayloadChannel()
116+
117+
val requestJob = launchRequest(
118+
requestPayload = payload,
119+
operation = RequesterRequestStreamOperation(initialRequest, responsePayloads)
120+
)
121+
122+
throw try {
123+
responsePayloads.consumeInto(this, strategy)
124+
} catch (cause: Throwable) {
125+
requestJob.cancel("Request was cancelled", cause)
126+
throw cause
127+
} ?: return@payloadFlow
128+
}
129+
130+
@OptIn(ExperimentalStreamsApi::class)
131+
final override fun requestChannel(
132+
initPayload: Payload,
133+
payloads: Flow<Payload>,
134+
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
135+
ensureActiveOrClose(initPayload)
136+
137+
val responsePayloads = PayloadChannel()
138+
139+
val requestJob = launchRequest(
140+
initPayload,
141+
RequesterRequestChannelOperation(initialRequest, payloads, responsePayloads)
142+
)
143+
144+
throw try {
145+
responsePayloads.consumeInto(this, strategy)
146+
} catch (cause: Throwable) {
147+
requestJob.cancel("Request was cancelled", cause)
148+
throw cause
149+
} ?: return@payloadFlow
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.ktor.utils.io.core.*
20+
import io.rsocket.kotlin.frame.*
21+
import io.rsocket.kotlin.frame.io.*
22+
import io.rsocket.kotlin.keepalive.*
23+
import io.rsocket.kotlin.payload.*
24+
import io.rsocket.kotlin.transport.*
25+
26+
// send/receive setup, resume, resume ok, lease, error
27+
@RSocketTransportApi
28+
internal abstract class ConnectionEstablishmentContext(
29+
private val frameCodec: FrameCodec,
30+
) {
31+
protected abstract suspend fun receiveFrameRaw(): ByteReadPacket?
32+
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
33+
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
34+
35+
// only setup|lease|resume|resume_ok|error frames
36+
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
37+
expectedStreamId = 0,
38+
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
39+
)
40+
41+
suspend fun sendSetup(
42+
version: Version,
43+
honorLease: Boolean,
44+
keepAlive: KeepAlive,
45+
resumeToken: ByteReadPacket?,
46+
payloadMimeType: PayloadMimeType,
47+
payload: Payload,
48+
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
49+
}

0 commit comments

Comments
 (0)