Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue notifications in an async stream instead of blocking forward progress. #2946

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 62 additions & 48 deletions NSE/Sources/NotificationServiceExtension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@ private let keychainController = KeychainController(service: .sessions,
accessGroup: InfoPlistReader.main.keychainAccessGroupIdentifier)

class NotificationServiceExtension: UNNotificationServiceExtension {
/// It's imperative that we create **at most** one UserSession/Client per process.
/// Apparently ``didReceive(_:withContentHandler:)`` may potentially be called concurrently and/or from
/// different processes. This queue manages the UserSession/Client to ensure we only ever create a single instance.
private static let notificationQueue = NotificationQueue()

private var handler: ((UNNotificationContent) -> Void)?
private var modifiedContent: UNMutableNotificationContent?

// Used to create one single UserSession across process/instances/runs
private static let serialQueue = DispatchQueue(label: "io.element.elementx.nse")
private static var userSession: NSEUserSession?

override func didReceive(_ request: UNNotificationRequest,
withContentHandler contentHandler: @escaping (UNNotificationContent) -> Void) {
guard !DataProtectionManager.isDeviceLockedAfterReboot(containerURL: URL.appGroupContainerDirectory),
let roomId = request.roomId,
let eventId = request.eventId,
let roomID = request.roomId,
let eventID = request.eventId,
let clientID = request.pusherNotificationClientIdentifier,
let credentials = keychainController.restorationTokens().first(where: { $0.restorationToken.pusherNotificationClientIdentifier == clientID }) else {
// We cannot process this notification, it might be due to one of these:
Expand All @@ -78,30 +79,18 @@ class NotificationServiceExtension: UNNotificationServiceExtension {
NSELogger.logMemory(with: tag)
MXLog.info("\(tag) Payload came: \(request.content.userInfo)")

Self.serialQueue.sync {
if Self.userSession == nil {
// This function might be run concurrently and from different processes
// It's imperative that we create **at most** one UserSession/Client per process
Task.synchronous {
do {
Self.userSession = try await NSEUserSession(credentials: credentials, clientSessionDelegate: keychainController)
} catch {
MXLog.error("Failed creating user session with error: \(error)")
}
}
}
Self.notificationQueue.handle(.init(credentials: credentials) { [weak self] userSession in
guard let self else { return }

if Self.userSession == nil {
guard let userSession else {
return discard(unreadCount: request.unreadCount)
}
}

Task {
await run(with: credentials,
roomId: roomId,
eventId: eventId,

await run(with: userSession,
roomID: roomID,
eventID: eventID,
unreadCount: request.unreadCount)
}
})
}

override func serviceExtensionTimeWillExpire() {
Expand All @@ -111,19 +100,14 @@ class NotificationServiceExtension: UNNotificationServiceExtension {
notify(unreadCount: nil)
}

private func run(with credentials: KeychainCredentials,
roomId: String,
eventId: String,
private func run(with userSession: NSEUserSession,
roomID: String,
eventID: String,
unreadCount: Int?) async {
MXLog.info("\(tag) run with roomId: \(roomId), eventId: \(eventId)")

guard let userSession = Self.userSession else {
MXLog.error("Invalid NSE User Session, discarding.")
return discard(unreadCount: unreadCount)
}
MXLog.info("\(tag) run with roomID: \(roomID), eventID: \(eventID)")

do {
guard let itemProxy = await userSession.notificationItemProxy(roomID: roomId, eventID: eventId) else {
guard let itemProxy = await userSession.notificationItemProxy(roomID: roomID, eventID: eventID) else {
MXLog.info("\(tag) no notification for the event, discard")
return discard(unreadCount: unreadCount)
}
Expand Down Expand Up @@ -243,19 +227,49 @@ class NotificationServiceExtension: UNNotificationServiceExtension {
}
}

// https://stackoverflow.com/a/77300959/730924
private extension Task where Failure == Error {
/// Performs an async task in a sync context.
///
/// - Note: This function blocks the thread until the given operation is finished. The caller is responsible for managing multithreading.
static func synchronous(priority: TaskPriority? = nil, operation: @escaping @Sendable () async throws -> Success) {
let semaphore = DispatchSemaphore(value: 0)
/// A piece of work to handle a single notification.
struct NotificationHandler {
/// The keychain credentials for the notification's user session.
let credentials: KeychainCredentials
/// The work to perform when handling the notification. This provides the necessary user session.
let process: (NSEUserSession?) async -> Void
}

Task(priority: priority) {
defer { semaphore.signal() }
return try await operation()
/// A queue to handle incoming notification requests sequentially.
private class NotificationQueue {
private let continuation: AsyncStream<NotificationHandler>.Continuation
/// The task that processes the stream of notifications.
private var streamTask: Task<Void, Never>?
/// The shared user session for the queue.
///
/// This should **only** be accessed within the `for await in`.
private var userSession: NSEUserSession?

deinit { streamTask?.cancel() }

init() {
let (stream, continuation) = AsyncStream<NotificationHandler>.makeStream(bufferingPolicy: .unbounded)
self.continuation = continuation

streamTask = Task { [weak self] in
for await request in stream {
guard let self else { return }

if userSession == nil {
do {
userSession = try await NSEUserSession(credentials: request.credentials, clientSessionDelegate: keychainController)
} catch {
MXLog.error("Failed creating user session with error: \(error)")
}
}

await request.process(userSession)
}
}

semaphore.wait()
}

/// Adds a notification to the queue.
func handle(_ notification: NotificationHandler) {
continuation.yield(notification)
}
}
Loading