Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kotlin coroutine update. #1135

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ID>MaxLineLength:MapReduceIterable.kt$MapReduceIterable$*</ID>
<ID>SwallowedException:MockitoHelper.kt$MockitoHelper.DeepReflectionEqMatcher$e: Throwable</ID>
<ID>TooManyFunctions:ClientSession.kt$ClientSession : jClientSession</ID>
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : Flow</ID>
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : MongoAbstractFlow</ID>
<ID>TooManyFunctions:FindIterable.kt$FindIterable&lt;T : Any> : MongoIterable</ID>
<ID>TooManyFunctions:MongoCollection.kt$MongoCollection&lt;T : Any></ID>
<ID>TooManyFunctions:MongoDatabase.kt$MongoDatabase</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import com.mongodb.ExplainVerbosity
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.AggregatePublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.bson.BsonValue
Expand All @@ -34,7 +31,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate)
*/
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : Flow<T> {
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -167,7 +164,6 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
/**
* Explain the execution plan for this operation with the given verbosity level
*
* @param R the type of the document class
* @param verbosity the verbosity of the explanation
* @return the execution plan
* @see [Explain command](https://www.mongodb.com/docs/manual/reference/command/explain/)
Expand Down Expand Up @@ -198,6 +194,4 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
*/
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
explain(R::class.java, verbosity)

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.reactivestreams.client.ChangeStreamPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonDocument
import org.bson.BsonTimestamp
Expand All @@ -37,7 +36,8 @@ import org.bson.BsonValue
*
* @param T The type of the result.
*/
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) : Flow<ChangeStreamDocument<T>> {
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) :
MongoAbstractFlow<ChangeStreamDocument<T>>(wrapped) {

/**
* Sets the fullDocument value.
Expand Down Expand Up @@ -173,6 +173,4 @@ public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublishe
public fun showExpandedEvents(showExpandedEvents: Boolean): ChangeStreamFlow<T> = apply {
wrapped.showExpandedEvents(showExpandedEvents)
}
public override suspend fun collect(collector: FlowCollector<ChangeStreamDocument<T>>): Unit =
wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ package com.mongodb.kotlin.client.coroutine
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.DistinctPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -30,7 +27,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/)
*/
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : Flow<T> {
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -86,6 +83,4 @@ public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) :
* @return this
*/
public fun comment(comment: BsonValue?): DistinctFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import com.mongodb.ExplainVerbosity
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.FindPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import org.bson.BsonValue
import org.bson.Document
Expand All @@ -34,7 +31,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/)
*/
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T> {
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -292,6 +289,4 @@ public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T>
*/
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
explain(R::class.java, verbosity)

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListCollectionsPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -29,7 +26,8 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/)
*/
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) : Flow<T> {
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) :
MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -74,6 +72,4 @@ public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPu
* @return this
*/
public fun comment(comment: BsonValue?): ListCollectionsFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListDatabasesPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -29,7 +26,8 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/)
*/
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) : Flow<T> {
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) :
MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -93,6 +91,4 @@ public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublis
* @return this
*/
public fun comment(comment: BsonValue?): ListDatabasesFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListIndexesPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue

/**
Expand All @@ -28,7 +25,7 @@ import org.bson.BsonValue
* @param T The type of the result.
* @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/)
*/
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : Flow<T> {
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -65,6 +62,4 @@ public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<
* @return this
*/
public fun comment(comment: BsonValue?): ListIndexesFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import com.mongodb.client.model.Collation
import com.mongodb.client.model.MapReduceAction
import com.mongodb.reactivestreams.client.MapReducePublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import org.bson.conversions.Bson

Expand All @@ -36,7 +33,7 @@ import org.bson.conversions.Bson
* @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/)
*/
@Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith(""))
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : Flow<T> {
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : MongoAbstractFlow<T>(wrapped) {
/**
* Sets the number of documents to return per batch.
*
Expand Down Expand Up @@ -209,6 +206,4 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
* @return this
*/
public fun collation(collation: Collation?): MapReduceFlow<T> = apply { wrapped.collation(collation) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.kotlin.client.coroutine

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.Publisher

/**
* The Mongo Abstract Flow implementation
*
* @param T The type of the result.
* @param wrapped the underlying publisher
*/
@OptIn(FlowPreview::class)
public sealed class MongoAbstractFlow<T : Any>(private val wrapped: Publisher<T>) : AbstractFlow<T>() {

override suspend fun collectSafely(collector: FlowCollector<T>) {
wrapped.asFlow().collect(collector)
}
}