-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add twitter future support #128
Conversation
this is jvm only, so I'm not sure exactly how to make the work here but the core of this works. Twitter future and Catbirds Rerunnable are now supported
and execute the tests to verify they're passing
this matches the monix side
case Async(ac, timeout) ⇒ | ||
Rerunnable.fromFuture { | ||
val p: Promise[A] = Promise() | ||
FuturePool.unboundedPool(ac(p setValue _, p setException _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chris-vale-ck is it common practice in Finagle to use the unbounded pool for these cases or should it be required as implicit in fetchRerunableMonadError
to allow pools with different policies?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So yeah, I know how it sounds to non-finaglers but yes using the default unbounded pool is how things roll. That pool is the same pool that the rest of finagle uses. I can gen up a new pool with some limits on it if it's desired, or even leave the choice of FuturePool
up in the air.
case e => Rerunnable.fromFuture(Future(e.value)) | ||
} | ||
|
||
implicit def fetchRerunableMonadError: FetchMonadError[Rerunnable] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetchRerunableMonadError
should probably be fetchRerunnableMonadError
} | ||
} | ||
|
||
implicit def fetchTwFutureMonadError: FetchMonadError[Future] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/47deg/fetch/pull/128/files#r131264794 also applies here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also make use of Twitter Future
s so this is a very welcome contribution.
Here is a gist I shared in the Fetch channel several months ago, which might be worth considering pulling some things from, specifically timeouts & parameterising the future pool.
https://gist.github.com/tomjadams/2ca1111532ee7ce0f550d5d0bd9771e5
case Async(ac, timeout) ⇒ | ||
Rerunnable.fromFuture { | ||
val p: Promise[A] = Promise() | ||
FuturePool.unboundedPool(ac(p setValue _, p setException _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm uncomfortable about using an unbounded pool here, would rather see it passed in (implicitly), giving the user more control.
case Sync(e) ⇒ Future(e.value) | ||
case Async(ac, timeout) ⇒ | ||
val p: Promise[A] = Promise() | ||
FuturePool.unboundedPool(ac(p setValue _, p setException _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per the above comment, would prefer this used a passed pool rather than this one.
new FetchMonadError.FromMonadError[Future] { | ||
override def runQuery[A](j: Query[A]): Future[A] = j match { | ||
case Sync(e) ⇒ Future(e.value) | ||
case Async(ac, timeout) ⇒ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to see this taking the timeout into account. We don't use it specifically, but I guess some people might, so good to support that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See also this pending PR regarding timeout capabilities #127
implicit def fetchTwFutureMonadError: FetchMonadError[Future] = | ||
new FetchMonadError.FromMonadError[Future] { | ||
override def runQuery[A](j: Query[A]): Future[A] = j match { | ||
case Sync(e) ⇒ Future(e.value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there semantic differences here between Future.apply
and Future.value
that make sense to use one vs. the other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.value
could potentially throw an exception, so I think we need Future.apply
here.
You could also use catchNonFatalEval
(from cats.ApplicativeError
) instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just thinking about it. From a callers POV they just get a Future. But using apply would schedule that code to be run on some thread whereas value would just return a completed future without the need for any async evaluation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peterneyens Good point, hadn’t thought of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomjadams For Twitter's Future
apply
will evaluate the thunk in the calling thread, so I think it's fine to use here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@travisbrown Yeah, I did read that after I posted the initial comment. But that'd be more work than simply returning a value though right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomjadams Yeah, there's a little more work in that you have to instantiate an extra => A
and Try
, but dropping down to a try-catch block to avoid that feels like premature optimization (and there are probably a ton of other places with similar overhead).
with two n's in ht middle
Codecov Report
@@ Coverage Diff @@
## master #128 +/- ##
==========================================
+ Coverage 78% 78.63% +0.63%
==========================================
Files 14 15 +1
Lines 300 323 +23
Branches 2 2
==========================================
+ Hits 234 254 +20
- Misses 66 69 +3
Continue to review full report at Codecov.
|
with the default value of the interruptableUnboundedPool and then use the fact that the pool creates interruptable threads to wire up the timeouts.
I've gone ahead and added an implicit ThreadPool but with the default value of |
rely on `catbird` to provide the twitter `util-core` dependency.
Twitter util no longer supports 2.10 so this won't build with it in. also remove the now unneeded version check for including paradise.
oops, missed travis
test evalToRunnable
Looks like codecov is not reporting properly since tests for that file are included. @dominv do you think you can take a look at codecov here in case it's missconfigured? |
@raulraja Sure, I'll find out what's happening |
for both twitter future and catbirds rerunnable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are using =>
and <-
everywhere, can you replace the unicode ⇒
and ←
?
} | ||
} | ||
case Ap(qf, qx) ⇒ | ||
runQuery(qf).product(Rerunnable(qx)) map { case (f, x) ⇒ f(x) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be runQuery(qf).product(runQuery(qx))
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without the map the statement produces a Rerunnable[(Any => A, Any)]
so we need to run the (Any) => A
on the Any
. or at least that's what I think needs to be done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the confusion, I meant runQuery(qf).product(runQuery(qx)).map { case (f, x) => f(x) }
.
So thanks 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, yeah you're right.
case _ => p | ||
} | ||
case Ap(qf, qx) ⇒ | ||
runQuery(qf).join(runQuery(qx)).map { case (f, x) ⇒ f(x) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
catbird overrides ap
exactly in this way, so we could just write ap(runQuery(qf))(runQuery(qx))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed that
to use the catbird provided ap
): FetchMonadError[Rerunnable] = | ||
new FetchMonadError.FromMonadError[Rerunnable] { | ||
override def runQuery[A](j: Query[A]): Rerunnable[A] = j match { | ||
case Sync(e) ⇒ Rerunnable { e.value } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use evalToRerunnable
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
in the rerunnable sync case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @chris-vale-ck.
Thanks for reviewing @tomjadams and @travisbrown.
@raulraja For what I've seen so far, I believe that it is not possible to access to the files' individual reports from the Nevertheless, it's possible to access to this reports using the |
* limitations under the License. | ||
*/ | ||
|
||
package fetch.twitterFuture |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be strange to call this "twitterFuture" while it also contains Rerunnable
. Maybe just "twitter" or "twitterUtil"?
adapted from the scala future version
when testing timeouts
@peterneyens I went ahead and wrote the timeout tests for both twitter future and catbirds rerunnable versions. Well when I say wrote I mean adapted form the tests you mentioned. |
Two small notes:
|
Thanks for the info @travisbrown. The next Fetch release needs to wait for a Monix release for Cats 1.0.0-MF anyway, so waiting on 0.17.0 and maybe another update for Finagle 7 is not a problem. @chris-vale-ck Thanks for the tests. |
As a finagle user I'd love to use fetch as a first class citizen in the whole twitter ecosystem. This PR adds the implicit for
com.twitter.util.Future
to do that. We also use @travisbrown catbird to provide us with the instance/syntax/implicits support for the twitter ecosystem so I've also added support forRerunnable
in this PR.