Skip to content

Commit

Permalink
feat(core): Introduce groups joinable via NamedGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
aholstenson committed Jun 24, 2021
1 parent 8996117 commit b65b5a8
Show file tree
Hide file tree
Showing 20 changed files with 436 additions and 492 deletions.
12 changes: 5 additions & 7 deletions examples/helpers/counter.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
const { Exchange } = require('../../packages/core');
const { NamedGroup } = require('../../packages/core');

module.exports = async function(net) {

let counter = 0;

const exchange = net.createExchange('counter');
exchange.onNodeAvailable(node => {
const group = new NamedGroup(net, 'counter');
group.onNodeAvailable(node => {
node.send('hello', { counter: counter })
.catch(e => console.log('Timed out sending hello'));
})

await exchange.join();
await group.join();

// Increment and broadcast counter to all nodes every five seconds
setInterval(() => {
exchange.broadcast('counter', { current: ++counter });
group.broadcast('counter', { current: ++counter });
}, 5000);

}
8 changes: 5 additions & 3 deletions packages/cli/src/commands/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import chalk from 'chalk';
import { Minimatch } from 'minimatch';
import { Argv } from 'yargs';

import { NamedGroup } from 'ataraxia';

import { indent, log, logInfo } from '../log';
import { createNetwork } from '../utils/createNetwork';

Expand All @@ -16,9 +18,9 @@ export const builder = (yargs: Argv) =>
string: true,
description: 'Optional filter to apply to message type, supports glob expressions'
})
.option('exchange', {
.option('group', {
string: true,
description: 'Optional exchange to join'
description: 'Optional group to join'
});

export const handler = async (args: any) => {
Expand All @@ -37,6 +39,6 @@ export const handler = async (args: any) => {
await net.join();

if(args.exchange) {
await net.createExchange(args.exchange).join();
await new NamedGroup(net, args.group).join();
}
};
12 changes: 6 additions & 6 deletions packages/core/src/Group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ export interface Group<MessageTypes extends object = any> {
readonly name: string;

/**
* Event emitted when a new node joins this exchange.
* Event emitted when a new node joins this group.
*/
readonly onNodeAvailable: Subscribable<this, [ node: Node<MessageTypes> ]>;

/**
* Event emitted when a node leaves this exchange.
* Event emitted when a node leaves this group.
*/
readonly onNodeUnavailable: Subscribable<this, [ node: Node<MessageTypes> ]>;

/**
* Event emitted when a message is received on this exchange.
* Event emitted when a message is received on this group.
*/
readonly onMessage: Subscribable<this, [ message: MessageUnion<MessageTypes> ]>;

Expand All @@ -53,7 +53,7 @@ export interface Group<MessageTypes extends object = any> {
readonly nodes: ReadonlyArray<Node>;

/**
* Broadcast a message to all nodes that have joined this exchange.
* Broadcast a message to all nodes that have joined this group.
*
* @param type -
* the type of message to send
Expand All @@ -66,12 +66,12 @@ export interface Group<MessageTypes extends object = any> {
): Promise<void>;

/**
* Join this exchange.
* Join this group.
*/
join(): Promise<void>;

/**
* Leave this exchange.
* Leave this group.
*/
leave(): Promise<void>;
}
72 changes: 36 additions & 36 deletions packages/core/src/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import { Event } from 'atvik';
import { Transport, generateId, encodeId } from 'ataraxia-transport';

import { Debugger } from './Debugger';
import { Exchange } from './exchange/Exchange';
import { Exchanges } from './exchange/Exchanges';
import { Group } from './Group';
import { MessageData } from './MessageData';
import { MessageType } from './MessageType';
Expand Down Expand Up @@ -85,25 +83,25 @@ export interface NetworkOptions {
* {@link broadcast}, but as with regular messages no delivery is guaranteed
* and large broadcasts are discouraged.
*
* ## Exchanges
* ## Groups
*
* Exchanges are a way to create named sub-groups of the network that nodes can
* join and leave as needed. Broadcasting a message on an exchange will only
* send it to known members of the exchange.
* Groups are a way to create named area of the network that nodes can
* join and leave as needed. Broadcasting a message on an group will only
* send it to known members of the groups.
*
* ```typescript
* const exchange = net.createExchange('name-of-exchange');
* const group = new NamedGroup(net, 'name-of-group');
*
* // Exchanges need to be joined
* await exchange.join();
* // Groups need to be joined
* await group.join();
*
* // Broadcast to the known members
* await exchange.broadcast('typeOfMessage', dataOfMessage);
* await group.broadcast('typeOfMessage', dataOfMessage);
* ```
*
* ## Typing of messages
*
* The network and exchanges can be typed when using TypeScript.
* The network and groups can be typed when using TypeScript.
*
* The types are defined as an interface with the keys representing the
* message types tied to the type of message:
Expand All @@ -115,27 +113,27 @@ export interface NetworkOptions {
* }
* ```
*
* An exchange can then be typed via:
* An group can then be typed via:
*
* ```typescript
* const exchange: Exchange<EchoMessages> = net.createExchange<EchoMessage>('echo');
* const group: Group<EchoMessages> = new NamedGroup<EchoMessage>(net, 'echo');
* ```
*
* This will help TypeScript validate messages that are sent:
*
* ```typescript
* // TypeScript will allow this
* exchange.broadcast('namespace:echo', { message: 'Test' });
* group.broadcast('namespace:echo', { message: 'Test' });
*
* // TypeScript will not allow these
* exchange.broadcast('namespace:echo', { msg: 'Test' });
* exchange.broadcast('namespace:e', { message: 'Test' });
* group.broadcast('namespace:echo', { msg: 'Test' });
* group.broadcast('namespace:e', { message: 'Test' });
* ```
*
* The same is true for listeners:
*
* ```typescript
* exchange.onMessage(msg => {
* group.onMessage(msg => {
* if(msg.type === 'namespace:echo') {
* // In here msg.data will be of the type { message: string }
* const data = msg.data;
Expand Down Expand Up @@ -190,10 +188,7 @@ export class Network<MessageTypes extends object = any> implements Group<Message
*/
readonly #nodes: Map<string, NetworkNode>;

/**
* Tracking for exchanges.
*/
readonly #exchanges: Exchanges;
private readonly services: Map<any, any>;

readonly #nodeAvailableEvent: Event<this, [ node: Node<MessageTypes> ]>;
readonly #nodeUnavailableEvent: Event<this, [ node: Node<MessageTypes> ]>;
Expand Down Expand Up @@ -239,7 +234,7 @@ export class Network<MessageTypes extends object = any> implements Group<Message

this.#nodes = new Map();

this.#exchanges = new Exchanges(this);
this.services = new Map();

// Setup the topology of the network
this.#topology = new Topology(this, options);
Expand Down Expand Up @@ -275,6 +270,24 @@ export class Network<MessageTypes extends object = any> implements Group<Message
options.transports?.forEach(t => this.addTransport(t));
}

/**
* Get a service as a singleton. This is useful for starting a single
* instance of shared services.
*
* @param factory -
* constructor that takes instance of network
* @returns
* instance of factory
*/
public getService<T>(factory: (new (handle: Network) => T)): T {
let instance = this.services.get(factory);
if(instance) return instance;

instance = new factory(this);
this.services.set(factory, instance);
return instance;
}

/**
* Event emitted when a {@link Node} becomes available.
*
Expand Down Expand Up @@ -424,7 +437,7 @@ export class Network<MessageTypes extends object = any> implements Group<Message
public broadcast<T extends MessageType<MessageTypes>>(type: T, data: MessageData<MessageTypes, T>): Promise<void> {
const promises: Promise<void>[] = [];

// Send to all nodes that have joined the exchange
// Send to all nodes that have joined the group
for(const node of this.#nodes.values()) {
promises.push(node.send(type, data)
.catch(ex => {
Expand All @@ -435,17 +448,4 @@ export class Network<MessageTypes extends object = any> implements Group<Message
return Promise.all(promises)
.then(() => undefined);
}

/**
* Create an exchange with the given id. This will create a sub-group of the
* network that nodes can join, leave and easily broadcast to.
*
* @param id -
* exchange to join
* @returns
* instance of Exchange
*/
public createExchange<MT extends object = any>(id: string): Exchange<MT> {
return this.#exchanges.createExchange(id);
}
}
4 changes: 2 additions & 2 deletions packages/core/src/Node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { MessageUnion } from './MessageUnion';

/**
* Node within the network. Nodes are individual instances that you can
* exchange messages with.
* receive and send messages from/to.
*
* Nodes are usually retrieved via a {@link Network} or {@link Exchange}. When
* Nodes are usually retrieved via a {@link Network} or {@link Group}. When
* a node becomes available you may opt in to events about the node via
* subscribable functions such as {@link onMessage} and {@link onUnavailable}.
*
Expand Down
80 changes: 0 additions & 80 deletions packages/core/src/exchange/Exchange.ts

This file was deleted.

Loading

0 comments on commit b65b5a8

Please sign in to comment.