Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove RxJava completely in favor of a focus on Kotlin Flow #10

Merged
merged 9 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
runs-on: ubuntu-latest
name: deploy
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
gh_token: ${{ secrets.GITHUB_TOKEN }}
ORG_GRADLE_PROJECT_nexus_user: ${{ secrets.NEXUS_USER }}
ORG_GRADLE_PROJECT_nexus_pass64: ${{ secrets.NEXUS_PASS64 }}
ORG_GRADLE_PROJECT_gpg_passphrase: ${{ secrets.GPG_PASSPHRASE }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# DurianRx releases

## [Unreleased]
### Changed
- Bump required java from 11 to 17. ([#9](https://github.com/diffplug/durian-rx/pull/9))
- Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))

## [4.0.1] - 2022-12-20
### Fixed
Expand Down
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ apply from: 干.file('spotless/java.gradle')

def VER_DURIAN='1.2.0'
def VER_DURIAN_DEBUG='1.0.0'
def VER_RXJAVA='2.0.0'
def VER_JUNIT='4.12'

dependencies {
implementation "com.diffplug.durian:durian-core:${VER_DURIAN}"
implementation "com.diffplug.durian:durian-concurrent:${VER_DURIAN}"
api "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.5.2"
api "io.reactivex.rxjava2:rxjava:${VER_RXJAVA}"
testImplementation "junit:junit:${VER_JUNIT}"
testImplementation "com.diffplug.durian:durian-testlib:${VER_DURIAN}"
testImplementation "com.diffplug.durian:durian-debug:${VER_DURIAN_DEBUG}"
Expand All @@ -34,7 +32,6 @@ ext.javadoc_links = [
"https://javadoc.io/doc/com.diffplug.durian/durian-collect/${VER_DURIAN}",
"https://javadoc.io/doc/com.diffplug.durian/durian-concurrent/${VER_DURIAN}",
"https://javadoc.io/doc/com.diffplug.durian/durian-debug/${VER_DURIAN_DEBUG}",
"https://javadoc.io/doc/io.reactivex.rxjava2/rxjava/${VER_RXJAVA}",
'https://docs.oracle.com/javase/8/docs/api/'
].join(' ')
apply from: 干.file('base/maven.gradle')
Expand Down
34 changes: 11 additions & 23 deletions src/main/java/com/diffplug/common/rx/GuardedExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package com.diffplug.common.rx

import com.diffplug.common.util.concurrent.ListenableFuture
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.Disposables
import java.util.*
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executor
import java.util.function.Supplier
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -43,42 +42,35 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
return Runnable { execute(guard.guard(delegate)) }
}

private fun subscribe(subscriber: Supplier<Disposable>): Disposable {
private fun subscribe(subscriber: Supplier<Job>): Job {
return if (!guard.isDisposed) {
val subscription = subscriber.get()
guard.runWhenDisposed { subscription.dispose() }
subscription
val job = subscriber.get()
guard.runWhenDisposed { job.cancel() }
job
} else {
Disposables.disposed()
SupervisorJob().apply { cancel() }
}
}

override fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Disposable {
override fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Job {
return subscribe { delegate.subscribeDisposable(flow, listener) }
}

override fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Disposable {
override fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Job {
return subscribe { delegate.subscribeDisposable(deferred, listener) }
}

override fun <T> subscribeDisposable(
observable: Observable<out T>,
listener: RxListener<T>
): Disposable {
return subscribe { delegate.subscribeDisposable(observable, listener) }
}

override fun <T> subscribeDisposable(
future: ListenableFuture<out T>,
listener: RxListener<T>
): Disposable {
): Job {
return subscribe { delegate.subscribeDisposable(future, listener) }
}

override fun <T> subscribeDisposable(
future: CompletionStage<out T>,
listener: RxListener<T>
): Disposable {
): Job {
return subscribe { delegate.subscribeDisposable(future, listener) }
}

Expand All @@ -90,10 +82,6 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
subscribeDisposable(deferred, listener)
}

override fun <T> subscribe(observable: Observable<out T>, listener: RxListener<T>) {
subscribeDisposable(observable, listener)
}

override fun <T> subscribe(future: ListenableFuture<out T>, listener: RxListener<T>) {
subscribeDisposable(future, listener)
}
Expand Down
125 changes: 21 additions & 104 deletions src/main/java/com/diffplug/common/rx/Rx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ import com.diffplug.common.rx.RxListener.DefaultTerminate
import com.diffplug.common.util.concurrent.ListenableFuture
import com.diffplug.common.util.concurrent.MoreExecutors
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.lang.IllegalStateException
import java.lang.SafeVarargs
import java.util.*
Expand All @@ -39,6 +33,7 @@ import java.util.concurrent.Future
import java.util.function.Consumer
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
Expand All @@ -60,47 +55,40 @@ import kotlinx.coroutines.flow.merge
* Rx.subscribe(listenableOrObservable, val -> doSomething); </pre> * Long version: `Rx` implements
* both the [io.reactivex.Observer] and [com.diffplug.common.util.concurrent.FutureCallback]
* interfaces by mapping them to two `Consumer`s:
*
* * `Consumer<T> onValue`</T>
* * `Consumer<Optional></Optional><Throwable>> onTerminate`</Throwable>
*
* Which are mapped as follows:
*
* * `Observable.onNext(T value) -> onValue.accept(value)`
* * `Observable.onCompleted() -> onTerminate.accept(Optional.empty())`
* * `Observable.onError(Throwable error) -> onTerminate.accept(Optional.of(error))`
* * `FutureCallback.onSuccess(T value) -> onValue.accept(value);
* onTerminate.accept(Optional.empty());`
* onTerminate.accept(Optional.empty());`
* * `FutureCallback.onError(Throwable error) -> onTerminate.accept(Optional.of(error))`
*
* An instance of Rx is created by calling one of Rx's static creator methods:
*
* * [onValue(Consumer&amp;lt;T&amp;gt;)][.onValue]
* * [onTerminate(Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onTerminate]
* * [onFailure(Consumer&amp;lt;Throwable&amp;gt;)][.onFailure]
* * [onValueOrTerminate(Consumer&amp;lt;T&amp;gt;,
* Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onValueOnTerminate]
* Consumer&amp;lt;Optional&amp;lt;Throwable&amp;gt;&amp;gt;)][.onValueOnTerminate]
* * [onValueOrFailure(Consumer&amp;lt;T&amp;gt;,
* Consumer&amp;lt;Throwable&amp;gt;)][.onValueOnFailure]
* Consumer&amp;lt;Throwable&amp;gt;)][.onValueOnFailure]
*
* Once you have an instance of Rx, you can subscribe it using the normal RxJava or Guava calls:
*
* * `rxObservable.subscribe(Rx.onValue(val -> doSomething(val));`
* * `Futures.addCallback(listenableFuture, Rx.onValue(val -> doSomething(val));`
*
* But the recommended way to subscribe is to use:
*
* * `Rx.subscribe(listenableOrObservable, Rx.onValue(val -> doSomething(val)));`
* * `Rx.subscribe(listenableOrObservable, val -> doSomething(val)); // automatically uses
* Rx.onValue()`
* Rx.onValue()`
*
* The advantage of this latter method is that it returns [io.reactivex.disposables.Disposable]
* instances which allow you to unsubscribe from futures in the same manner as for observables.
*
* * `subscription = Rx.subscribe( ... )`
*
* If you wish to receive callbacks on a specific thread, you can use:
*
* * `Rx.on(someExecutor).subscribe( ... )`
*
* Because RxJava's Observables use [io.reactivex.Scheduler]s rather than
Expand All @@ -109,7 +97,7 @@ import kotlinx.coroutines.flow.merge
* executor which implements [RxExecutor.Has].
*
* @see [SwtExec]
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
*/
object Rx {
fun <T> createEmitFlow() =
Expand Down Expand Up @@ -222,16 +210,6 @@ object Rx {
subscribe(deferred, onValue(listener))
}

@JvmStatic
fun <T> subscribe(observable: Observable<out T>, listener: RxListener<T>) {
sameThreadExecutor().subscribe(observable, listener)
}

@JvmStatic
fun <T> subscribe(observable: Observable<out T>, listener: Consumer<T>) {
subscribe(observable, onValue(listener))
}

@JvmStatic
fun <T> subscribe(observable: IObservable<out T>, listener: RxListener<T>) {
subscribe(observable.asObservable(), listener)
Expand Down Expand Up @@ -264,65 +242,52 @@ object Rx {

// Static versions
@JvmStatic
fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(flow: Flow<T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(flow, listener)
}

@JvmStatic
fun <T> subscribeDisposable(flow: Flow<T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(flow: Flow<T>, listener: Consumer<T>): Job {
return subscribeDisposable(flow, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(deferred, listener)
}

@JvmStatic
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(deferred: Deferred<T>, listener: Consumer<T>): Job {
return subscribeDisposable(deferred, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(observable: Observable<out T>, listener: RxListener<T>): Disposable {
return sameThreadExecutor().subscribeDisposable(observable, listener)
}

@JvmStatic
fun <T> subscribeDisposable(observable: Observable<out T>, listener: Consumer<T>): Disposable {
return subscribeDisposable(observable, onValue(listener))
}

@JvmStatic
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: RxListener<T>): Job {
return subscribeDisposable(observable.asObservable(), listener)
}

@JvmStatic
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(observable: IObservable<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(observable.asObservable(), listener)
}

@JvmStatic
fun <T> subscribeDisposable(
future: ListenableFuture<out T>,
listener: RxListener<T>
): Disposable {
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(future, listener)
}

@JvmStatic
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(future: ListenableFuture<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(future, onValueOnTerminate(listener, TrackCancelled(future)))
}

@JvmStatic
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: RxListener<T>): Disposable {
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: RxListener<T>): Job {
return sameThreadExecutor().subscribeDisposable(future, listener)
}

@JvmStatic
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: Consumer<T>): Disposable {
fun <T> subscribeDisposable(future: CompletionStage<out T>, listener: Consumer<T>): Job {
return subscribeDisposable(
future, onValueOnTerminate(listener, TrackCancelled(future.toCompletableFuture())))
}
Expand All @@ -338,26 +303,13 @@ object Rx {
} else if (executor is RxExecutor.Has) {
executor.rxExecutor
} else {
RxExecutor(executor, Schedulers.from(executor), executor.asCoroutineDispatcher())
RxExecutor(executor, executor.asCoroutineDispatcher())
}
}

/**
* Mechanism for specifying a specific Executor (for ListenableFuture) and Scheduler (for
* Observable).
*/
@JvmStatic
fun callbackOn(executor: Executor, scheduler: Scheduler): RxExecutor {
return callbackOn(executor, scheduler, executor.asCoroutineDispatcher())
}

@JvmStatic
fun callbackOn(
executor: Executor,
scheduler: Scheduler,
dispatcher: CoroutineDispatcher
): RxExecutor {
return RxExecutor(executor, scheduler, dispatcher)
fun callbackOn(executor: Executor, dispatcher: CoroutineDispatcher): RxExecutor {
return RxExecutor(executor, dispatcher)
}

@JvmStatic
Expand All @@ -376,7 +328,6 @@ object Rx {
_sameThread =
RxExecutor(
MoreExecutors.directExecutor(),
Schedulers.trampoline(),
MoreExecutors.directExecutor().asCoroutineDispatcher())
}
return _sameThread!!
Expand All @@ -397,46 +348,12 @@ object Rx {
if (_tracingPolicy == null) {
_tracingPolicy = DurianPlugins.get(RxTracingPolicy::class.java, RxTracingPolicy.NONE)
if (_tracingPolicy !== RxTracingPolicy.NONE) {
RxJavaPlugins.setOnObservableSubscribe { observable: Observable<*>, observer: Observer<*>
->
if (observer is RxListener<*>) {
// if it's an RxListener, then _tracingPolicy handled it already
return@setOnObservableSubscribe observer
} else {
// if it isn't an RxListener, then we'll apply _tracing policy
val listener =
onValueOnTerminate({ value: Any -> (observer as Observer<Any>).onNext(value) }) {
errorOpt: Optional<Throwable> ->
if (errorOpt.isPresent) {
observer.onError(errorOpt.get())
} else {
observer.onComplete()
}
}
val traced = _tracingPolicy!!.hook(observable, listener)
return@setOnObservableSubscribe object : Observer<Any?> {
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}

override fun onNext(value: Any?) {
traced.onNext(value)
}

override fun onError(e: Throwable) {
traced.onError(e)
}

override fun onComplete() {
traced.onComplete()
}
}
}
}
// TODO: setup tracing
}
}
return _tracingPolicy!!
}

private var _tracingPolicy: RxTracingPolicy? = null

/** Package-private for testing - resets all of the static member variables. */
Expand Down
Loading
Loading