Skip to content

Commit

Permalink
Migrated to non-cancellable suspending calls
Browse files Browse the repository at this point in the history
Closes #6
  • Loading branch information
agcom committed Feb 7, 2021
1 parent 28bfb9c commit f7e7bbc
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
18 changes: 11 additions & 7 deletions lib/src/main/kotlin/io/github/agcom/knio2/AsyncByteChannel.kt
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
package io.github.agcom.knio2

import kotlinx.coroutines.suspendCancellableCoroutine
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousByteChannel
import kotlin.coroutines.suspendCoroutine

/**
* Suspending version of [read][AsynchronousByteChannel.read] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousByteChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousByteChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousByteChannel.readAwait(dst: ByteBuffer): Int = suspendCancellableCoroutine {
public suspend fun AsynchronousByteChannel.readAwait(dst: ByteBuffer): Int = suspendCoroutine {
read(dst, Unit, it.asCompletionHandler())
}

/**
* Suspending version of [write][AsynchronousByteChannel.write] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousByteChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousByteChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousByteChannel.writeAwait(src: ByteBuffer): Int = suspendCancellableCoroutine {
public suspend fun AsynchronousByteChannel.writeAwait(src: ByteBuffer): Int = suspendCoroutine {
write(src, Unit, it.asCompletionHandler())
}
26 changes: 16 additions & 10 deletions lib/src/main/kotlin/io/github/agcom/knio2/AsyncFileChannel.kt
Original file line number Diff line number Diff line change
@@ -1,46 +1,52 @@
package io.github.agcom.knio2

import kotlinx.coroutines.suspendCancellableCoroutine
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.FileLock
import kotlin.coroutines.suspendCoroutine

/**
* Suspending version of [lock][AsynchronousFileChannel.lock] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousFileChannel.lockAwait(
position: Long = 0,
size: Long = Long.MAX_VALUE,
shared: Boolean = false
): FileLock = suspendCancellableCoroutine {
): FileLock = suspendCoroutine {
lock(position, size, shared, Unit, it.asCompletionHandler())
}

/**
* Suspending version of [lock][AsynchronousFileChannel.read] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousFileChannel.readAwait(
dst: ByteBuffer,
position: Long
): Int = suspendCancellableCoroutine {
): Int = suspendCoroutine {
read(dst, position, Unit, it.asCompletionHandler())
}

/**
* Suspending version of [lock][AsynchronousFileChannel.write] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousFileChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousFileChannel.writeAwait(
src: ByteBuffer,
position: Long
): Int = suspendCancellableCoroutine {
): Int = suspendCoroutine {
write(src, position, Unit, it.asCompletionHandler())
}
27 changes: 16 additions & 11 deletions lib/src/main/kotlin/io/github/agcom/knio2/AsyncSocketChannel.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package io.github.agcom.knio2

import kotlinx.coroutines.suspendCancellableCoroutine
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import java.util.concurrent.TimeUnit
import kotlin.coroutines.suspendCoroutine

/**
* Suspending version of [connect][AsynchronousSocketChannel.connect] function.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* > The correct way to actually cancel the connection is to [close][AsynchronousSocketChannel.close] the channel.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousSocketChannel.connectAwait(remote: SocketAddress) {
suspendCancellableCoroutine<Void?> {
suspendCoroutine<Void?> {
connect(remote, Unit, it.asCompletionHandler())
}
}
Expand All @@ -26,10 +27,12 @@ public suspend fun AsynchronousSocketChannel.connectAwait(remote: SocketAddress)
*
* The offset and length parameters are always set to 0 and [dsts] size. You can achieve their application by [Array.sliceArray] function and a spread operator.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousSocketChannel.readAwait(vararg dsts: ByteBuffer): Long = suspendCancellableCoroutine {
public suspend fun AsynchronousSocketChannel.readAwait(vararg dsts: ByteBuffer): Long = suspendCoroutine {
read(dsts, 0, dsts.size, -1, TimeUnit.MILLISECONDS, Unit, it.asCompletionHandler())
}

Expand All @@ -40,9 +43,11 @@ public suspend fun AsynchronousSocketChannel.readAwait(vararg dsts: ByteBuffer):
*
* * The offset and length parameters are always set to 0 and [srcs] size. You can achieve their application by [Array.sliceArray] function and a spread operator.
*
* The operation is not actually cancellable, because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* In case of cancellation, you may ignore the results.
* The call is not cancellable (suspends until success or failure), because the underlying channel ([AsynchronousSocketChannel]) provides no guarantee for cancellation.
* Note that closing the channel (probably) continues every call with a failure (and that covers most use cases).
*
* However, you can mimic cancellation by ignoring the call (hence, ignoring the results).
*/
public suspend fun AsynchronousSocketChannel.writeAwait(vararg srcs: ByteBuffer): Long = suspendCancellableCoroutine {
public suspend fun AsynchronousSocketChannel.writeAwait(vararg srcs: ByteBuffer): Long = suspendCoroutine {
write(srcs, 0, srcs.size, -1, TimeUnit.MILLISECONDS, Unit, it.asCompletionHandler())
}

0 comments on commit f7e7bbc

Please sign in to comment.