Skip to content

Commit

Permalink
Refactor conjoin emitter (#18)
Browse files Browse the repository at this point in the history
* fix: correct conjoined name

* refactor

* refactor

* refactor
  • Loading branch information
lisez authored May 27, 2024
1 parent de9515e commit e98d7b5
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 57 deletions.
90 changes: 35 additions & 55 deletions modules/conjoin_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import type {

import { CoreEmitter } from "modules/core_emitter.ts";
import { Emitter } from "modules/emitter.ts";
import { SequenceRunner } from "modules/runners/sequence.ts";
import { SeriesRunner } from "modules/runners/series.ts";
import { ConjoinQueue } from "modules/conjoin_queue.ts";

export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
implements XConjoinEmitter {
private nameIndex: Map<EventName, number> = new Map();
private conjoinedNames: Map<EventName, ConjoinEvents> = new Map();
private indexCounter = 0;
private waitingQueue: PendingConjoinEvent[] = [];
private idleQueue: PendingConjoinEvent[] = [];
private waitingQueue = new ConjoinQueue();
private idleQueue = new ConjoinQueue();
private errorEmitter = new Emitter();
private prevEvents?: Promise<any>;
debug = false;
Expand All @@ -42,14 +43,21 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
const name = this.getConjoinedEventName(signature.name);
if (!this.conjoinedNames.has(name)) {
this.conjoinedNames.set(name, signature.name);
this.idleQueue.push({ event: name, conjoined: signature.name.slice() });
this.idleQueue.enqueue({
event: name,
conjoined: signature.name.slice(),
});
}

return this.onBySignature(name, signature);
}

getConjoinedEventName(events: EventName[] | ConjoinEvents): EventName {
const keys = ([] as EventName[]).concat(events);
private getConjoinedEventName(
events: EventName[] | ConjoinEvents,
): EventName {
const keys = ([] as EventName[]).concat(events).map((e) =>
this.nameIndex.get(e)
);
keys.sort();
return keys.join(".");
}
Expand Down Expand Up @@ -97,74 +105,46 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
this.errorEmitter.on("error", handler);
}

private scan(event: EventName, queue: PendingConjoinEvent[]) {
const fulfill: EventName[] = [];
const idle: PendingConjoinEvent[] = [];

for (const pending of queue) {
const found = pending.conjoined.indexOf(event);
if (found === -1) {
idle.push(pending);
} else {
pending.conjoined.splice(found, 1);
if (pending.conjoined.length === 0) {
fulfill.push(pending.event);
} else {
idle.push(pending);
}
}
}

return { fulfill, idle };
}

private exec(pointer: number, events: EventName[]): any {
const event = events[pointer];
if (!event) return;

const handlers = this.handlers.get(event)?.slice() || [];
for (const e of handlers.filter((e) => e.options?.once)) {
this.offByHandler(event, e.handler);
}

private exec(events: EventName[]): any {
try {
if (handlers.length) {
const result = new SequenceRunner(handlers).exec(0);
if (result) {
return result.then(() => this.exec(pointer + 1, events));
}
return this.exec(pointer + 1, events);
}
return new SeriesRunner(this.handlers).exec(events);
} catch (e) {
this.errorEmitter.emit("error", e);
}
}

emit(event: EventName): any {
if (this.debug) this.logger.debug("emit", event);
if (!this.hasEvent(event)) return;

private consume(event: EventName): EventName[] {
let executing: EventName[] = [];
let nextIdle: PendingConjoinEvent[] = [];

for (const queue of [this.waitingQueue, this.idleQueue]) {
const { fulfill, idle } = this.scan(event, queue);
const { fulfill, idle } = queue.consume(event);
executing = executing.concat(fulfill);
nextIdle = nextIdle.concat(idle);
}

this.idleQueue = nextIdle;
this.waitingQueue = executing.map((e) => ({
event: e,
conjoined: this.conjoinedNames.get(e)?.slice() || [],
}));
this.idleQueue = new ConjoinQueue(nextIdle);
this.waitingQueue = new ConjoinQueue(
executing.map((e) => ({
event: e,
conjoined: this.conjoinedNames.get(e)?.slice() || [],
})),
);

return executing;
}

emit(event: EventName): any {
if (this.debug) this.logger.debug("emit", event);
if (!this.hasEvent(event)) return;

const executing = this.consume(event);
if (executing.length) {
if (this.debug) this.logger.debug("conjoined", executing);
if (this.prevEvents) {
this.prevEvents = this.prevEvents.then(() => this.exec(0, executing));
this.prevEvents = this.prevEvents.then(() => this.exec(executing));
} else {
this.prevEvents = this.exec(0, executing);
this.prevEvents = this.exec(executing);
}
}
return this.prevEvents;
Expand Down
49 changes: 49 additions & 0 deletions modules/conjoin_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { EventName, PendingConjoinEvent } from "modules/types.ts";

/**
* A queue for conjoined events.
*/
export class ConjoinQueue {
/**
* Create a new instance of the ConjoinQueue.
* @param queue The pending conjoin events.
*/
constructor(private queue: PendingConjoinEvent[] = []) {
this.queue = queue;
}

/**
* Consume the conjoined event.
* @param event The event name.
* @returns The fulfilled events and idle events.
*/
consume(event: EventName) {
const fulfill: EventName[] = [];
const idle: PendingConjoinEvent[] = [];

for (const pending of this.queue) {
const found = pending.conjoined.indexOf(event);
if (found === -1) {
idle.push(pending);
} else {
pending.conjoined.splice(found, 1);
if (pending.conjoined.length === 0) {
fulfill.push(pending.event);
} else {
idle.push(pending);
}
}
}

return { fulfill, idle };
}

/**
* Enqueue a pending conjoin event.
* @param event The pending conjoin event.
*/
enqueue(event: PendingConjoinEvent) {
this.queue.push(event);
}
}

54 changes: 54 additions & 0 deletions modules/runners/series.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type {
EventHandlerSignature,
EventName,
RegisteredHandlers,
} from "modules/types.ts";

import { SequenceRunner } from "modules/runners/sequence.ts";

/**
* Run handlers each in series.
*/
export class SeriesRunner {
/**
* Create a new instance of the SeriesRunner.
* @param handlers The handlers with series name to run.
*/
constructor(private handlers: RegisteredHandlers) {
this.handlers = handlers;
}

/**
* Remove a handler.
* @param key The event name.
* @param profile The handler profile.
*/
private remove(key: EventName, profile: EventHandlerSignature<any>): void {
const handlers = this.handlers.get(key);
if (!handlers) return;
const idx = handlers.findIndex((h) => h.handler === profile.handler);
if (idx !== -1) handlers.splice(idx, 1);
}

/**
* Execute the handlers in series.
* @param series The series of event names.
* @param idx The current event index.
*/
exec(series: EventName[], idx = 0): void | Promise<void> {
const key = series[idx];
if (!key) return;

const handlers = this.handlers.get(key)?.slice() || [];
for (const p of handlers.filter((p) => !!p.options?.once)) {
this.remove(key, p);
}
if (!handlers.length) return;

const result = new SequenceRunner(handlers).exec(0);
if (result) {
return result.then(() => this.exec(series, idx + 1));
}
return this.exec(series, idx + 1);
}
}
4 changes: 2 additions & 2 deletions tests/deno/general_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ it("list all events", () => {
assertEquals(emitter.eventNames(), [
"foo",
"bar",
"test1.test2",
"test2.test3",
"0.1",
"1.2",
]);
});

Expand Down

0 comments on commit e98d7b5

Please sign in to comment.