Skip to content

Commit

Permalink
Merge pull request #797 from simple-robot/event-dispatch-flow-context
Browse files Browse the repository at this point in the history
优化/改变 EventProcessor.push 默认实现中的行为:现在会直接使用 flowOn 来指定事件处理器所处的协程上下文
  • Loading branch information
ForteScarlet authored Feb 19, 2024
2 parents 1261807 + 6b67bf9 commit b4d1c6b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Project https://github.com/simple-robot/simpler-robot
* Email [email protected]
*
* This file is part of the Simple Robot Library.
* This file is part of the Simple Robot Library (Alias: simple-robot, simbot, etc.).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
Expand Down Expand Up @@ -41,8 +41,8 @@ import kotlin.jvm.JvmSynthetic
* ## 协程上下文
*
* 当通过 [EventProcessor.push] 推送一个事件并得到一个事件处理链时,
* 这其中的每一个事件处理器所处上下文可由 [EventDispatcherConfiguration.coroutineContext]
* 中的配置值或事件处理链 [Flow] 的结果来决定
* 这其中的每一个事件处理器所处上下文会通过 [Flow.flowOn] 运行在
* [EventDispatcherConfiguration.coroutineContext] 中
*
* ```kotlin
* val app = launchApplication(...) {
Expand All @@ -59,13 +59,14 @@ import kotlin.jvm.JvmSynthetic
* }
*
* val flow = app.eventDispatcher.push(event)
* // 上游的逻辑会通过 flowOn 运行在 context1 中
* .onEach { ... } // 会切换到 context3 上
* .flowOn(context3) // 将上游调度器切换至 context3
* .onEach { ... } // 会在 collect 所在的默认(当前)环境中
* .collect { ... } // 在默认(当前)环境收集结果
* ```
*
* 参考上述示例,协程上下文的使用“优先级”可近似地参考为 `context2` > `context1` > `context3` 。
* 参考上述示例,协程上下文的使用“优先级”可“近似地”参考为 `context2` > `context1` > `context3` 。
*
* ## Java API
*
Expand All @@ -92,26 +93,12 @@ public interface EventProcessor {
* 推送一个事件,
* 得到内部所有事件依次将其处理后得到最终的结果流。
*
* 结果流是 _冷流_ 。
*
* 结果流是 _冷流_ ,
* 只有当对结果进行收集时事件才会真正的被处理。
* 可以通过响应的流对事件处理量进行控制。
*
* 事件内部实际的调度器由构造 [EventProcessor] 时的配置属性和具体实现为准。
*
* 返回结果的 [Flow] 中每一次事件调度都可能伴随着上下文的切换(通过 [EventDispatcherConfiguration.coroutineContext] 的配置),
* 但并非通过 [Flow.flowOn] 进行切换,不会导致实际执行的事件调度逻辑比收集到的逻辑更多。
*
* ```kotlin
* eventDispatcher.push(event)
* .take(3)
* .collect { ... } // flow 中只会执行优先级最高的三个 listener 并收集到它们的结果
*
* eventDispatcher.push(event)
* .flowOn(Dispatchers.IO) // 切换事件调度流程中的上下文
* .take(3)
* .collect { ... } // flow 中切换了调度上下文,这可能会使所有的listener都被实际上的执行,但是只收集到3个最新的结果。
* ```
* 返回结果的 [Flow] 会通过 [Flow.flowOn]
* 使上游切换到 [EventDispatcherConfiguration.coroutineContext] 中。
*
* ## 异常
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Project https://github.com/simple-robot/simpler-robot
* Email [email protected]
*
* This file is part of the Simple Robot Library.
* This file is part of the Simple Robot Library (Alias: simple-robot, simbot, etc.).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
Expand All @@ -30,7 +30,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import kotlinx.coroutines.flow.flowOn
import love.forte.simbot.common.PriorityConstant
import love.forte.simbot.common.attribute.MutableAttributeMap
import love.forte.simbot.common.attribute.mutableAttributeMapOf
Expand Down Expand Up @@ -330,15 +330,15 @@ public class SimpleEventDispatcherImpl(
}

private fun eventFlow(context: EventContext): Flow<EventResult> {
return if (dispatcherContext == EmptyCoroutineContext) {
flow {
dispatchInFlowWithoutCoroutineContext(context, this)
}
} else {
flow {
dispatchInFlow(context, dispatcherContext, this)
}
var flow = flow {
dispatchInFlow(context, this)
}

if (dispatcherContext != EmptyCoroutineContext) {
flow = flow.flowOn(dispatcherContext)
}

return flow
}

private data class EventContextImpl(
Expand All @@ -353,30 +353,10 @@ public class SimpleEventDispatcherImpl(

private suspend fun dispatchInFlow(
context: EventContext,
dispatcherContext: CoroutineContext,
collector: FlowCollector<EventResult>
) {
val listenerIterator = listenersQueue.iterator()

for (listenerInvoker in listenerIterator) {
val lContext = EventListenerContextImpl(context, listenerInvoker.listener)
val result = withContext(dispatcherContext) {
listenerInvoker.invokeAndCollectedOrErrorResult(lContext)
}

collector.emit(result)

if (result.isTruncated) {
break
}
}
}

private suspend fun dispatchInFlowWithoutCoroutineContext(
context: EventContext,
collector: FlowCollector<EventResult>
) {
val listenerIterator = listenersQueue.iterator()
for (listenerInvoker in listenerIterator) {
val lContext = EventListenerContextImpl(context, listenerInvoker.listener)
val result = listenerInvoker.invokeAndCollectedOrErrorResult(lContext)
Expand Down
2 changes: 1 addition & 1 deletion website

0 comments on commit b4d1c6b

Please sign in to comment.