Trigger jobs from the plugins interface #6870
Alternatively to a params JSON, these publicly runnable jobs could have their own schema.

---
I've been doing some thinking on this. There are a few things to consider; here are some quick thoughts before I put this laptop aside:

**Triggering plugin server jobs from PostHog**

There are a few ways we could send a payload to trigger a job from the Django server to the plugin server: (4. Bring back Fastify)

Ultimately, it seems clear 3 should be the way to go. Redis pub/sub is better suited for smaller messages, plus it triggers immediate action from us, which is fine in the case of reloading plugins but not for triggering jobs; it's definitely nicer to have a queue in between. A low-effort approach would be to use the task/topic we already have, with some internal event name that would trigger a job. That would be the quickest thing to get up and running, but it's just too messy. So yeah, I think we just need a new Kafka topic and Celery task that take everything we need to trigger a job: things like team ID, plugin config ID, and a payload.

**What jobs can be UI-triggered?**
I don't think we should let all jobs be UI-triggered. I've used jobs a lot for internal processes that I wouldn't want exposed to a user. Thus, I think jobs that can be triggered in the UI should be explicitly marked as such. This could be done either in …

I also agree that it could be worth these jobs (optionally?) having an explicit schema, so we can provide a nicer UI for the payload than a JSON-validated textarea that pushes a lot of the validation work onto the plugin code.

---
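To make the Kafka + Celery route (option 3) above a bit more concrete, here's a hedged sketch of building the message that would go on the new topic. The function name and field names are illustrative, not the actual PostHog topic schema; it only shows the fields the comment mentions (team ID, plugin config ID, a payload).

```javascript
// Hypothetical payload for a "trigger job" Kafka message, carrying the
// fields mentioned above: team ID, plugin config ID, job name, and a payload.
function buildJobTriggerMessage({ teamId, pluginConfigId, jobName, payload = {} }) {
    for (const [key, value] of Object.entries({ teamId, pluginConfigId, jobName })) {
        if (value === undefined || value === null) {
            throw new Error(`missing required field: ${key}`)
        }
    }
    // Serialized form, ready to be produced to the topic as the message value.
    return JSON.stringify({ teamId, pluginConfigId, jobName, payload })
}
```

The plugin server side would then deserialize this and enqueue the job for the matching plugin config.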
There's one more option: triggering graphile jobs directly from Python, or, in fact, from within Postgres. This is valid PL/pgSQL (e.g. inside a trigger function): `PERFORM graphile_worker.add_job('task_identifier_here', json_build_object('id', NEW.id));` (from a plain SQL query it would be `SELECT graphile_worker.add_job(...)`). Django already has the graphile worker URL, so everything should be there... :)

---
hmm that is interesting indeed

---
Ok, so I can no longer find where you made the comment about a dedicated table for exports/imports, so I'm continuing the discussion here. Here's some very rough rubber-ducking: how about a table for "long running tasks" or something of that nature? That would look something like this:
We can then expose an API for plugin devs to create tasks and update progress, something like:

```js
export const jobs = {
    exportJobTriggeredFromTheUi: async ({ tasks, global }) => {
        const uniqueTaskId = Math.random() // stand-in for a real unique ID
        await tasks.startTaskIfNotRunning({
            name: 'export_task',
            id: uniqueTaskId,
            total: global.totalEventsToExport,
            maxConcurrentTasks: 1,
        })
        // export stuff, trigger more jobs ...
        tasks.incrementProgress(uniqueTaskId, global.totalEventsToExportPerBatch)
    },
}
```

We can infer when the task finishes (…). There are then a few mechanisms we can have in place to allow devs to specify how many long-running tasks can be going on concurrently (e.g. this might be 1 for exports/imports). Could be done via the …

---
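As a toy illustration of how `startTaskIfNotRunning` and `incrementProgress` could interact, here is an in-memory sketch. Everything here is an assumption: the real version would be backed by the long-running-tasks table and be async; this synchronous registry only demonstrates the concurrency-cap and progress-inference behavior described above.

```javascript
// Toy in-memory task registry; names follow the proposed `tasks` API above.
function createTaskRegistry() {
    const tasks = new Map()
    return {
        // Refuse to start a task if too many with the same name are still running.
        startTaskIfNotRunning({ name, id, total, maxConcurrentTasks = 1 }) {
            const running = [...tasks.values()].filter((t) => t.name === name && t.progress < t.total)
            if (running.length >= maxConcurrentTasks) {
                return false
            }
            tasks.set(id, { name, total, progress: 0 })
            return true
        },
        incrementProgress(id, by) {
            const task = tasks.get(id)
            if (task) {
                task.progress = Math.min(task.total, task.progress + by)
            }
            return task
        },
        // "We can infer when the task finishes": progress has reached total.
        isFinished(id) {
            const task = tasks.get(id)
            return !!task && task.progress >= task.total
        },
    }
}
```

With `maxConcurrentTasks: 1`, a second export cannot start until the first one's progress reaches its total.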
There's a real danger of concept-creep here: jobs, tasks, workers, etc. If the terminology is that you control jobs through tasks, but only if they call other jobs, then it'll be hard to wrap your head around all of this. Basically, …

Thinking a bit ahead, what could we do to support any long-running task, which the user could chunk into bits and pieces? Can we map-reduce the original payload and keep track of where we are? What would the entire process look like? Could we abstract away towards something like this:

```js
export const jobs = {
    longRunningThing: async ({ limit, offset, filters }, meta) => {
        try {
            await performNextBitOfTheWork({ limit, offset, filters }, meta)
        } catch (e) {
            throw new RetryError() // if it's a legit error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, global }) => {
        const total = await getWorkAmount(filters)
        await jobs.longRunningThing({ filters }).runBatchedPipeline({ startOffset: 0, limit: total, perBatch: 50 })
    },
    cancelLongRunningThingFromTheUI: async ({ filters }, { jobs }) => {
        await jobs.longRunningThing({ filters }).cancelPipeline(optionalPipelineId)
    },
}
```

Such a pipeline would be backed by a table like:
... and would orchestrate itself to run as needed, surviving restarts, incrementing cursors, etc.

---
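The hypothetical `runBatchedPipeline({ startOffset, limit, perBatch })` call discussed above would essentially boil down to a batch plan like the following sketch. None of these names are real PostHog APIs; this just shows the chunking arithmetic.

```javascript
// Split a range of work into { offset, limit } batches, the way a
// batched pipeline would before dispatching one job per batch.
function planBatches({ startOffset, limit, perBatch }) {
    const batches = []
    const end = startOffset + limit
    for (let offset = startOffset; offset < end; offset += perBatch) {
        // The final batch may be smaller than perBatch.
        batches.push({ offset, limit: Math.min(perBatch, end - offset) })
    }
    return batches
}
```

The pipeline table would then track which of these batches have completed, so a restart can resume from the first unfinished one.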
Here's the random code we were looking at while on the call:

```js
const fetchEvents = ({ limit, offset, filters }) => posthog.api.get('/api/events', { limit, offset, filters })
const getWorkAmount = (filters) => posthog.api.getTotal()
const exportEvent = (event) => posthog.capture(event.event, event.properties)

export async function runEveryMinute({ cursor, jobs }) {
    if (!cursor.running()) {
        await jobs.longRunningThing({ filters }).runWithCursor({ cursor: cursor.last })
    }
}

export const jobsWithFixedRange = {
    longRunningThing: async ({ filters, cursor: { limit, offset } }) => {
        try {
            const events = await fetchEvents({ limit, offset, filters })
            await Promise.all(events.map(exportEvent))
        } catch (e) {
            throw new RetryError() // if it's a legit error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, utils: { cursor, geoip, getPerson } }) => {
        const total = await getWorkAmount(filters)
        await jobs.longRunningThing({ filters }).runWithCursor({ start: 0, total, step: 100 })
        // await jobs.longRunningThing({ filters }).runWithCursor({ start: 1629891534, end: 1836791534, step: 86400*1000 })
    },
}

export const jobsWithUnknownRange = {
    longRunningThing: async ({ filters }, { utils: { cursor } }) => {
        try {
            // cursor.current is a [timestamp, offset] pair
            const events = await fetchEvents({
                filters,
                where: cursor.current[0] ? `timestamp > ${cursor.current[0]}` : '',
                // orderBy: 'timestamp asc',
                offset: cursor.current[1],
                limit: 50,
            })
            await Promise.all(events.map(exportEvent))
            const lastTimestamp = events[events.length - 1].timestamp
            if (lastTimestamp === cursor.current[0]) {
                // whole page shared one timestamp: paginate within it
                cursor.setNext([cursor.current[0], cursor.current[1] + 50])
            } else {
                cursor.setNext([lastTimestamp, 0])
            }
        } catch (e) {
            throw new RetryError() // if it's a legit error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, utils: { cursor, geoip, getPerson } }) => {
        const total = await getWorkAmount(filters)
        await jobs.longRunningThing({ filters }).runWithCursor({ cursor: [null, null] })
        // await jobs.longRunningThing({ filters }).runWithCursor({ start: 1629891534, end: 1836791534, step: 86400*1000 })
    },
    pause: async (_, { utils: { cursor } }) => {
        await cursor.pauseAll()
    },
    stop: async (_, { utils: { cursor } }) => {
        await cursor.stopAll()
    },
    /* -- posthog_plugincursors
    id: 0
    pluginConfigId: 42
    jobName: "longRunningThing"
    cursor: 0
    total: 131231
    stepCount: 50
    state: "running"
    */
    cancelLongRunningThingFromTheUI: async ({ filters }, { jobs }) => {
        await jobs.longRunningThing({ filters }).cancelPipeline(optionalPipelineId)
    },
}
```

---
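The timestamp-plus-offset cursor trick in the unknown-range sketch above can be isolated as a small pure function. This is a rephrasing of that logic for clarity, not an actual plugin-server API: the cursor is a `[timestamp, offset]` pair so that a page full of identical timestamps still makes progress.

```javascript
// Advance a [timestamp, offset] cursor after processing one page of events.
function nextCursor([currentTs, currentOffset], lastEventTimestamp, step = 50) {
    if (lastEventTimestamp === currentTs) {
        // The whole page shared one timestamp: paginate within that timestamp.
        return [currentTs, currentOffset + step]
    }
    // Timestamps advanced: restart the offset at the new high-water mark.
    return [lastEventTimestamp, 0]
}
```

Starting from `[null, null]`, the cursor converges on the latest processed timestamp and survives restarts as long as the pair is persisted (e.g. in the `posthog_plugincursors`-style table sketched above).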
This issue hasn't seen activity in two years! If you want to keep it open, post a comment or remove the `stale` label.

---
This issue was closed due to lack of activity. Feel free to reopen if it's still relevant.

---
After PostHog/plugin-server#413 is done, we could list all the exposed jobs from a plugin, and have a "run job" button next to it, plus an ability to specify a params JSON.
This could somehow be integrated with the plugin logs view, as I imagine you'd want to see the job output as soon as it arrives.
This will be useful for jobs such as "export from the beginning", "clear cache", "sync annotations now", etc. It allows a lot more interactivity with the plugins.
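If exposed jobs declared an explicit schema (as suggested in the comments above), the params JSON entered in the UI could be validated before the job is triggered. The `{ field: type }` schema format below is purely hypothetical, meant only to show the shape of such a check:

```javascript
// Validate a user-supplied params JSON string against a hypothetical
// { fieldName: expectedType } job schema before triggering the job.
function validateJobParams(schema, rawJson) {
    let params
    try {
        params = JSON.parse(rawJson)
    } catch (e) {
        return { ok: false, error: 'not valid JSON' }
    }
    for (const [key, type] of Object.entries(schema)) {
        if (typeof params[key] !== type) {
            return { ok: false, error: `expected ${key} to be a ${type}` }
        }
    }
    return { ok: true, params }
}
```

Doing this server-side keeps the validation work out of the plugin code, which is exactly the concern raised about a plain JSON textarea.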