Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KindConnection #1199

Merged
merged 42 commits into from
Dec 19, 2018
Merged

KindConnection #1199

merged 42 commits into from
Dec 19, 2018

Conversation

nomisRev
Copy link
Member

This PR replaces IOConnection with an abstract version KindConnection which is wired for all F that currently support Async#async.

Copy link
Member

@JorgeCastilloPrz JorgeCastilloPrz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall but feels like the connection arguments required for the async() calls are being ignored and each data type instantiates its own connection inside the method body and the param conn is ignored. Should we keep it exposed and provide instances as default values? or should we remove the connection from the Proc aliases?

@@ -15,6 +15,8 @@ interface Applicative<F> : Functor<F> {

fun <A> A.just(dummy: Unit = Unit): Kind<F, A> = just(this)

fun unit(): Kind<F, Unit> = just(Unit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally D:

@@ -87,7 +87,7 @@ interface DeferredKMonadDeferInstance : MonadDefer<ForDeferredK>, DeferredKBrack
@extension
interface DeferredKAsyncInstance : Async<ForDeferredK>, DeferredKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): DeferredK<A> =
DeferredK.async(fa = fa)
DeferredK.async(fa = { _, cb -> fa(cb) })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to pass in the connection from the callers? looks like we always ask for it in the async arguments (DeferredKProc) but then it's being instantiated inside the method itself and the param ignored?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for now.

This PR just adds the support on the concrete types. This is just in an attempt to do everything in smaller PRs. A discussion of where goes what needs to follow still, I think me & Paco are a little out of sync on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the correct approach, yes. The correct implementation (the one in Concurrent) has to have a connection parameter, and the Async one is an specialization where the parameter is ignored.

@@ -115,7 +115,7 @@ interface FluxKAsyncInstance :
Async<ForFluxK>,
FluxKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): FluxK<A> =
FluxK.runAsync(fa)
FluxK.runAsync { _, cb -> fa(cb) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, what's the intention on exposing it to callers if it's ignored inside ?

@@ -88,7 +88,7 @@ interface MonoKAsyncInstance :
Async<ForMonoK>,
MonoKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): MonoK<A> =
MonoK.async(fa)
MonoK.async { _, cb -> fa(cb) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@@ -119,7 +119,7 @@ interface FlowableKAsyncInstance :
Async<ForFlowableK>,
FlowableKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): FlowableK<A> =
FlowableK.async(fa, BS())
FlowableK.async({ _, cb -> fa(cb) }, BS())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@@ -107,7 +107,7 @@ interface MaybeKMonadDeferInstance : MonadDefer<ForMaybeK>, MaybeKBracketInstanc
@extension
interface MaybeKAsyncInstance : Async<ForMaybeK>, MaybeKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): MaybeK<A> =
MaybeK.async(fa)
MaybeK.async { _, cb -> fa(cb) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing

@@ -86,7 +86,7 @@ interface SingleKAsyncInstance :
Async<ForSingleK>,
SingleKMonadDeferInstance {
override fun <A> async(fa: Proc<A>): SingleK<A> =
SingleK.async(fa)
SingleK.async { _, cb -> fa(cb) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another one

import arrow.typeclasses.Applicative
import java.util.concurrent.atomic.AtomicReference

typealias CancelToken<F> = Kind<F, Unit>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

* val result = DeferredK.async { conn: DeferredKConnection, cb: (Either<Throwable, String>) -> Unit ->
* val resource = Resource()
* conn.push(DeferredK { resource.close() })
* resource.asyncRead { value -> cb(value.right()) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a probable cause for the explosion in CI. Ank can't handle asynchrony of any kind.

*
* Matching the behavior of [async],
* Matching the behavior of [asyncK],
* its [CoroutineContext] is set to [DefaultDispatcher]
* and its [CoroutineStart] is [CoroutineStart.LAZY].
*
* {: data-executable='true'}
*
* ```kotlin:ank
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need Ank for this when it has no return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompletableDeferred<A>().apply {
fa {
fa(conn) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@pakoito
Copy link
Member

pakoito commented Dec 14, 2018

I've discussed with Simon directly the next step to take. The current implementations of async for Observable, Flowable, etc. need to be wired bi-directionally. They're currently wired uni-directionally.

That means we can have the cancel silently or cancel throwing semantics that IO has, for everyone :D

@@ -503,7 +503,9 @@ sealed class DeferredK<A>(
fun <A> async(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, fa: DeferredKProc<A>): DeferredK<A> {
val conn = DeferredKConnection()
return Generated(ctx, start, scope) {
CompletableDeferred<A>().apply {
val supervisor = Job()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the parent of this job need to be within the scope? See if you can create it from there.

@raulraja raulraja merged commit 1375c97 into master Dec 19, 2018
@raulraja raulraja deleted the simon-kindconnection branch December 19, 2018 21:05
@@ -502,7 +502,8 @@ sealed class DeferredK<A>(
*/
fun <A> async(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, fa: DeferredKProc<A>): DeferredK<A> {
val conn = DeferredKConnection()
val supervisor = Job()
//If the context doesn’t have a Job, then the coroutine which is created doesn’t have a parent.
val supervisor = scope.coroutineContext[Job] ?: Job()
Copy link
Member

@pakoito pakoito Dec 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In scope.coroutineContext[Job], what is Job here? where is it defined? I'd like to know to understand the architecture better.

@@ -502,7 +502,8 @@ sealed class DeferredK<A>(
*/
fun <A> async(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, fa: DeferredKProc<A>): DeferredK<A> {
val conn = DeferredKConnection()
val supervisor = Job()
//If the context doesn’t have a Job, then the coroutine which is created doesn’t have a parent.
val supervisor = scope.coroutineContext[Job] ?: Job()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create a Job() for a GlobalScope?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants