Skip to content

Commit

Permalink
feat: SOF-1135 make createDevice and initDevice abortable
Browse files Browse the repository at this point in the history
use AbortSignal to allow safely aborting the methods when desired
  • Loading branch information
ianshade committed Dec 16, 2022
1 parent 8add5f1 commit 70bfef2
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

exports[`index imports 1`] = `
Array [
"AbortError",
"AtemMediaPoolType",
"AtemTransitionStyle",
"BlendMode",
Expand Down
157 changes: 119 additions & 38 deletions packages/timeline-state-resolver/src/conductor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ export type ConductorEvents = {
timeTrace: [trace: FinishedTrace]
}

export class AbortError extends Error {
name = 'AbortError'
}

/**
* The Conductor class serves as the main class for interacting. It contains
* methods for setting mappings, timelines and adding/removing devices. It keeps
Expand Down Expand Up @@ -402,17 +406,25 @@ export class Conductor extends EventEmitter<ConductorEvents> {
* Creates an uninitialised device that can be referenced by the timeline and mappings.
* @param deviceId Id used by the mappings to reference the device.
* @param deviceOptions The options used to initalize the device
* @param options Additional options
* @returns A promise that resolves with the created device, or rejects with an error message.
*/
public async createDevice(
deviceId: string,
deviceOptions: DeviceOptionsAnyInternal
deviceOptions: DeviceOptionsAnyInternal,
options?: { signal?: AbortSignal }
): Promise<DeviceContainer<DeviceOptionsBase<any>>> {
let newDevice: DeviceContainer<DeviceOptionsBase<any>> | undefined
const throwIfAborted = () => {
if (options?.signal?.aborted) {
throw new AbortError(`Device "${deviceId}" creation aborted`)
}
}
try {
if (this.devices.has(deviceId)) {
throw new Error(`Device "${deviceId}" already exists when creating device`)
}
throwIfAborted()

const threadedClassOptions: ThreadedClassConfig = {
threadUsage: deviceOptions.threadUsage || 1,
Expand All @@ -426,8 +438,10 @@ export class Conductor extends EventEmitter<ConductorEvents> {
return this.getCurrentTime()
}

let newDevicePs: Promise<DeviceContainer<DeviceOptionsBase<any>> | undefined>

if (deviceOptions.type === DeviceType.ABSTRACT) {
newDevice = await DeviceContainer.create<DeviceOptionsAbstractInternal, typeof AbstractDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsAbstractInternal, typeof AbstractDevice>(
'../../dist/integrations/abstract/index.js',
'AbstractDevice',
deviceId,
Expand All @@ -440,7 +454,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
)
} else if (deviceOptions.type === DeviceType.CASPARCG) {
// Add CasparCG device:
newDevice = await DeviceContainer.create<DeviceOptionsCasparCGInternal, typeof CasparCGDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsCasparCGInternal, typeof CasparCGDevice>(
'../../dist/integrations/casparCG/index.js',
'CasparCGDevice',
deviceId,
Expand All @@ -449,7 +463,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.ATEM) {
newDevice = await DeviceContainer.create<DeviceOptionsAtemInternal, typeof AtemDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsAtemInternal, typeof AtemDevice>(
'../../dist/integrations/atem/index.js',
'AtemDevice',
deviceId,
Expand All @@ -458,7 +472,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.HTTPSEND) {
newDevice = await DeviceContainer.create<DeviceOptionsHTTPSendInternal, typeof HTTPSendDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsHTTPSendInternal, typeof HTTPSendDevice>(
'../../dist/integrations/httpSend/index.js',
'HTTPSendDevice',
deviceId,
Expand All @@ -467,7 +481,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.HTTPWATCHER) {
newDevice = await DeviceContainer.create<DeviceOptionsHTTPWatcherInternal, typeof HTTPWatcherDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsHTTPWatcherInternal, typeof HTTPWatcherDevice>(
'../../dist/integrations/httpWatcher/index.js',
'HTTPWatcherDevice',
deviceId,
Expand All @@ -476,7 +490,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.LAWO) {
newDevice = await DeviceContainer.create<DeviceOptionsLawoInternal, typeof LawoDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsLawoInternal, typeof LawoDevice>(
'../../dist/integrations/lawo/index.js',
'LawoDevice',
deviceId,
Expand All @@ -485,7 +499,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.TCPSEND) {
newDevice = await DeviceContainer.create<DeviceOptionsTCPSendInternal, typeof TCPSendDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsTCPSendInternal, typeof TCPSendDevice>(
'../../dist/integrations/tcpSend/index.js',
'TCPSendDevice',
deviceId,
Expand All @@ -494,7 +508,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.PANASONIC_PTZ) {
newDevice = await DeviceContainer.create<DeviceOptionsPanasonicPTZInternal, typeof PanasonicPtzDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsPanasonicPTZInternal, typeof PanasonicPtzDevice>(
'../../dist/integrations/panasonicPTZ/index.js',
'PanasonicPtzDevice',
deviceId,
Expand All @@ -503,7 +517,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.HYPERDECK) {
newDevice = await DeviceContainer.create<DeviceOptionsHyperdeckInternal, typeof HyperdeckDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsHyperdeckInternal, typeof HyperdeckDevice>(
'../../dist/integrations/hyperdeck/index.js',
'HyperdeckDevice',
deviceId,
Expand All @@ -512,7 +526,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.PHAROS) {
newDevice = await DeviceContainer.create<DeviceOptionsPharosInternal, typeof PharosDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsPharosInternal, typeof PharosDevice>(
'../../dist/integrations/pharos/index.js',
'PharosDevice',
deviceId,
Expand All @@ -521,7 +535,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.OSC) {
newDevice = await DeviceContainer.create<DeviceOptionsOSCInternal, typeof OSCMessageDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsOSCInternal, typeof OSCMessageDevice>(
'../../dist/integrations/osc/index.js',
'OSCMessageDevice',
deviceId,
Expand All @@ -530,7 +544,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.QUANTEL) {
newDevice = await DeviceContainer.create<DeviceOptionsQuantelInternal, typeof QuantelDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsQuantelInternal, typeof QuantelDevice>(
'../../dist/integrations/quantel/index.js',
'QuantelDevice',
deviceId,
Expand All @@ -539,7 +553,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.SHOTOKU) {
newDevice = await DeviceContainer.create<DeviceOptionsShotokuInternal, typeof ShotokuDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsShotokuInternal, typeof ShotokuDevice>(
'../../dist/integrations/shotoku/index.js',
'ShotokuDevice',
deviceId,
Expand All @@ -548,7 +562,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.SISYFOS) {
newDevice = await DeviceContainer.create<DeviceOptionsSisyfosInternal, typeof SisyfosMessageDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsSisyfosInternal, typeof SisyfosMessageDevice>(
'../../dist/integrations/sisyfos/index.js',
'SisyfosMessageDevice',
deviceId,
Expand All @@ -557,7 +571,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.VIZMSE) {
newDevice = await DeviceContainer.create<DeviceOptionsVizMSEInternal, typeof VizMSEDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsVizMSEInternal, typeof VizMSEDevice>(
'../../dist/integrations/vizMSE/index.js',
'VizMSEDevice',
deviceId,
Expand All @@ -566,7 +580,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.SINGULAR_LIVE) {
newDevice = await DeviceContainer.create<DeviceOptionsSingularLiveInternal, typeof SingularLiveDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsSingularLiveInternal, typeof SingularLiveDevice>(
'../../dist/integrations/singularLive/index.js',
'SingularLiveDevice',
deviceId,
Expand All @@ -575,7 +589,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.VMIX) {
newDevice = await DeviceContainer.create<DeviceOptionsVMixInternal, typeof VMixDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsVMixInternal, typeof VMixDevice>(
'../../dist/integrations/vmix/index.js',
'VMixDevice',
deviceId,
Expand All @@ -584,7 +598,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.OBS) {
newDevice = await DeviceContainer.create<DeviceOptionsOBSInternal, typeof OBSDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsOBSInternal, typeof OBSDevice>(
'../../dist/integrations/obs/index.js',
'OBSDevice',
deviceId,
Expand All @@ -593,7 +607,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
threadedClassOptions
)
} else if (deviceOptions.type === DeviceType.TELEMETRICS) {
newDevice = await DeviceContainer.create<DeviceOptionsTelemetrics, typeof TelemetricsDevice>(
newDevicePs = DeviceContainer.create<DeviceOptionsTelemetrics, typeof TelemetricsDevice>(
'../../dist/devices/telemetrics.js',
'TelemetricsDevice',
deviceId,
Expand All @@ -607,6 +621,18 @@ export class Conductor extends EventEmitter<ConductorEvents> {
return Promise.reject(`No matching device type for "${type}" ("${DeviceType[type]}") found in conductor`)
}

newDevice = await makeAbortable(async () => {
throwIfAborted()
const newDevice = await newDevicePs
if (options?.signal?.aborted) {
// if the promise above did not resolve before aborted,
// this executes some time after raceAbortable rejects, serving as a cleanup
await this.terminateUnwantedDevice(newDevice)
throw new AbortError(`Device "${deviceId}" creation aborted`)
}
return newDevice
}, options?.signal)

if (!newDevice) {
const type: any = deviceOptions.type
return Promise.reject(`No device could be created for "${type}" ("${DeviceType[type]}")`)
Expand All @@ -621,33 +647,43 @@ export class Conductor extends EventEmitter<ConductorEvents> {
if (this.devices.has(deviceId)) {
throw new Error(`Device "${deviceId}" already exists when creating device`)
}
throwIfAborted()
this.devices.set(deviceId, newDevice)

return newDevice
} catch (e) {
if (newDevice) {
try {
await newDevice.terminate()
} catch (e) {
this.emit('error', `Cleanup failed of aborted device "${newDevice.deviceId}": ${e}`)
}
}
await this.terminateUnwantedDevice(newDevice)
this.devices.delete(deviceId)
this.emit('error', 'conductor.createDevice', e)
return Promise.reject(e)
}
}
private async terminateUnwantedDevice(newDevice: DeviceContainer<DeviceOptionsBase<any>> | undefined) {
if (newDevice) {
try {
await newDevice.terminate()
} catch (e) {
this.emit('error', `Cleanup failed of aborted device "${newDevice.deviceId}": ${e}`)
}
}
}
/**
* Initialises an existing device that can be referenced by the timeline and mappings.
* @param deviceId Id used by the mappings to reference the device.
* @param deviceOptions The options used to initalize the device
* @param activeRundownPlaylistId Id of the current rundown playlist
* @param options Additional options
* @returns A promise that resolves with the initialised device, or rejects with an error message.
*/
public async initDevice(
deviceId: string,
deviceOptions: DeviceOptionsAnyInternal,
activeRundownPlaylistId?: string
activeRundownPlaylistId?: string,
options?: { signal?: AbortSignal }
): Promise<DeviceContainer<DeviceOptionsBase<any>>> {
if (options?.signal?.aborted) {
throw new AbortError(`Device "${deviceId}" initialisation aborted`)
}
const newDevice = this.devices.get(deviceId)

if (!newDevice) {
Expand All @@ -657,19 +693,24 @@ export class Conductor extends EventEmitter<ConductorEvents> {
if (newDevice.initialized === true) {
throw new Error('Device ' + deviceId + ' is already initialized!')
}

this.emit(
'info',
`Initializing device ${newDevice.deviceId} (${newDevice.instanceId}) of type ${DeviceType[deviceOptions.type]}...`
)

await newDevice.init(deviceOptions.options, activeRundownPlaylistId)

await newDevice.reloadProps() // because the device name might have changed after init

this.emit('info', `Device ${newDevice.deviceId} (${newDevice.instanceId}) initialized!`)

return newDevice
return makeAbortable(async () => {
const throwIfAborted = () => {
if (options?.signal?.aborted) {
throw new AbortError(`Device "${deviceId}" initialisation aborted`)
}
}
throwIfAborted()
await newDevice.init(deviceOptions.options, activeRundownPlaylistId)
throwIfAborted()
await newDevice.reloadProps()
throwIfAborted()
this.emit('info', `Device ${newDevice.deviceId} (${newDevice.instanceId}) initialized!`)
return newDevice
}, options?.signal)
}
/**
* Safely remove a device
Expand All @@ -685,7 +726,7 @@ export class Conductor extends EventEmitter<ConductorEvents> {
])
} catch (e) {
// An error while terminating is probably not that important, since we'll kill the instance anyway
this.emit('warning', 'Error when terminating device', e)
this.emit('warning', `Error when terminating device ${e}`)
}
await device.terminate()
this.devices.delete(deviceId)
Expand Down Expand Up @@ -1492,3 +1533,43 @@ function removeParentFromState(o: TimelineState): TimelineState {
}
return o
}

/**
* If aborted, rejects as soon as possible, but lets the wraped function safely resolve or reject on its own
* @param func async function to wrap
* @param abortSignal the AbortSignal
* @returns Promise of the same type as `func`
*/
async function makeAbortable<T>(
func: (abortSignal?: AbortSignal) => Promise<T>,
abortSignal?: AbortSignal
): Promise<T> {
const mainPs = func(abortSignal)
if (!abortSignal) {
return mainPs
}
let resolveAbortPs: Function
const abortPs = new Promise<void>((resolve, reject) => {
resolveAbortPs = () => {
resolve()
// @ts-expect-error
abortSignal.removeEventListener('abort', rejectPromise)
}
const rejectPromise = () => {
reject(new AbortError())
}
// @ts-expect-error
abortSignal.addEventListener('abort', rejectPromise, { once: true })
})
return Promise.race([mainPs, abortPs])
.then((result) => {
// only mainPs could have resolved, so the result must be T
resolveAbortPs()
return result as T
})
.catch((reason) => {
// mainPs or abortPs might have rejected; calling resolveAbortPs in the latter case is safe
resolveAbortPs()
throw reason
})
}

0 comments on commit 70bfef2

Please sign in to comment.