Generic Non-Blocking Task Management ("Queue") for discovery and nodes domains #329
Comments
There's a Go implementation of a persistent priority queue backed by LevelDB here: https://github.com/beeker1121/goque. It can be used as a reference for this. There's also a JS implementation here: https://github.com/eugeneware/level-q (goque is probably more comprehensive). Our priority queue needs to maintain order by default, because we do want to know the sorted list of jobs, but it should also allow us to add a special priority number on top. This reminds me of the indexing problem, where you can ask for a list of rows sorted by several columns: the first column dictates the base sort, then subsequent columns sort any ambiguous sub-orders. Imagine we had 2 indexes: the first being the priority index using an arbitrary number, the second being the monotonic time index using `IdSortable`. Maybe this then has a relationship to #188. However this would only apply if you are looking up items; if we are streaming data from LevelDB that may be more complicated.
This can be done with a compound index. The prefix can be the priority number (lexinted, i.e. encoded so lexicographic order matches numeric order), and the suffix can be `IdSortable`. This means you can then stream results that are always ordered by priority first and by time second. Priority can start at 0 by default, and one can increment priorities depending on the origin of the task; for example, tasks emitted by a user wanting to look something up can be set to a higher priority number. We could do this directly by changing the queue domain key, but I'd suggest solving the indexing issue in general first, then building a compound index on top.
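A minimal sketch of that compound key, assuming a fixed-width big-endian encoding stands in for the lexint and an opaque buffer stands in for the `IdSortable` bytes (names here are illustrative, not a settled API):

```ts
// 4-byte big-endian unsigned integer: lexicographic byte order matches
// numeric order, so LevelDB iterates keys in priority order
function encodePriority(priority: number): Buffer {
  const buf = Buffer.alloc(4);
  buf.writeUInt32BE(priority, 0);
  return buf;
}

// Priority prefix dictates the base sort; the monotonic task ID suffix
// breaks ties in insertion-time order
function compoundKey(priority: number, taskId: Buffer): Buffer {
  return Buffer.concat([encodePriority(priority), taskId]);
}
```

Since LevelDB streams keys in ascending byte order, iterating higher priorities first would mean either reversing the stream or storing the bitwise complement of the priority prefix.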
We discovered that the priority queue can also benefit from a uniqueness index creating a uniqueness constraint: #311 (comment). This means that duplicate tasks cannot go into the priority queue. I'm not entirely sure this is required, because a queue may still legitimately process the same task over and over.
We should have a concurrency bound in the queue: how many tasks may be executing at the same time. By default it is unbounded, meaning all tasks get executed immediately without waiting for earlier ones to finish. For IO-bound tasks you might as well have unbounded concurrency. CPU-bound tasks can be sent to the web worker pool, which is bounded by core count. Battery usage optimisation may also affect our limit. A sketch of such a bound is below.
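A minimal sketch of the concurrency bound, assuming a plain counting semaphore; `Infinity` recovers the default unbounded behaviour (all names illustrative):

```ts
class ConcurrencyBound {
  protected running = 0;
  protected waiting: Array<() => void> = [];

  constructor(protected limit: number = Infinity) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    // Park until a running task releases a slot
    while (this.running >= this.limit) {
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    }
    this.running++;
    try {
      return await task();
    } finally {
      this.running--;
      this.waiting.shift()?.(); // Wake the next waiter in FIFO order
    }
  }
}
```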
A generic Queue class has been implemented here: 91287ab. This queue is neither persistent nor a priority queue; however, it is designed to be a generic queue that can eventually be used in all places that require this functionality (including the Discovery Queue). The generic Queue can be refactored to meet this issue and #328 at some point in the future, potentially incorporating the DB.
Renamed this issue to cover the general idea of non-blocking task management. It now has to solve for discovery, and for nodes management in terms of setting nodes, pinging nodes, and garbage collection, as well as in relation to:
There's a relationship between the queue design and the worker manager design. Most important is for us to develop a generic task abstraction first. Our task manager will need to have the following configurable:
A stretch goal is to also incorporate "time/calendar scheduling" so that tasks can be executed at a point in time, like cron. Interaction between tasks can be:
The rabbit hole here goes deep, so we should make sure not to feature-creep our non-blocking task queuing needs.
Example of prior work: https://github.com/fluture-js/Fluture
Also to clarify, we are not creating a "generic distributed job queue"; that's the realm of things like Redis queues and https://en.wikipedia.org/wiki/List_of_job_scheduler_software. There's so much of this already. We just need something in-process relative to Polykey.
Along with the configurable concurrency limit and executor, I think we should have an interface for the queue as well. Depending on the situation we may need just a simple queue, a priority queue, a persistent database queue like discovery uses, and so on. It shouldn't be too hard to make the change; we just need to decide if this degree of control is desired. I can see a need for it though. A rough interface is sketched below.
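A rough sketch of what a common queue interface might look like, so simple, priority, and persistent implementations become swappable; all names are illustrative, not a settled API:

```ts
interface TaskQueue<T> {
  // Push a task; priority is ignored by implementations without one
  push(task: T, priority?: number): Promise<void>;
  // Pull the next task, or undefined when the queue is empty
  pull(): Promise<T | undefined>;
  // Number of tasks currently queued
  length(): Promise<number>;
}
```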
Is this a part of #326?
Nope, this can be done later.
More prior work: the entire modern-async library is quite interesting, as it has implemented some of the things that we've been working on as well. But I think we won't just bring in that library; instead we'll extract parts out of it for our own implementation. It has interesting ideas for promise cancellation as well. The library also exports a bunch of collection combinators that can work with asynchronous execution.
When scheduling a new task, the task object is created. This task object needs to represent a lazy promise. A lazy promise in this case means that the promise doesn't imply that execution has started; the task is simply queued. Because tasks are persisted into the DB, it's possible for the PK agent to be restarted, and one may wish to await a given task ID. That means acquiring a promise for that task ID. I'm thinking that we can lazily create a promise that is one-to-one for each task. Then you can await this promise's result. If multiple calls to acquire this promise are made, the same promise is shared among all callers. This means a single settlement is observed by every awaiter. Here's an example:

```ts
async function main () {
  const p = new Promise((resolve, reject) => {
    setTimeout(() => {
      reject(new Error('oh no'));
    }, 500);
    setTimeout(resolve, 1000);
  });
  const f = async () => {
    await p;
    return 'f';
  };
  const g = async () => {
    await p;
    return 'g';
  };
  const r = await Promise.allSettled([
    f(),
    g()
  ]);
  console.log(r);
  // @ts-ignore
  console.log(r[0].reason === r[1].reason); // This is `true`
  // The same exception object is thrown to all awaiters
}

void main();
```

There are some constraints:
How is point 4 guaranteed? It is only possible to get a task promise in 2 ways:
Now because it's a lazy promise, the task may already have been executed by the time you ask for its promise. This is only possible if the task is no longer in the queue (or is in some invalid state). In this situation, when asking for the promise, the promise should be immediately rejected. Alternatively, since the acquisition of this promise is lazy, one may throw an exception at this point. The point is, if you do get a promise, the promise must be settled eventually. One of the initial ways to do this is to add an event listener for every task as soon as it is put into the queue. The problem with this is that you get in-memory space complexity of O(n): 1 listener for every task. Listeners aren't always necessary, and lots of tasks may be put into the queue. In such a case, we can make the promise/listener itself lazy.
Alternatively we do something like the following: only materialise the promise (and its listener) when someone actually asks for it.
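A minimal sketch of that lazy acquisition, assuming a map of deconstructed promises keyed by task ID; all names here are illustrative:

```ts
// Deconstructed promise: created only when a caller asks for it
type TaskDeferred<T> = {
  p: Promise<T>;
  resolveP: (value: T) => void;
  rejectP: (reason?: unknown) => void;
};

const promises: Map<string, TaskDeferred<unknown>> = new Map();

function getTaskP(taskId: string): Promise<unknown> {
  // Share a single promise among all callers for the same task
  let deferred = promises.get(taskId);
  if (deferred == null) {
    let resolveP!: (value: unknown) => void;
    let rejectP!: (reason?: unknown) => void;
    const p = new Promise<unknown>((resolve, reject) => {
      resolveP = resolve;
      rejectP = reject;
    });
    deferred = { p, resolveP, rejectP };
    promises.set(taskId, deferred);
  }
  return deferred.p;
}
```

This way only tasks that somebody actually awaits pay the listener cost; unobserved tasks never allocate a promise at all.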
Another problem is what exactly a task is, and what its handle should look like. I'm considering an API like this:
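Something along these lines, where `scheduleTask`, the `lazy` option, and the `Task` shape are all assumptions sketched for illustration:

```ts
type TaskId = string;

// Illustrative only: a task handle kept separate from its promise
interface Task<T> {
  id: TaskId;
  handler: string;            // name of the registered task handler
  parameters: Array<unknown>; // arguments passed to the handler
  priority: number;
  // Lazily acquire the shared promise for this task
  toPromise(): Promise<T>;
}

interface TaskScheduler {
  scheduleTask<T>(
    handler: string,
    parameters: Array<unknown>,
    options?: { priority?: number; lazy?: boolean },
  ): Promise<Task<T>>;
}
```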
Something like this means the task handle and its promise can stay separate. Either way, lazy in this sense simply means whether the task itself is being tracked or not. If it is being tracked, then I may have something like the deconstructed-promise map sketched earlier.
Is there a utility in having the lazy option at all?
So within the scheduler there is a dispatching process whose job is to peek into the job schedule. (I've started to realise that this is more a "schedule" than a queue, since priority doesn't apply until the tasks are due for execution.) In the job schedule, it finds:
The queue now would have 2 "queues": one for tasks waiting to be dispatched, and an execution queue for tasks currently executing.
If the task is never fulfilled (resolved/rejected), it should stay in the execution queue (which should still be persistent).
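A rough sketch of that two-stage structure, with hypothetical names; persistence is elided to keep it short:

```ts
// Tasks move from `pending` to `executing` when dispatched, and are
// only removed once their promise settles; a crash before settlement
// leaves them in `executing`, so they can be recovered on restart.
class TwoStageQueue<T> {
  pending: Array<T> = [];
  executing: Set<T> = new Set();

  dispatch(): T | undefined {
    const task = this.pending.shift();
    if (task !== undefined) this.executing.add(task);
    return task;
  }

  settle(task: T): void {
    // Called on resolution or rejection, never on crash
    this.executing.delete(task);
  }
}
```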
Originally, in order to "connect" a lazy promise to a task execution, this was done with a callback that I called a "listener". Now I realise that the deconstructed promise is itself already a set of callbacks to be executed on the task execution. This means during actual task dispatch, we could just do something like:

```ts
taskHandlerExecution(...taskParameters).then(
  resolveTaskP,
  rejectTaskP
).finally(() => {
  this.promises.delete(taskId.toString() as TaskIdString);
});
```

That is, the promise that comes from executing the task handler gets connected to the deconstructed promise of the task abstraction, and at the end, the task promise entry is deleted once the task is done. This only occurs if the task promise was first created. If it was a lazy promise, it may never get created, and if so, nothing is there to observe/await the task execution. That's fine, as the task's side effects continue to be performed.
So now I have:
During the agent's creation we would do something like:

```ts
const scheduler = new Scheduler();
await Discovery.createDiscovery({ scheduler });
await NodeGraph.createNodeGraph({ scheduler });
await scheduler.start();
```

Why not use `Scheduler.createScheduler()`? Here we are directly constructing the `Scheduler`. However it is actually CDSS (create-destroy-start-stop), as there is a `start()` and `stop()` lifecycle.
One of the issues with this is that certain methods must be usable as soon as it is constructed, but not necessarily asynchronously started. But at the same time, asynchronous start is necessary to do any async setup such as creating the database levels, etc. So now I'm thinking that construction and asynchronous start have to be separated, but then the lifecycle is not going to be symmetric.
The alternative is that we use a callback/hook abstraction. Another alternative is to delay the processing, as described below.
I've added a `delay` option. By default the scheduler starts processing as soon as it is started; with `delay: true` the processing is deferred until `startProcessing()` is called explicitly. This means the creation flow becomes:

```ts
const scheduler = await Scheduler.createScheduler({ delay: true });
await Discovery.createDiscovery({ scheduler });
await NodeGraph.createNodeGraph({ scheduler });
await scheduler.startProcessing();
```

When stopping the processing, this doesn't actually stop the execution of any tasks; it just stops the processing of the scheduler.
The scheduler doesn't execute the tasks; it dispatches them to the queue. The queue assigns tasks to workers, and the workers are what execute the tasks. At the same time, the workers may also pull tasks from the queue when they are idle.
The queue could hand tasks over to the worker manager. Alternatively, we actually don't use the worker manager at all. Note that threadsjs has 2 concurrency limits: the number of workers in the pool (parallelism), and the number of tasks each worker runs simultaneously (its `concurrency` option).
The only real reason to use workers is to run CPU-intensive tasks, not IO-intensive tasks (hard to know precisely until we do benchmarking). So the number of concurrent async tasks per worker should be 1; therefore we ignore the per-worker concurrency option. If the worker manager is not available, the parallel number of workers to launch should match the concurrency limit of the number of tasks to run concurrently in our queue. Actually we can always default it to the core count. One issue with this is that the default may not be appropriate in all environments.
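A sketch of that defaulting logic, assuming `os.cpus()` as the source of the core count; the function name is illustrative:

```ts
import os from 'node:os';

// Default the task concurrency to the core count when no worker
// manager is available; an explicit limit always takes precedence
function defaultConcurrency(explicitLimit?: number): number {
  if (explicitLimit != null && explicitLimit > 0) return explicitLimit;
  return os.cpus().length;
}
```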
I haven't completed the full design of the task promise yet. For reference, threadsjs has this for its queued tasks:

```ts
/**
 * Task that has been `pool.queued()`-ed.
 */
export interface QueuedTask<ThreadType extends Thread, Return> {
  /** @private */
  id: number;

  /** @private */
  run: TaskRunFunction<ThreadType, Return>;

  /**
   * Queued tasks can be cancelled until the pool starts running them on a worker thread.
   */
  cancel(): void;

  /**
   * `QueuedTask` is thenable, so you can `await` it.
   * Resolves when the task has successfully been executed. Rejects if the task fails.
   */
  then: Promise<Return>["then"];
}
```
An alternative is to form a plain object like threadsjs does, instead of using classes.
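A minimal sketch of such a thenable plain object, assuming an underlying promise to delegate to; names are illustrative:

```ts
// A plain object that is awaitable because it exposes `then`,
// delegating to an internal promise (as threadsjs's QueuedTask does)
function makeTask<T>(id: string, p: Promise<T>) {
  return {
    id,
    cancel: () => { /* remove from the queue if not yet running */ },
    then: p.then.bind(p),
  };
}

// Usage: `await makeTask('t1', someAsyncWork())` resolves with the
// task result, because `await` invokes the object's `then`
```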
There are some interesting timer APIs: https://nodejs.org/api/timers.html#timeoutrefresh
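For instance, `timeout.refresh()` re-arms an existing timer without allocating a new one, which could suit a scheduler loop that keeps pushing its wake-up time back:

```ts
// Re-arm the same Timeout object rather than clearing and recreating it
const timeout = setTimeout(() => {
  console.log('fired');
}, 1000);

// ...later, on new activity, push the deadline back another 1000ms
timeout.refresh();
```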
Note that since `IdSortable` maintains strict monotonicity, we can recover it across restarts. But this assumes the last task ID is always stored, and we are intending on deleting tasks off the schedule once completed. I wasn't thinking that keeping historical tasks is useful (except maybe for debugging, though it seems like that would be dropped in production, and logging/tracing systems should be maintaining the audit log). This means the last task ID may be undefined. So we would store the last task ID regardless of whether there are any tasks left in the scheduler. Furthermore, when the clock is shifted backwards, the time will be incremented by 1 until it is greater than the last time. The 1 is the smallest unit of precision, in this case 1 millisecond. Afterwards, the ID will be strictly monotonic but have a weakly monotonic timestamp, up to 4096 IDs per millisecond. After 4096 it would roll over. The expectation is that it's not possible to generate more than 4096 IDs in a millisecond, so by that time, the time must have increased by at least 1 millisecond. Anyway, this means we need to store the last task ID persistently.
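An illustrative sketch of that clock-shift handling (not the actual `IdSortable` implementation):

```ts
// Strictly monotonic (time, seq) pairs: if the wall clock goes
// backwards, reuse lastTime; if seq exhausts 4096 per ms, bump the time
// by the smallest precision unit (1ms)
let lastTime = 0; // persisted as part of the last stored task ID
let seq = 0;

function nextId(): { time: number; seq: number } {
  let now = Date.now();
  if (now <= lastTime) now = lastTime; // clock shifted backwards
  if (now === lastTime) {
    seq++;
    if (seq >= 4096) {
      // Rollover: pretend a millisecond has elapsed
      now = lastTime + 1;
      seq = 0;
    }
  } else {
    seq = 0;
  }
  lastTime = now;
  return { time: now, seq };
}
```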
A benchmark in js-workers shows that the overhead of calling the workers is about 1.16 to 1.5 ms (see Worker Threads Intersection.xlsx). A CPU-intensive task should take longer than that to be worth sending to a worker. However, most scheduling work might not actually be CPU-intensive; NodeGraph and Discovery are mostly IO. I suppose discovery may have CPU work to pattern-match the data to find the right data on the pages it loads, but this should be dominated by the time spent on IO. Furthermore, sending a task to a worker can introduce locking problems: the async locks do not work across worker threads, they only work within the same event loop context. They are not thread-safe nor process-safe. This should mean that we should not directly integrate the worker manager into the task system.
This means that, naturally, priority only comes into play with a concurrency limit, so that things get put into priority order. Otherwise all tasks will be asynchronous and immediately executed. The worker's concurrency/parallel limit is not a concern of the tasks system.
We decided not to bother with preventing resource starvation; however, an idea is like this.
Simultaneous iteration that uses the timestamp to weight the priority, where the timestamp delta starts from 0 and goes towards infinity. One could say that this multiplies the priority based on a "rate": a delta of 0 multiplies by 1, and a delta of infinity multiplies by infinity. Therefore the rate produces a multiplier between 1 and infinity. Here is an example of the 3 policies:
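A sketch of how such age-weighted priority might look; the formula and the policy knob here are assumptions for illustration:

```ts
// Effective priority grows with task age via a configurable rate:
//   rate = 0 keeps the base priority (pure priority, starvation possible);
//   larger rates let long-waiting tasks eventually overtake newer,
//   higher-priority ones.
// Assumes basePriority >= 1 so the multiplier has something to scale.
function effectivePriority(
  basePriority: number,
  enqueuedAt: number, // ms timestamp at which the task was queued
  rate: number,       // policy knob: 0 disables age weighting
): number {
  const delta = Date.now() - enqueuedAt; // 0 -> infinity as it ages
  return basePriority * (1 + rate * delta); // multiplier in [1, infinity)
}
```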
Once we have the tasks system, other domains should not have any kind of background processing implemented themselves; they should delegate ALL of that functionality to the tasks system.
The task management is ready. However, integration into the discovery and nodes domains is being done in #445. Priority management is static; we won't bother with the dynamic priority from #329 (comment) until we see it become a problem. The issue description here is still relevant to #445, since it contains notes on how best to refactor the discovery system.
Specification
Unattended discovery was added in #320; however, there is no concept of priority within the queue. There are three ways that a vertex (a node or identity) can be added to the discovery queue, and they should follow this order of priority:

1. `queueDiscoveryByNode()` and `queueDiscoveryByIdentity()` (these are called in the commands `identities discover` (explicit discovery) and `identities trust` (explicitly setting a permission, so we want the Gestalt to be updated via discovery)).

Vertexes with a higher priority should be discovered first, either by being placed at the front of the queue or by modifying the traversal method of the queue. The priority queue could also be further optimised by grouping vertices from the same gestalt together when this is known (for example when adding child vertices).
Additional context
Tasks