Skip to content

Commit

Permalink
server: wip cluster mode load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Nov 20, 2024
1 parent 432c178 commit aed6e0c
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 401 deletions.
7 changes: 7 additions & 0 deletions sdk/types/scrypted_python/scrypted_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ def sdk_init2(scryptedStatic: ScryptedStatic):
systemManager = sdk.systemManager
deviceManager = sdk.deviceManager
mediaManager = sdk.mediaManager
async def initDescriptors():
global api
try:
await api.setScryptedInterfaceDescriptors(TYPES_VERSION, ScryptedInterfaceDescriptors)
except:
pass
asyncio.ensure_future(initDescriptors())
if hasattr(sdk, 'clusterManager'):
clusterManager = sdk.clusterManager
zip = sdk.zip
Expand Down
3 changes: 3 additions & 0 deletions sdk/types/scrypted_python/scrypted_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,9 @@ class TamperState(TypedDict):
pass


TYPES_VERSION = "0.3.77"


class AirPurifier:

airPurifierState: AirPurifierState
Expand Down
5 changes: 4 additions & 1 deletion sdk/types/src/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ function selfSignature(method: any) {

const enums = schema.children?.filter((child) => child.kind === ReflectionKind.Enum) ?? [];
const interfaces = schema.children?.filter((child: any) => Object.values(ScryptedInterface).includes(child.name)) ?? [];
let python = '';
let python = `
TYPES_VERSION = "${typesVersion}"
`;

for (const iface of ['Logger', 'DeviceManager', 'SystemManager', 'MediaManager', 'EndpointManager', 'ClusterManager']) {
const child = schema.children?.find((child: any) => child.name === iface);
Expand Down
9 changes: 5 additions & 4 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.69",
"@scrypted/types": "^0.3.77",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
Expand Down
18 changes: 18 additions & 0 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ def notifyEventDetails(self, id: str, eventDetails: scrypted_python.scrypted_sdk

return True

class ClusterManager(scrypted_python.scrypted_sdk.types.ClusterManager):
def __init__(self, api: Any):
self.api = api
self.clusterService = None

def getClusterMode(self) -> Any | Any:
return os.getenv("SCRYPTED_CLUSTER_MODE", None)

def getClusterWorkerId(self) -> str:
return os.getenv("SCRYPTED_CLUSTER_WORKER_ID", None)

async def getClusterWorkers(self) -> Mapping[str, scrypted_python.scrypted_sdk.types.ClusterWorker]:
self.clusterService = self.clusterService or asyncio.ensure_future(self.api.getComponent("cluster-fork"))
cs = await self.clusterService
return await cs.getClusterWorkers()

class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
def __init__(
Expand Down Expand Up @@ -554,6 +569,7 @@ def __init__(
self.systemState: Mapping[str, Mapping[str, SystemDeviceState]] = {}
self.nativeIds: Mapping[str, DeviceStorage] = {}
self.mediaManager: MediaManager
self.clusterManager: ClusterManager
self.consoles: Mapping[str, Future[Tuple[StreamReader, StreamWriter]]] = {}
self.peer = clusterSetup.peer
self.clusterSetup = clusterSetup
Expand Down Expand Up @@ -768,11 +784,13 @@ def read_requirements(filename: str) -> str:
self.systemManager = SystemManager(self.api, self.systemState)
self.deviceManager = DeviceManager(self.nativeIds, self.systemManager)
self.mediaManager = MediaManager(await self.api.getMediaManager())
self.clusterManager = ClusterManager(self.api)

try:
sdk.systemManager = self.systemManager
sdk.deviceManager = self.deviceManager
sdk.mediaManager = self.mediaManager
sdk.clusterManager = self.clusterManager
sdk.remote = self
sdk.api = self.api
sdk.zip = zip
Expand Down
261 changes: 261 additions & 0 deletions server/src/plugin/device.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import { Device, DeviceManager, DeviceManifest, DeviceState, Logger, ScryptedNativeId, WritableDeviceState } from '@scrypted/types';
import { RpcPeer } from '../rpc';
import { PluginAPI, PluginLogger } from './plugin-api';
import { checkProperty } from './plugin-state-check';
import { SystemManagerImpl } from './system';

class DeviceLogger implements Logger {
nativeId: ScryptedNativeId;
api: PluginAPI;
logger: Promise<PluginLogger>;

constructor(api: PluginAPI, nativeId: ScryptedNativeId, public console: any) {
this.api = api;
this.nativeId = nativeId;
}

async ensureLogger(): Promise<PluginLogger> {
if (!this.logger)
this.logger = this.api.getLogger(this.nativeId);
return await this.logger;
}

async log(level: string, message: string) {
(await this.ensureLogger()).log(level, message);
}

a(msg: string): void {
this.log('a', msg);
}
async clear() {
(await this.ensureLogger()).clear();
}
async clearAlert(msg: string) {
(await this.ensureLogger()).clearAlert(msg);
}
async clearAlerts() {
(await this.ensureLogger()).clearAlerts();
}
d(msg: string): void {
this.log('d', msg);
}
e(msg: string): void {
this.log('e', msg);
}
i(msg: string): void {
this.log('i', msg);
}
v(msg: string): void {
this.log('v', msg);
}
w(msg: string): void {
this.log('w', msg);
}
}

export class DeviceStateProxyHandler implements ProxyHandler<any> {
constructor(public deviceManager: DeviceManagerImpl, public id: string,
public setState: (property: string, value: any) => Promise<void>) {
}

get?(target: any, p: PropertyKey, receiver: any) {
if (p === 'id')
return this.id;
if (p === RpcPeer.PROPERTY_PROXY_PROPERTIES)
return { id: this.id }
if (p === 'setState')
return this.setState;
return this.deviceManager.systemManager.state[this.id][p as string]?.value;
}

set?(target: any, p: PropertyKey, value: any, receiver: any) {
checkProperty(p.toString(), value);
this.deviceManager.systemManager.state[this.id][p as string] = {
value,
};
this.setState(p.toString(), value);
return true;
}
}

interface DeviceManagerDevice {
id: string;
storage: { [key: string]: any };
}

export class DeviceManagerImpl implements DeviceManager {
api: PluginAPI;
nativeIds = new Map<string, DeviceManagerDevice>();
deviceStorage = new Map<string, StorageImpl>();
mixinStorage = new Map<string, Map<string, StorageImpl>>();

constructor(public systemManager: SystemManagerImpl,
public getDeviceConsole: (nativeId?: ScryptedNativeId) => Console,
public getMixinConsole: (mixinId: string, nativeId?: ScryptedNativeId) => Console) {
}

async requestRestart() {
return this.api.requestRestart();
}

getDeviceLogger(nativeId?: ScryptedNativeId): Logger {
return new DeviceLogger(this.api, nativeId, this.getDeviceConsole?.(nativeId) || console);
}

getDeviceState(nativeId?: any): DeviceState {
const handler = new DeviceStateProxyHandler(this, this.nativeIds.get(nativeId).id,
(property, value) => this.api.setState(nativeId, property, value));
return new Proxy(handler, handler);
}

createDeviceState(id: string, setState: (property: string, value: any) => Promise<void>): WritableDeviceState {
const handler = new DeviceStateProxyHandler(this, id, setState);
return new Proxy(handler, handler);
}

getDeviceStorage(nativeId?: any): StorageImpl {
let ret = this.deviceStorage.get(nativeId);
if (!ret) {
ret = new StorageImpl(this, nativeId);
this.deviceStorage.set(nativeId, ret);
}
return ret;
}
getMixinStorage(id: string, nativeId?: ScryptedNativeId) {
let ms = this.mixinStorage.get(nativeId);
if (!ms) {
ms = new Map();
this.mixinStorage.set(nativeId, ms);
}
let ret = ms.get(id);
if (!ret) {
ret = new StorageImpl(this, nativeId, `mixin:${id}:`);
ms.set(id, ret);
}
return ret;
}
pruneMixinStorage() {
for (const nativeId of this.nativeIds.keys()) {
const storage = this.nativeIds.get(nativeId).storage;
for (const key of Object.keys(storage)) {
if (!key.startsWith('mixin:'))
continue;
const [, id,] = key.split(':');
// there's no rush to persist this, it will happen automatically on the plugin
// persisting something at some point.
// the key itself is unreachable due to the device no longer existing.
if (id && !this.systemManager.state[id])
delete storage[key];
}
}
}
async onMixinEvent(id: string, nativeId: ScryptedNativeId, eventInterface: string, eventData: any) {
return this.api.onMixinEvent(id, nativeId, eventInterface, eventData);
}
getNativeIds(): string[] {
return Array.from(this.nativeIds.keys());
}
async onDeviceDiscovered(device: Device) {
return this.api.onDeviceDiscovered(device);
}
async onDeviceRemoved(nativeId: string) {
return this.api.onDeviceRemoved(nativeId);
}
async onDeviceEvent(nativeId: any, eventInterface: any, eventData?: any) {
return this.api.onDeviceEvent(nativeId, eventInterface, eventData);
}
async onDevicesChanged(devices: DeviceManifest) {
return this.api.onDevicesChanged(devices);
}
}


function toStorageString(value: any) {
if (value === null)
return 'null';
if (value === undefined)
return 'undefined';

return value.toString();
}

export class StorageImpl implements Storage {
api: PluginAPI;
[name: string]: any;

private static allowedMethods = [
'length',
'clear',
'getItem',
'setItem',
'key',
'removeItem',
];
private static indexedHandler: ProxyHandler<StorageImpl> = {
get(target, property) {
const keyString = property.toString();
if (StorageImpl.allowedMethods.includes(keyString)) {
const f = target[keyString];
if (keyString === 'length')
return f;
return f.bind(target);
}
return target.getItem(toStorageString(property));
},
set(target, property, value): boolean {
target.setItem(toStorageString(property), value);
return true;
}
};

constructor(public deviceManager: DeviceManagerImpl, public nativeId: ScryptedNativeId, public prefix?: string) {
this.deviceManager = deviceManager;
this.api = deviceManager.api;
this.nativeId = nativeId;
if (!this.prefix)
this.prefix = '';

return new Proxy(this, StorageImpl.indexedHandler);
}

get storage(): { [key: string]: any } {
return this.deviceManager.nativeIds.get(this.nativeId).storage;
}

get length(): number {
return Object.keys(this.storage).filter(key => key.startsWith(this.prefix)).length;
}

clear(): void {
if (!this.prefix) {
this.deviceManager.nativeIds.get(this.nativeId).storage = {};
}
else {
const storage = this.storage;
Object.keys(this.storage).filter(key => key.startsWith(this.prefix)).forEach(key => delete storage[key]);
}
this.api.setStorage(this.nativeId, this.storage);
}

getItem(key: string): string {
return this.storage[this.prefix + key];
}
key(index: number): string {
if (!this.prefix) {
return Object.keys(this.storage)[index];
}
return Object.keys(this.storage).filter(key => key.startsWith(this.prefix))[index].substring(this.prefix.length);
}
removeItem(key: string): void {
delete this.storage[this.prefix + key];
this.api.setStorage(this.nativeId, this.storage);
}
setItem(key: string, value: string): void {
key = toStorageString(key);
value = toStorageString(value);
if (this.storage[this.prefix + key] === value)
return;
this.storage[this.prefix + key] = value;
this.api.setStorage(this.nativeId, this.storage);
}
}
Loading

0 comments on commit aed6e0c

Please sign in to comment.