From 4655b4cdd2ad7363be7749a691dffb833bfdbfed Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 5 Jun 2023 17:17:45 +0100 Subject: [PATCH 1/3] Kotlin coroutine update. As the Flow interface is not stable for inheritance in 3rd party libraries. Using AbstractFlow as recommended instead via the sealed MongoAbstractFlow class. JAVA-4950 --- config/detekt/baseline.xml | 2 +- .../kotlin/client/coroutine/AggregateFlow.kt | 8 +---- .../client/coroutine/ChangeStreamFlow.kt | 6 ++-- .../kotlin/client/coroutine/DistinctFlow.kt | 7 +--- .../kotlin/client/coroutine/FindFlow.kt | 7 +--- .../client/coroutine/ListCollectionsFlow.kt | 8 ++--- .../client/coroutine/ListDatabasesFlow.kt | 8 ++--- .../client/coroutine/ListIndexesFlow.kt | 7 +--- .../kotlin/client/coroutine/MapReduceFlow.kt | 7 +--- .../client/coroutine/MongoAbstractFlow.kt | 36 +++++++++++++++++++ 10 files changed, 48 insertions(+), 48 deletions(-) create mode 100644 driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt diff --git a/config/detekt/baseline.xml b/config/detekt/baseline.xml index 0f6b17d2460..aecdf138852 100644 --- a/config/detekt/baseline.xml +++ b/config/detekt/baseline.xml @@ -10,7 +10,7 @@ MaxLineLength:MapReduceIterable.kt$MapReduceIterable$* SwallowedException:MockitoHelper.kt$MockitoHelper.DeepReflectionEqMatcher$e: Throwable TooManyFunctions:ClientSession.kt$ClientSession : jClientSession - TooManyFunctions:FindFlow.kt$FindFlow<T : Any> : Flow + TooManyFunctions:FindFlow.kt$FindFlow<T : Any> : MongoAbstractFlow TooManyFunctions:FindIterable.kt$FindIterable<T : Any> : MongoIterable TooManyFunctions:MongoCollection.kt$MongoCollection<T : Any> TooManyFunctions:MongoDatabase.kt$MongoDatabase diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt index 787f7fbd222..df79522d13e 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt @@ -19,9 +19,6 @@ import com.mongodb.ExplainVerbosity import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.AggregatePublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import org.bson.BsonValue @@ -34,7 +31,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate) */ -public class AggregateFlow(private val wrapped: AggregatePublisher) : Flow { +public class AggregateFlow(private val wrapped: AggregatePublisher) : MongoAbstractFlow(wrapped) { /** * Sets the number of documents to return per batch. @@ -167,7 +164,6 @@ public class AggregateFlow(private val wrapped: AggregatePublisher) /** * Explain the execution plan for this operation with the given verbosity level * - * @param R the type of the document class * @param verbosity the verbosity of the explanation * @return the execution plan * @see [Explain command](https://www.mongodb.com/docs/manual/reference/command/explain/) @@ -198,6 +194,4 @@ public class AggregateFlow(private val wrapped: AggregatePublisher) */ public suspend inline fun explain(verbosity: ExplainVerbosity? = null): R = explain(R::class.java, verbosity) - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt index 4a214d6282c..2ab87b8dc05 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt @@ -22,7 +22,6 @@ import com.mongodb.client.model.changestream.FullDocumentBeforeChange import com.mongodb.reactivestreams.client.ChangeStreamPublisher import java.util.concurrent.TimeUnit import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.reactive.asFlow import org.bson.BsonDocument import org.bson.BsonTimestamp @@ -37,7 +36,8 @@ import org.bson.BsonValue * * @param T The type of the result. */ -public class ChangeStreamFlow(private val wrapped: ChangeStreamPublisher) : Flow> { +public class ChangeStreamFlow(private val wrapped: ChangeStreamPublisher) : + MongoAbstractFlow>(wrapped) { /** * Sets the fullDocument value. @@ -173,6 +173,4 @@ public class ChangeStreamFlow(private val wrapped: ChangeStreamPublishe public fun showExpandedEvents(showExpandedEvents: Boolean): ChangeStreamFlow = apply { wrapped.showExpandedEvents(showExpandedEvents) } - public override suspend fun collect(collector: FlowCollector>): Unit = - wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt index edca50a58b0..9b6f572a911 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt @@ -18,9 +18,6 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.DistinctPublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -30,7 +27,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/) */ -public class DistinctFlow(private val wrapped: DistinctPublisher) : Flow { +public class DistinctFlow(private val wrapped: DistinctPublisher) : MongoAbstractFlow(wrapped) { /** * Sets the number of documents to return per batch. @@ -86,6 +83,4 @@ public class DistinctFlow(private val wrapped: DistinctPublisher) : * @return this */ public fun comment(comment: BsonValue?): DistinctFlow = apply { wrapped.comment(comment) } - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt index 5f5381a85f3..2896b293f69 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt @@ -20,9 +20,6 @@ import com.mongodb.ExplainVerbosity import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.FindPublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitSingle import org.bson.BsonValue import org.bson.Document @@ -34,7 +31,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/) */ -public class FindFlow(private val wrapped: FindPublisher) : Flow { +public class FindFlow(private val wrapped: FindPublisher) : MongoAbstractFlow(wrapped) { /** * Sets the number of documents to return per batch. @@ -292,6 +289,4 @@ public class FindFlow(private val wrapped: FindPublisher) : Flow */ public suspend inline fun explain(verbosity: ExplainVerbosity? = null): R = explain(R::class.java, verbosity) - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt index a35273046ac..627e810892e 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListCollectionsPublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -29,7 +26,8 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/) */ -public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : Flow { +public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : + MongoAbstractFlow(wrapped) { /** * Sets the maximum execution time on the server for this operation. * @@ -74,6 +72,4 @@ public class ListCollectionsFlow(private val wrapped: ListCollectionsPu * @return this */ public fun comment(comment: BsonValue?): ListCollectionsFlow = apply { wrapped.comment(comment) } - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt index ad642da8d25..4663d1995b7 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListDatabasesPublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -29,7 +26,8 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/) */ -public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : Flow { +public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : + MongoAbstractFlow(wrapped) { /** * Sets the maximum execution time on the server for this operation. * @@ -93,6 +91,4 @@ public class ListDatabasesFlow(private val wrapped: ListDatabasesPublis * @return this */ public fun comment(comment: BsonValue?): ListDatabasesFlow = apply { wrapped.comment(comment) } - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt index cacd0853a72..0e009ddae12 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListIndexesPublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue /** @@ -28,7 +25,7 @@ import org.bson.BsonValue * @param T The type of the result. * @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/) */ -public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : Flow { +public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : MongoAbstractFlow(wrapped) { /** * Sets the maximum execution time on the server for this operation. * @@ -65,6 +62,4 @@ public class ListIndexesFlow(private val wrapped: ListIndexesPublisher< * @return this */ public fun comment(comment: BsonValue?): ListIndexesFlow = apply { wrapped.comment(comment) } - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt index b43fd8e2818..36560c40f5c 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt @@ -21,9 +21,6 @@ import com.mongodb.client.model.Collation import com.mongodb.client.model.MapReduceAction import com.mongodb.reactivestreams.client.MapReducePublisher import java.util.concurrent.TimeUnit -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import org.bson.conversions.Bson @@ -36,7 +33,7 @@ import org.bson.conversions.Bson * @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/) */ @Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith("")) -public class MapReduceFlow(private val wrapped: MapReducePublisher) : Flow { +public class MapReduceFlow(private val wrapped: MapReducePublisher) : MongoAbstractFlow(wrapped) { /** * Sets the number of documents to return per batch. * @@ -209,6 +206,4 @@ public class MapReduceFlow(private val wrapped: MapReducePublisher) * @return this */ public fun collation(collation: Collation?): MapReduceFlow = apply { wrapped.collation(collation) } - - public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt new file mode 100644 index 00000000000..2fcb89a3992 --- /dev/null +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.kotlin.client.coroutine + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.AbstractFlow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow +import org.reactivestreams.Publisher + +/** + * The Mongo Abstract Flow implementation + * + * @param T The type of the result. + * @param wrapped the underlying publisher + */ +@OptIn(FlowPreview::class) +public sealed class MongoAbstractFlow(private val wrapped: Publisher) : AbstractFlow() { + + override suspend fun collectSafely(collector: FlowCollector) { + wrapped.asFlow().collect(collector) + } +} From 01a48fe7e2e4424feb727cd7e27bc27b649ef312 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 12 Jun 2023 09:25:54 +0100 Subject: [PATCH 2/3] Revert "Kotlin coroutine update." This reverts commit 4655b4cdd2ad7363be7749a691dffb833bfdbfed. --- config/detekt/baseline.xml | 2 +- .../kotlin/client/coroutine/AggregateFlow.kt | 8 ++++- .../client/coroutine/ChangeStreamFlow.kt | 6 ++-- .../kotlin/client/coroutine/DistinctFlow.kt | 7 +++- .../kotlin/client/coroutine/FindFlow.kt | 7 +++- .../client/coroutine/ListCollectionsFlow.kt | 8 +++-- .../client/coroutine/ListDatabasesFlow.kt | 8 +++-- .../client/coroutine/ListIndexesFlow.kt | 7 +++- .../kotlin/client/coroutine/MapReduceFlow.kt | 7 +++- .../client/coroutine/MongoAbstractFlow.kt | 36 ------------------- 10 files changed, 48 insertions(+), 48 deletions(-) delete mode 100644 driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt diff --git a/config/detekt/baseline.xml b/config/detekt/baseline.xml index aecdf138852..0f6b17d2460 100644 --- a/config/detekt/baseline.xml +++ b/config/detekt/baseline.xml @@ -10,7 +10,7 @@ MaxLineLength:MapReduceIterable.kt$MapReduceIterable$* SwallowedException:MockitoHelper.kt$MockitoHelper.DeepReflectionEqMatcher$e: Throwable TooManyFunctions:ClientSession.kt$ClientSession : jClientSession - TooManyFunctions:FindFlow.kt$FindFlow<T : Any> : MongoAbstractFlow + TooManyFunctions:FindFlow.kt$FindFlow<T : Any> : Flow TooManyFunctions:FindIterable.kt$FindIterable<T : Any> : MongoIterable TooManyFunctions:MongoCollection.kt$MongoCollection<T : Any> TooManyFunctions:MongoDatabase.kt$MongoDatabase diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt index df79522d13e..787f7fbd222 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt @@ -19,6 +19,9 @@ import com.mongodb.ExplainVerbosity import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.AggregatePublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import org.bson.BsonValue @@ -31,7 +34,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate) */ -public class AggregateFlow(private val wrapped: AggregatePublisher) : MongoAbstractFlow(wrapped) { +public class AggregateFlow(private val wrapped: AggregatePublisher) : Flow { /** * Sets the number of documents to return per batch. @@ -164,6 +167,7 @@ public class AggregateFlow(private val wrapped: AggregatePublisher) /** * Explain the execution plan for this operation with the given verbosity level * + * @param R the type of the document class * @param verbosity the verbosity of the explanation * @return the execution plan * @see [Explain command](https://www.mongodb.com/docs/manual/reference/command/explain/) @@ -194,4 +198,6 @@ public class AggregateFlow(private val wrapped: AggregatePublisher) */ public suspend inline fun explain(verbosity: ExplainVerbosity? = null): R = explain(R::class.java, verbosity) + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt index 2ab87b8dc05..4a214d6282c 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt @@ -22,6 +22,7 @@ import com.mongodb.client.model.changestream.FullDocumentBeforeChange import com.mongodb.reactivestreams.client.ChangeStreamPublisher import java.util.concurrent.TimeUnit import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.reactive.asFlow import org.bson.BsonDocument import org.bson.BsonTimestamp @@ -36,8 +37,7 @@ import org.bson.BsonValue * * @param T The type of the result. */ -public class ChangeStreamFlow(private val wrapped: ChangeStreamPublisher) : - MongoAbstractFlow>(wrapped) { +public class ChangeStreamFlow(private val wrapped: ChangeStreamPublisher) : Flow> { /** * Sets the fullDocument value. @@ -173,4 +173,6 @@ public class ChangeStreamFlow(private val wrapped: ChangeStreamPublishe public fun showExpandedEvents(showExpandedEvents: Boolean): ChangeStreamFlow = apply { wrapped.showExpandedEvents(showExpandedEvents) } + public override suspend fun collect(collector: FlowCollector>): Unit = + wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt index 9b6f572a911..edca50a58b0 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt @@ -18,6 +18,9 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.DistinctPublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -27,7 +30,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/) */ -public class DistinctFlow(private val wrapped: DistinctPublisher) : MongoAbstractFlow(wrapped) { +public class DistinctFlow(private val wrapped: DistinctPublisher) : Flow { /** * Sets the number of documents to return per batch. @@ -83,4 +86,6 @@ public class DistinctFlow(private val wrapped: DistinctPublisher) : * @return this */ public fun comment(comment: BsonValue?): DistinctFlow = apply { wrapped.comment(comment) } + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt index 2896b293f69..5f5381a85f3 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt @@ -20,6 +20,9 @@ import com.mongodb.ExplainVerbosity import com.mongodb.client.model.Collation import com.mongodb.reactivestreams.client.FindPublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitSingle import org.bson.BsonValue import org.bson.Document @@ -31,7 +34,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/) */ -public class FindFlow(private val wrapped: FindPublisher) : MongoAbstractFlow(wrapped) { +public class FindFlow(private val wrapped: FindPublisher) : Flow { /** * Sets the number of documents to return per batch. @@ -289,4 +292,6 @@ public class FindFlow(private val wrapped: FindPublisher) : MongoAbs */ public suspend inline fun explain(verbosity: ExplainVerbosity? = null): R = explain(R::class.java, verbosity) + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt index 627e810892e..a35273046ac 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt @@ -17,6 +17,9 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListCollectionsPublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -26,8 +29,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/) */ -public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : - MongoAbstractFlow(wrapped) { +public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : Flow { /** * Sets the maximum execution time on the server for this operation. * @@ -72,4 +74,6 @@ public class ListCollectionsFlow(private val wrapped: ListCollectionsPu * @return this */ public fun comment(comment: BsonValue?): ListCollectionsFlow = apply { wrapped.comment(comment) } + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt index 4663d1995b7..ad642da8d25 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt @@ -17,6 +17,9 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListDatabasesPublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue import org.bson.conversions.Bson @@ -26,8 +29,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/) */ -public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : - MongoAbstractFlow(wrapped) { +public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : Flow { /** * Sets the maximum execution time on the server for this operation. * @@ -91,4 +93,6 @@ public class ListDatabasesFlow(private val wrapped: ListDatabasesPublis * @return this */ public fun comment(comment: BsonValue?): ListDatabasesFlow = apply { wrapped.comment(comment) } + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt index 0e009ddae12..cacd0853a72 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt @@ -17,6 +17,9 @@ package com.mongodb.kotlin.client.coroutine import com.mongodb.reactivestreams.client.ListIndexesPublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import org.bson.BsonValue /** @@ -25,7 +28,7 @@ import org.bson.BsonValue * @param T The type of the result. * @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/) */ -public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : MongoAbstractFlow(wrapped) { +public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : Flow { /** * Sets the maximum execution time on the server for this operation. * @@ -62,4 +65,6 @@ public class ListIndexesFlow(private val wrapped: ListIndexesPublisher< * @return this */ public fun comment(comment: BsonValue?): ListIndexesFlow = apply { wrapped.comment(comment) } + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt index 36560c40f5c..b43fd8e2818 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt @@ -21,6 +21,9 @@ import com.mongodb.client.model.Collation import com.mongodb.client.model.MapReduceAction import com.mongodb.reactivestreams.client.MapReducePublisher import java.util.concurrent.TimeUnit +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import org.bson.conversions.Bson @@ -33,7 +36,7 @@ import org.bson.conversions.Bson * @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/) */ @Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith("")) -public class MapReduceFlow(private val wrapped: MapReducePublisher) : MongoAbstractFlow(wrapped) { +public class MapReduceFlow(private val wrapped: MapReducePublisher) : Flow { /** * Sets the number of documents to return per batch. * @@ -206,4 +209,6 @@ public class MapReduceFlow(private val wrapped: MapReducePublisher) * @return this */ public fun collation(collation: Collation?): MapReduceFlow = apply { wrapped.collation(collation) } + + public override suspend fun collect(collector: FlowCollector): Unit = wrapped.asFlow().collect(collector) } diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt deleted file mode 100644 index 2fcb89a3992..00000000000 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoAbstractFlow.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.mongodb.kotlin.client.coroutine - -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.AbstractFlow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.reactive.asFlow -import org.reactivestreams.Publisher - -/** - * The Mongo Abstract Flow implementation - * - * @param T The type of the result. - * @param wrapped the underlying publisher - */ -@OptIn(FlowPreview::class) -public sealed class MongoAbstractFlow(private val wrapped: Publisher) : AbstractFlow() { - - override suspend fun collectSafely(collector: FlowCollector) { - wrapped.asFlow().collect(collector) - } -} From 4b19e7486cf67e9609a16d30b9b16e1d6bc15328 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 12 Jun 2023 09:29:14 +0100 Subject: [PATCH 3/3] Kotlin coroutine update. As the Flow interface is not stable for inheritance in 3rd party libraries, use delegation instead. JAVA-4950 --- .../com/mongodb/kotlin/client/coroutine/AggregateFlow.kt | 2 +- .../kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt | 2 +- .../kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt | 2 +- .../com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt | 3 ++- .../com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt | 2 +- .../com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt | 2 +- .../com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt | 2 +- 7 files changed, 8 insertions(+), 7 deletions(-) diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt index 787f7fbd222..683746efc96 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt @@ -34,7 +34,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate) */ -public class AggregateFlow(private val wrapped: AggregatePublisher) : Flow { +public class AggregateFlow(private val wrapped: AggregatePublisher) : Flow by wrapped.asFlow() { /** * Sets the number of documents to return per batch. diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt index edca50a58b0..3583e4a2390 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt @@ -30,7 +30,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/) */ -public class DistinctFlow(private val wrapped: DistinctPublisher) : Flow { +public class DistinctFlow(private val wrapped: DistinctPublisher) : Flow by wrapped.asFlow() { /** * Sets the number of documents to return per batch. diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt index 5f5381a85f3..ed0992b1bf7 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt @@ -34,7 +34,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/) */ -public class FindFlow(private val wrapped: FindPublisher) : Flow { +public class FindFlow(private val wrapped: FindPublisher) : Flow by wrapped.asFlow() { /** * Sets the number of documents to return per batch. diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt index a35273046ac..bc205b7073f 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt @@ -29,7 +29,8 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/) */ -public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : Flow { +public class ListCollectionsFlow(private val wrapped: ListCollectionsPublisher) : + Flow by wrapped.asFlow() { /** * Sets the maximum execution time on the server for this operation. * diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt index ad642da8d25..4b56333bb38 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt @@ -29,7 +29,7 @@ import org.bson.conversions.Bson * @param T The type of the result. * @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/) */ -public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : Flow { +public class ListDatabasesFlow(private val wrapped: ListDatabasesPublisher) : Flow by wrapped.asFlow() { /** * Sets the maximum execution time on the server for this operation. * diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt index cacd0853a72..9e856d28ee3 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt @@ -28,7 +28,7 @@ import org.bson.BsonValue * @param T The type of the result. * @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/) */ -public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : Flow { +public class ListIndexesFlow(private val wrapped: ListIndexesPublisher) : Flow by wrapped.asFlow() { /** * Sets the maximum execution time on the server for this operation. * diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt index b43fd8e2818..aef6ffedb31 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt @@ -36,7 +36,7 @@ import org.bson.conversions.Bson * @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/) */ @Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith("")) -public class MapReduceFlow(private val wrapped: MapReducePublisher) : Flow { +public class MapReduceFlow(private val wrapped: MapReducePublisher) : Flow by wrapped.asFlow() { /** * Sets the number of documents to return per batch. *