Skip to content

Commit

Permalink
Remove RxJava completely in favor of a focus on Kotlin Flow (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
nedtwigg authored Jan 23, 2025
2 parents 3a7a619 + 870ff50 commit 5694ef5
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 549 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
runs-on: ubuntu-latest
name: deploy
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
gh_token: ${{ secrets.GITHUB_TOKEN }}
ORG_GRADLE_PROJECT_nexus_user: ${{ secrets.NEXUS_USER }}
ORG_GRADLE_PROJECT_nexus_pass64: ${{ secrets.NEXUS_PASS64 }}
ORG_GRADLE_PROJECT_gpg_passphrase: ${{ secrets.GPG_PASSPHRASE }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# DurianRx releases

## [Unreleased]
### Changed
- Bump required java from 11 to 17. ([#9](https://github.com/diffplug/durian-rx/pull/9))
- Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))

## [4.0.1] - 2022-12-20
### Fixed
Expand Down
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ apply from: 干.file('spotless/java.gradle')

def VER_DURIAN='1.2.0'
def VER_DURIAN_DEBUG='1.0.0'
def VER_RXJAVA='2.0.0'
def VER_JUNIT='4.12'

dependencies {
implementation "com.diffplug.durian:durian-core:${VER_DURIAN}"
implementation "com.diffplug.durian:durian-concurrent:${VER_DURIAN}"
api "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.5.2"
api "io.reactivex.rxjava2:rxjava:${VER_RXJAVA}"
testImplementation "junit:junit:${VER_JUNIT}"
testImplementation "com.diffplug.durian:durian-testlib:${VER_DURIAN}"
testImplementation "com.diffplug.durian:durian-debug:${VER_DURIAN_DEBUG}"
Expand All @@ -34,7 +32,6 @@ ext.javadoc_links = [
"https://javadoc.io/doc/com.diffplug.durian/durian-collect/${VER_DURIAN}",
"https://javadoc.io/doc/com.diffplug.durian/durian-concurrent/${VER_DURIAN}",
"https://javadoc.io/doc/com.diffplug.durian/durian-debug/${VER_DURIAN_DEBUG}",
"https://javadoc.io/doc/io.reactivex.rxjava2/rxjava/${VER_RXJAVA}",
'https://docs.oracle.com/javase/8/docs/api/'
].join(' ')
apply from: 干.file('base/maven.gradle')
Expand Down
34 changes: 11 additions & 23 deletions src/main/java/com/diffplug/common/rx/GuardedExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package com.diffplug.common.rx

import com.diffplug.common.util.concurrent.ListenableFuture
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.Disposables
import java.util.*
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executor
import java.util.function.Supplier
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -43,42 +42,35 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
return Runnable { execute(guard.guard(delegate)) }
}

private fun subscribe(subscriber: Supplier<Disposable>): Disposable {
private fun subscribe(subscriber: Supplier<Job>): Job {
return if (!guard.isDisposed) {
val subscription = subscriber.get()
guard.runWhenDisposed { subscription.dispose() }
subscription
val job = subscriber.get()
guard.runWhenDisposed { job.cancel() }
job
} else {
Disposables.disposed()
SupervisorJob().apply { cancel() }
}
}

override fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Disposable {
override fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Job {
return subscribe { delegate.subscribeDisposable(flow, listener) }
}

override fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Disposable {
override fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Job {
return subscribe { delegate.subscribeDisposable(deferred, listener) }
}

override fun <T> subscribeDisposable(
observable: Observable<out T>,
listener: RxListener<T>
): Disposable {
return subscribe { delegate.subscribeDisposable(observable, listener) }
}

override fun <T> subscribeDisposable(
future: ListenableFuture<out T>,
listener: RxListener<T>
): Disposable {
): Job {
return subscribe { delegate.subscribeDisposable(future, listener) }
}

override fun <T> subscribeDisposable(
future: CompletionStage<out T>,
listener: RxListener<T>
): Disposable {
): Job {
return subscribe { delegate.subscribeDisposable(future, listener) }
}

Expand All @@ -90,10 +82,6 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
subscribeDisposable(deferred, listener)
}

override fun <T> subscribe(observable: Observable<out T>, listener: RxListener<T>) {
subscribeDisposable(observable, listener)
}

override fun <T> subscribe(future: ListenableFuture<out T>, listener: RxListener<T>) {
subscribeDisposable(future, listener)
}
Expand Down
125 changes: 21 additions & 104 deletions src/main/java/com/diffplug/common/rx/Rx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ import com.diffplug.common.rx.RxListener.DefaultTerminate
import com.diffplug.common.util.concurrent.ListenableFuture
import com.diffplug.common.util.concurrent.MoreExecutors
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.lang.IllegalStateException
import java.lang.SafeVarargs
import java.util.*
Expand All @@ -39,6 +33,7 @@ import java.util.concurrent.Future
import java.util.function.Consumer
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
Expand All @@ -60,47 +55,40 @@ import kotlinx.coroutines.flow.merge
* Rx.subscribe(listenableOrObservable, val -> doSomething); </pre> * Long version: `Rx` implements
* both the [io.reactivex.Observer] and [com.diffplug.common.util.concurrent.FutureCallback]
* interfaces by mapping them to two `Consumer`s:
*
* * `Consumer<T> onValue`</T>
* * `Consumer<Optional></Optional><Throwable>> onTerminate`</Throwable>
*
* Which are mapped as follows:
*
* * `Observable.onNext(T value) -> onValue.accept(value)`
* * `Observable.onCompleted() -> onTerminate.accept(Optional.empty())`
* * `Observable.onError(Throwable error) -> onTerminate.accept(Optional.of(error))`
* * `FutureCallback.onSuccess(T value) -> onValue.accept(value);
* onTerminate.accept(Optional.empty());`
* onTerminate.accept(Optional.empty());`
* * `FutureCallback.onError(Throwable error) -> onTerminate.accept(Optional.of(error))`
*
* An instance of Rx is created by calling one of Rx's static creator methods:
*
* * [onValue(Consumer&amp;lt;T&amp;gt;)][.onValue]
* * [onTerminate(Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onTerminate]
* * [onFailure(Consumer&amp;lt;Throwable&amp;gt;)][.onFailure]
* * [onValueOrTerminate(Consumer&amp;lt;T&amp;gt;,
* Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onValueOnTerminate]
* Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onValueOnTerminate]
* * [onValueOrFailure(Consumer&amp;lt;T&amp;gt;,
* Consumer&amp;lt;Throwable&amp;gt;)][.onValueOnFailure]
* Consumer&amp;lt;Throwable&amp;gt;)][.onValueOnFailure]
*
* Once you have an instance of Rx, you can subscribe it using the normal RxJava or Guava calls:
*
* * `rxObservable.subscribe(Rx.onValue(val -> doSomething(val));`
* * `Futures.addCallback(listenableFuture, Rx.onValue(val -> doSomething(val));`
*
* But the recommended way to subscribe is to use:
*
* * `Rx.subscribe(listenableOrObservable, Rx.onValue(val -> doSomething(val)));`
* * `Rx.subscribe(listenableOrObservable, val -> doSomething(val)); // automatically uses
* Rx.onValue()`
* Rx.onValue()`
*
* The advantage of this latter method is that it returns [io.reactivex.disposables.Disposable]
* instances which allow you to unsubscribe from futures in the same manner as for observables.
*
* * `subscription = Rx.subscribe( ... )`
*
* If you wish to receive callbacks on a specific thread, you can use:
*
* * `Rx.on(someExecutor).subscribe( ... )`
*
* Because RxJava's Observables use [io.reactivex.Scheduler]s rather than
Expand All @@ -109,7 +97,7 @@ import kotlinx.coroutines.flow.merge
* executor which implements [RxExecutor.Has].
*
* @see [SwtExec]
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
*/
object Rx {
fun <T> createEmitFlow() =
Expand Down Expand Up @@ -222,16 +210,6 @@ object Rx {
subscribe(deferred, onValue(listener))
}

@JvmStatic
fun <T> subscribe(observable: Observable<out T>, listener: RxListener<T>) {
sameThreadExecutor().subscribe(observable, listener)
}

@JvmStatic
fun <T> subscribe(observable: Observable<out T>, listener: Consumer<T>) {
subscribe(observable, onValue(listener))
}

@JvmStatic
fun <T> subscribe(observable: IObservable<out T>, listener: RxListener<T>) {
subscribe(observable.asObservable(), listener)
Expand Down Expand Up @@ -264,65 +242,52 @@ object Rx {

// Static versions
@JvmStatic
fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(flow, listener)
}

@JvmStatic
fun <T> subscribeDisposable(flow: Flow<T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(flow: Flow<T>, listener: Consumer<T>): Job {
return subscribeDisposable(flow, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(deferred, listener)
}

@JvmStatic
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: Consumer<T>): Job {
return subscribeDisposable(deferred, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(observable: Observable<out T>, listener: RxListener<T>): Disposable {
return sameThreadExecutor().subscribeDisposable(observable, listener)
}

@JvmStatic
fun <T> subscribeDisposable(observable: Observable<out T>, listener: Consumer<T>): Disposable {
return subscribeDisposable(observable, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: RxListener<T>): Job {
return subscribeDisposable(observable.asObservable(), listener)
}

@JvmStatic
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(observable.asObservable(), listener)
}

@JvmStatic
fun <T> subscribeDisposable(
future: ListenableFuture<out T>,
listener: RxListener<T>
): Disposable {
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(future, listener)
}

@JvmStatic
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(future, onValueOnTerminate(listener, TrackCancelled(future)))
}

@JvmStatic
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(future, listener)
}

@JvmStatic
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(
future, onValueOnTerminate(listener, TrackCancelled(future.toCompletableFuture())))
}
Expand All @@ -338,26 +303,13 @@ object Rx {
} else if (executor is RxExecutor.Has) {
executor.rxExecutor
} else {
RxExecutor(executor, Schedulers.from(executor), executor.asCoroutineDispatcher())
RxExecutor(executor, executor.asCoroutineDispatcher())
}
}

/**
* Mechanism for specifying a specific Executor (for ListenableFuture) and Scheduler (for
* Observable).
*/
@JvmStatic
fun callbackOn(executor: Executor, scheduler: Scheduler): RxExecutor {
return callbackOn(executor, scheduler, executor.asCoroutineDispatcher())
}

@JvmStatic
fun callbackOn(
executor: Executor,
scheduler: Scheduler,
dispatcher: CoroutineDispatcher
): RxExecutor {
return RxExecutor(executor, scheduler, dispatcher)
fun callbackOn(executor: Executor, dispatcher: CoroutineDispatcher): RxExecutor {
return RxExecutor(executor, dispatcher)
}

@JvmStatic
Expand All @@ -376,7 +328,6 @@ object Rx {
_sameThread =
RxExecutor(
MoreExecutors.directExecutor(),
Schedulers.trampoline(),
MoreExecutors.directExecutor().asCoroutineDispatcher())
}
return _sameThread!!
Expand All @@ -397,46 +348,12 @@ object Rx {
if (_tracingPolicy == null) {
_tracingPolicy = DurianPlugins.get(RxTracingPolicy::class.java, RxTracingPolicy.NONE)
if (_tracingPolicy !== RxTracingPolicy.NONE) {
RxJavaPlugins.setOnObservableSubscribe { observable: Observable<*>, observer: Observer<*>
->
if (observer is RxListener<*>) {
// if it's an RxListener, then _tracingPolicy handled it already
return@setOnObservableSubscribe observer
} else {
// if it isn't an RxListener, then we'll apply _tracing policy
val listener =
onValueOnTerminate({ value: Any -> (observer as Observer<Any>).onNext(value) }) {
errorOpt: Optional<Throwable> ->
if (errorOpt.isPresent) {
observer.onError(errorOpt.get())
} else {
observer.onComplete()
}
}
val traced = _tracingPolicy!!.hook(observable, listener)
return@setOnObservableSubscribe object : Observer<Any?> {
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}

override fun onNext(value: Any?) {
traced.onNext(value)
}

override fun onError(e: Throwable) {
traced.onError(e)
}

override fun onComplete() {
traced.onComplete()
}
}
}
}
// TODO: setup tracing
}
}
return _tracingPolicy!!
}

private var _tracingPolicy: RxTracingPolicy? = null

/** Package-private for testing - resets all of the static member variables. */
Expand Down
Loading

0 comments on commit 5694ef5

Please sign in to comment.