Once PostHog/plugin-server#351 lands, we'll have pretty sweet support for job queues. However, that's just the first step: many things are still less than ideal and should be fixed next.
Current state
We have a system of dispatching and running jobs asynchronously in the background within the context of plugins.
We have a system of multiple job queue providers, where if one throws, the next one is tried automatically. For example, if we can't connect to Postgres to enqueue jobs, we put them in S3 instead. As long as we don't lose data, we're good. In theory. A rough illustration of the fallback logic follows below.
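A minimal sketch of that fallback loop, assuming a hypothetical JobQueue interface rather than the actual plugin-server types:

```ts
// Hypothetical minimal shape of a job queue provider.
interface JobQueue {
    name: string
    enqueue(job: Record<string, any>): Promise<void>
}

// Try each configured provider in order; if one throws, fall through to the next.
// Only give up (and surface the last error) once every provider has failed.
async function enqueueWithFallback(queues: JobQueue[], job: Record<string, any>): Promise<void> {
    let lastError: unknown = null
    for (const queue of queues) {
        try {
            await queue.enqueue(job)
            return
        } catch (error) {
            lastError = error
            console.warn(`Enqueueing to ${queue.name} failed, trying the next provider`)
        }
    }
    throw lastError ?? new Error('No job queues configured')
}
```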
We have implemented one job queue, based on postgres (graphile/worker). This actually works with serverless Amazon Aurora, which we'll use on cloud.
We have no idea what kind of performance we can expect from this.
Graphile/worker stores all of its data not in the "public" postgres schema, but in a separate "graphile_worker" schema. First, are we OK with that default name, or should we change it to "posthog_jobs" or something?
I'd like to enable graphile by default for everyone, storing its data in the main database (via DATABASE_URL or whatnot) if not configured differently (via JOB_QUEUE_GRAPHILE_URL). The problem is that graphile will still want to create a separate schema, and I can't guarantee that this will work for all OSS installations. Some might have restricted their postgres permissions enough to prevent it.
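For reference, enqueueing through graphile-worker with that connection string fallback could look roughly like the sketch below; the migrate() call is exactly the step that a locked-down postgres might reject:

```ts
import { makeWorkerUtils } from 'graphile-worker'

// Prefer a dedicated job queue database, fall back to the main one.
const connectionString = process.env.JOB_QUEUE_GRAPHILE_URL || process.env.DATABASE_URL

async function enqueueGraphileJob(taskIdentifier: string, payload: Record<string, any>): Promise<void> {
    const workerUtils = await makeWorkerUtils({ connectionString })
    try {
        // Creates or updates the graphile_worker schema if needed. This is the part
        // that may fail on restricted OSS installations.
        await workerUtils.migrate()
        await workerUtils.addJob(taskIdentifier, payload)
    } finally {
        await workerUtils.release()
    }
}
```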
Thus we need some kind of backup/fallback system. Ideally, the instance status page could split "plugin server alive/dead" into a more detailed per-service breakdown.
Ultimately I'd suggest crashing the plugin server on boot if there's no usable job queue, though that sounds like too drastic a step to take right now.
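One possible shape for such a boot-time check (the function and the healthcheck payload are illustrative, not existing plugin-server code): probe every queue's producer once, record a per-service status for the instance status page, and only throw if none are usable.

```ts
interface ProbeableQueue {
    name: string
    enqueue(job: Record<string, any>): Promise<void>
}

interface QueueHealth {
    name: string
    healthy: boolean
    error?: string
}

// Probe every configured queue once on startup. The per-queue results could feed the
// instance status page; a total failure is what could eventually crash the server.
async function checkJobQueuesOnBoot(queues: ProbeableQueue[]): Promise<QueueHealth[]> {
    const results: QueueHealth[] = []
    for (const queue of queues) {
        try {
            await queue.enqueue({ type: 'healthcheck', timestamp: Date.now() })
            results.push({ name: queue.name, healthy: true })
        } catch (error) {
            results.push({ name: queue.name, healthy: false, error: String(error) })
        }
    }
    if (!results.some((result) => result.healthy)) {
        throw new Error('No usable job queue found on boot')
    }
    return results
}
```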
Alternative queue adapters (in progress)
For now we just have one queue: postgres via graphile/worker. For cloud we will hook it up to Serverless Aurora and hopefully that will be able to absorb anything we throw at it.
Still, what if the network between us and Aurora is down... or the link between our customer and their job queue running on a Hetzner VPS is down? Or the Kafka producer is down, can't retry, and in a last-ditch emergency effort dumped the enqueued Kafka batch into the job queue, but that can't connect either? We will need to store this enqueued data... somewhere... before the plugin server restarts or crashes and we lose it forever.
That's why we should also have some alternative/backup queues in place. For example, an S3 queue (won't help with Aurora being down, but might help self-hosted customers if their postgres gets full). Or a GCS queue. Or a ClickHouse queue (if we have it built and there's nowhere else to put the events, put them there at least for safekeeping). Or an "upload a .txt file to an FTP server" queue.
We must do all that we can to not lose any in-flight data.
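A backup S3 queue could be as simple as dumping each batch of jobs into a timestamped object, to be replayed later by a single redlocked consumer. A sketch with the AWS SDK v3 (the key scheme and batch shape are made up):

```ts
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

// Dump a batch of jobs into S3 as a last-resort persistence layer.
// Keys are time-prefixed so a consumer can later replay them roughly in order.
async function enqueueToS3(bucket: string, jobs: Record<string, any>[]): Promise<void> {
    const key = `job-queue-backup/${Date.now()}-${Math.random().toString(36).slice(2)}.json`
    await s3.send(
        new PutObjectCommand({
            Bucket: bucket,
            Key: key,
            Body: JSON.stringify(jobs),
            ContentType: 'application/json',
        })
    )
}
```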
Queue types
Postgres and S3 are two different types of queues.
With the current implementation, postgres could be read concurrently (each plugin server reads from it), but S3 can't really be. For S3 we would use a redlocked consumer, where only one plugin server reads from it at a time. For postgres we currently also use a redlocked consumer, but we don't really have to.
Then there's also the idea of a memory queue. A quick memory buffer (with a 20 MB memory and 1000 task limit perhaps?) that sits in front of the postgres queue. Any "run this in 5 seconds" or "run this async, but now" tasks could be directly placed there. No need to go through postgres for that. This memory queue should also flush itself to postgres in case the plugin server terminates.
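A sketch of what such a memory buffer could look like (the limits and the flush callback are illustrative):

```ts
interface BufferedJob {
    payload: Record<string, any>
    runAt: number // unix timestamp in ms
}

// In-memory buffer that sits in front of the persistent queue. It refuses new jobs
// once it hits its size or count limit, and it flushes everything to the persistent
// queue (e.g. postgres) when the plugin server shuts down.
class MemoryJobQueue {
    private jobs: BufferedJob[] = []
    private bytes = 0

    constructor(
        private flushToPersistentQueue: (jobs: BufferedJob[]) => Promise<void>,
        private maxBytes = 20 * 1024 * 1024,
        private maxJobs = 1000
    ) {}

    enqueue(job: BufferedJob): boolean {
        const size = JSON.stringify(job.payload).length
        if (this.jobs.length >= this.maxJobs || this.bytes + size > this.maxBytes) {
            return false // caller should fall back to the persistent queue instead
        }
        this.jobs.push(job)
        this.bytes += size
        return true
    }

    // Called on graceful shutdown so nothing buffered in memory is lost.
    async flush(): Promise<void> {
        const toFlush = this.jobs
        this.jobs = []
        this.bytes = 0
        if (toFlush.length > 0) {
            await this.flushToPersistentQueue(toFlush)
        }
    }
}
```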
Hence, we have different types of plugin queues:
persistent or transient
local or remote
high or low throughput
concurrently readable or must have one reader with a cursor
This should be reflected in the code somehow. For example, the memory queue should know to flush itself into postgres, but not into a local SQLite or equivalent database (acceptRecoveryFlush = true/false?). Concurrently readable queues should be read concurrently, regardless of the redlock.
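These traits could be expressed as capability flags on each queue implementation, roughly like this (the property names, including acceptRecoveryFlush, and the example values are just suggestions):

```ts
// Capability flags describing each queue type, used to decide e.g. where the memory
// queue is allowed to flush to and whether reads need to go through the redlock.
interface JobQueueCapabilities {
    persistent: boolean // survives a plugin server restart
    remote: boolean // lives outside the plugin server process/host
    highThroughput: boolean
    concurrentlyReadable: boolean // false => single redlocked reader with a cursor
    acceptRecoveryFlush: boolean // may receive the memory queue's shutdown flush
}

// Example: the postgres (graphile) queue vs. the in-memory buffer (illustrative values only).
const postgresQueueCapabilities: JobQueueCapabilities = {
    persistent: true,
    remote: true,
    highThroughput: false,
    concurrentlyReadable: true,
    acceptRecoveryFlush: true,
}
const memoryQueueCapabilities: JobQueueCapabilities = {
    persistent: false,
    remote: false,
    highThroughput: true,
    concurrentlyReadable: false,
    acceptRecoveryFlush: false,
}
```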
Shaky connections
We should also handle shaky connections, removing and re-adding broken queues from the pipeline. This also means we should somehow test that the producer works whenever we start up a queue. The graphile queue is notoriously non-verbose right now: if I give it a postgres URL where the user/pass work but the database doesn't exist, it stalls forever, for example.
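One way to avoid stalling forever on a misconfigured queue is to race the startup probe against a timeout. The sketch below assumes a hypothetical startProducer() method and an arbitrary 10-second limit:

```ts
// Reject if the given promise does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms)
        promise.then(
            (value) => {
                clearTimeout(timer)
                resolve(value)
            },
            (error) => {
                clearTimeout(timer)
                reject(error)
            }
        )
    })
}

// Usage: mark the queue as broken and drop it from the pipeline instead of hanging.
// await withTimeout(graphileQueue.startProducer(), 10_000, 'graphile producer startup')
```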
Recovery mode
In case the network goes down, or we lose access to all queues, or we need to shut down, or whatever else bad happens, we should implement a special recovery mode. For example, the postgres queue gets full... or we can't enqueue tasks fast enough (because no plugin awaits .runNow(), but lets it run in the background, and now we have a thousand pending promises)... or ingestion just stops for unknown reasons. When something like this happens, we turn on recovery mode, flush all the memory queues, finish processing whatever needs to be processed... and then possibly restart the server for good measure.
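In code, recovery mode could be little more than an ordered shutdown sequence. A minimal sketch, assuming the server exposes hooks for flushing memory queues and draining in-flight work:

```ts
// Triggered when a queue fills up, pending promises pile up, or ingestion stalls.
async function enterRecoveryMode(
    reason: string,
    flushMemoryQueues: () => Promise<void>,
    drainInFlightWork: () => Promise<void>
): Promise<void> {
    console.error(`Entering recovery mode: ${reason}`)
    await flushMemoryQueues() // persist anything still buffered in memory
    await drainInFlightWork() // finish processing whatever is already in flight
    process.exit(1) // let the orchestrator restart the plugin server for good measure
}
```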
Flow control
We're entering a world of flow.
When we get an event via Kafka, we can potentially kick off all sorts of work, much of it asynchronously in the background. For example:
producing the ingested event back out via kafka
enqueuing a job to the job queue
using the plugin-contrib batching logic to store the event somewhere in memory in the background
While we already have some rudimentary flow control in place thanks to the pause/resume methods on ingestion queues that get called when needed, we should change this up a bit.
First, kafka.produce is batched, but still run in the main thread. Basically the 20th ingested event triggers flushing the Kafka batch... and that event will have a longer await processEvent, since it must send itself and 19 other events to Kafka. Instead, kafka producing should happen 100% in the background, possibly even just on the main thread.
This would keep the ingestion pipeline flowing smoothly. However, if the Kafka producer can't keep up, we must pause ingestion. If it starts to develop a backlog, we must slow down. If it gets stuck (kafka is down, keeps retrying), we must stop ingestion. If it's stuck enough, we send the batches to a job queue to be retried later.
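A sketch of that backpressure rule, counting how many batches the background producer still has in flight (the thresholds and the pause/resume hooks are illustrative stand-ins for the existing ingestion queue methods):

```ts
// Pause ingestion when the background Kafka producer falls behind, resume when it catches up.
// Resuming is assumed to be a no-op if ingestion was never paused.
class ProducerBackpressure {
    private pendingBatches = 0

    constructor(
        private pauseIngestion: () => Promise<void>,
        private resumeIngestion: () => Promise<void>,
        private pauseThreshold = 10,
        private resumeThreshold = 2
    ) {}

    // Wrap every background flush so we always know how much is still unsent.
    async trackFlush(flush: () => Promise<void>): Promise<void> {
        this.pendingBatches++
        if (this.pendingBatches >= this.pauseThreshold) {
            await this.pauseIngestion() // the producer can't keep up: stop pulling new events
        }
        try {
            await flush()
        } finally {
            this.pendingBatches--
            if (this.pendingBatches <= this.resumeThreshold) {
                await this.resumeIngestion()
            }
        }
    }
}
```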
This is just one example, but basically in this structure there are too many places where we could just keep enqueuing objects until we run out of memory:
Events in <> Processing events <> Events out
If any step runs faster than any other step, we need to apply the brakes.
The book "Thinking in Systems" could be of use here: it's full of diagrams of exactly this kind of stock-and-flow problem.
Grafana
We should also add metrics about job queues into Grafana.
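Assuming Prometheus is the datasource behind Grafana, a couple of prom-client metrics would already go a long way (the metric names are made up):

```ts
import { Counter, Gauge } from 'prom-client'

// Enqueue successes/failures per queue, plus the approximate depth of each queue.
const jobsEnqueued = new Counter({
    name: 'job_queue_jobs_enqueued_total',
    help: 'Jobs successfully enqueued, labelled by queue',
    labelNames: ['queue'],
})
const enqueueFailures = new Counter({
    name: 'job_queue_enqueue_failures_total',
    help: 'Failed enqueue attempts, labelled by queue',
    labelNames: ['queue'],
})
const queueDepth = new Gauge({
    name: 'job_queue_depth',
    help: 'Approximate number of pending jobs, labelled by queue',
    labelNames: ['queue'],
})

// e.g. jobsEnqueued.inc({ queue: 'graphile' })
// queueDepth.set({ queue: 'graphile' }, pendingJobCount)
```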
Anything else?
No way I covered everything above. Comment below! :)