Trigger jobs from the plugins interface #6870

Closed
mariusandra opened this issue May 23, 2021 · 9 comments

@mariusandra (Collaborator)

After PostHog/plugin-server#413 is done, we could list all the exposed jobs from a plugin, and have a "run job" button next to it, plus an ability to specify a params JSON.

This could somehow be integrated with the plugin logs view, as I imagine you'd want to see the job output as soon as it arrives.

This would be useful for jobs such as "export from the beginning", "clear cache", "sync annotations now", etc. It would allow a lot more interactivity with plugins.
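
For reference, jobs are just named async functions on a plugin's exported `jobs` object, so the UI would essentially list those keys and send the params JSON as the job payload. A minimal sketch, with job names and payload fields made up for illustration:

```js
export const jobs = {
    // "Export from the beginning" style job: `dateFrom` and `offset` would come
    // from the params JSON typed into the UI (field names made up here).
    exportFromTheBeginning: async ({ dateFrom, offset = 0 }, { jobs }) => {
        console.log(`exporting events from ${dateFrom}, offset ${offset}`)
        // ... export one batch here, then re-enqueue itself for the next batch
        await jobs.exportFromTheBeginning({ dateFrom, offset: offset + 500 }).runIn(1, 'minutes')
    },
    // A simple housekeeping job with no payload at all.
    clearCache: async (_, { cache }) => {
        await cache.set('lastExportCursor', null)
    },
}
```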

@mariusandra (Collaborator, Author)

As an alternative to a params JSON, these publicly runnable jobs could have their own configSchema object that works just like the plugin drawer and its config.
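
One possible shape for that, sketched as an extra export next to the jobs; the `jobConfigSchemas` name is hypothetical, and the field format just mirrors the existing plugin config schema entries:

```js
// Hypothetical: one schema per publicly runnable job, using the same field format
// as the plugin config schema, so the UI can render a proper form instead of a
// raw params JSON textarea.
export const jobConfigSchemas = {
    exportFromTheBeginning: [
        {
            key: 'dateFrom',
            name: 'Export events starting from',
            type: 'string',
            hint: 'ISO date, e.g. 2021-01-01',
            required: true,
        },
    ],
}
```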

@yakkomajuri (Contributor) commented Aug 19, 2021

I've been doing some thinking on this. There are a few things to consider; here are some quick thoughts before I put this laptop aside:

Triggering plugin server jobs from PostHog

There are a few ways we could send a payload to trigger a job from the Django server to the plugin server:

  1. Redis (via pubsub)
  2. Celery & Kafka (via events)
  3. Celery & Kafka (via a new task/topic)

(4. Bring back Fastify)

Ultimately, it seems clear that option 3 is the way to go. Redis pub/sub is better suited to small messages, and it triggers immediate action on our side, which is fine for reloading plugins but not for triggering jobs; it's definitely nicer to have a queue in between.

A low-effort approach would be to reuse the task/topic we already have, with some internal event name that triggers a job. This would be the quickest thing to get up and running, but it's just too messy.

So yeah, I think we just need a new Kafka topic and a Celery task carrying everything we need to trigger a job: team ID, plugin config ID, and a payload.
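
To make that concrete, the message on the new topic would only need to carry enough to locate the plugin config and the job. This is just a sketch: the field names, the `jobQueueManager` helper, and the enqueue shape are assumptions, not an agreed-on schema.

```js
// Hypothetical payload for the new "trigger job" Kafka topic, produced by a
// Celery task on the Django side and consumed by the plugin server.
const exampleJobTriggerMessage = {
    teamId: 2,
    pluginConfigId: 42,
    jobName: 'exportFromTheBeginning',
    payload: { dateFrom: '2021-01-01' },
}

// Sketch of the plugin-server side: on such a message, enqueue the job the same
// way jobs triggered from plugin code are enqueued today.
async function onJobTriggerMessage(message, jobQueueManager) {
    await jobQueueManager.enqueue({
        type: message.jobName,
        payload: message.payload,
        pluginConfigId: message.pluginConfigId,
        pluginConfigTeam: message.teamId,
        timestamp: Date.now(),
    })
}
```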

What jobs can be UI-triggered?

> we could list all the exposed jobs from a plugin, and have a "run job" button next to it

I don't think we should let all jobs be UI-triggered. I've used jobs a lot for internal processes that I wouldn't want exposed to a user.

Thus, I think jobs that can be triggered in the UI should be explicitly marked as such. This could be done either in plugin.json or maybe in the plugin code too.

When building exportFromTheBeginning, for example, I realized there are fields in the job payload I wouldn't want exposed. So what I'd do is create a job that can be UI-triggered and have it trigger the internal job with the values that shouldn't be exposed (like retriesPerformedSoFar or timestampCursor); see the sketch below.
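
Roughly, that split might look like this; the internal job name `exportEventsBatch` and the `runNow()` call pattern are just illustrative:

```js
export const jobs = {
    // Public job: marked as UI-triggerable, only takes user-facing params.
    exportFromTheBeginning: async ({ dateFrom }, { jobs }) => {
        // Kick off the internal job with the bookkeeping fields users should never set.
        await jobs
            .exportEventsBatch({ dateFrom, timestampCursor: dateFrom, retriesPerformedSoFar: 0 })
            .runNow()
    },
    // Internal job: carries the hidden state, never listed in the UI.
    exportEventsBatch: async ({ dateFrom, timestampCursor, retriesPerformedSoFar }, { jobs }) => {
        // ... export one batch, then re-enqueue itself with an advanced timestampCursor
    },
}
```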

I also agree that it could be worth these jobs (optionally?) having an explicit schema so we can provide a nicer UI for the payload than a JSON-validated textarea that pushes a lot of the validation work to the plugin code.

@mariusandra (Collaborator, Author)

There's one more option: triggering Graphile jobs directly from Python or, in fact, from within Postgres. This is a valid statement inside a Postgres trigger function (PL/pgSQL):

PERFORM graphile_worker.add_job('task_identifier_here', json_build_object('id', NEW.id));

Django already has the graphile worker URL, so everything should be there... :)
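
For comparison, the same enqueue done from Node through graphile-worker's own utilities (rather than raw SQL) would look roughly like this; the connection string, task name, and payload are placeholders:

```js
const { makeWorkerUtils } = require('graphile-worker')

async function triggerJobViaGraphile() {
    // Connect with the same database URL the plugin server's Graphile Worker uses (assumption).
    const workerUtils = await makeWorkerUtils({ connectionString: process.env.DATABASE_URL })
    try {
        // Equivalent of the SQL above: add a job for the given task identifier.
        await workerUtils.addJob('task_identifier_here', { id: 123 })
    } finally {
        await workerUtils.release()
    }
}
```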

@yakkomajuri (Contributor)

Hmm, that is interesting indeed.

@yakkomajuri (Contributor) commented Aug 24, 2021

OK, so I can no longer find where you made the comment about a dedicated table for exports/imports, so I'm continuing the discussion here.

Here's some very rough rubber ducking.

How about a table for "long-running tasks" or something of that nature? It would look something like this:

| Plugin Config ID | Task Name | Task ID | Progress | Total |
| --- | --- | --- | --- | --- |

We can then expose an API for plugin devs to create tasks and update progress, something like:

export const jobs = {
    exportJobTriggeredFromTheUi: async ({ tasks, global }) => {
        const uniqueTaskId = Math.random()
        await tasks.startTaskIfNotRunning({
            name: 'export_task',
            id: uniqueTaskId,
            total: global.totalEventsToExport,
            maxConcurrentTasks: 1,
        })

        // export stuff, trigger more jobs ...

        await tasks.incrementProgress(uniqueTaskId, global.totalEventsToExportPerBatch)
    },
}

We can infer for them when the task finishes (progress == total), or they can end it manually, and we can use these values to show progress bars.

There are then a few mechanisms we could put in place to let devs specify how many long-running tasks can run concurrently (e.g. this might be 1 for exports/imports).

This could be done via plugin.json (https://github.com/yakkomajuri/test-plugin/blob/3bfc18949cd1c2035ea0296bbbfdaf5df13d7e0a/plugin.json#L19) or directly in the methods we expose, as shown above.

cc @mariusandra

@mariusandra (Collaborator, Author)

There's a real danger of concept creep here: jobs, tasks, workers, etc. If the terminology is that you control jobs through tasks, but only if they call other jobs, it'll be hard to wrap your head around all of this. Jobs and tasks are so interchangeable that I'd stick to just one of the two and find an alternative term if we need the other one. In this case, keep "jobs" as is, and instead have a "job controls" option or a "job pipeline" to control the orchestration of tiny jobs... or something like that.

Thinking a bit ahead, what could we do to support any long-running task that the user could chunk into bits and pieces? Can we map-reduce the original payload and keep track of where we are? What would the entire process look like?

Could we abstract it away into something like this:

export const jobs = {
    longRunningThing: async ({ limit, offset, filters }, meta) => {
        try {
            await performNextBitOfTheWork({ limit, offset, filters }, meta)
        } catch (e) {
            throw new RetryError() // if it's a legitimate error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, global }) => {
        const total = await getWorkAmount(filters)
        await jobs.longRunningThing({ filters }).runBatchedPipeline({ startOffset: 0, limit: total, perBatch: 50 })
    },
    cancelLongRunningThingFromTheUI: async ({ filters, optionalPipelineId }, { jobs }) => {
        await jobs.longRunningThing({ filters }).cancelPipeline(optionalPipelineId)
    },
}

Such a pipeline would be backed by a table like:

  • pluginConfigId: 42
  • jobName: "longRunningThing"
  • cursor: 0
  • total: 131231
  • stepCount: 50
  • state: "running"

... and would orchestrate itself to run as needed, surviving restarts, incrementing cursors, etc.
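
A very rough sketch of the orchestration loop such a table row could drive; `runJob` and `updateRow` are hypothetical stand-ins for whatever the plugin server would actually provide:

```js
// Run periodically (e.g. by the plugin server's scheduler): advance each running
// pipeline by one step and persist the new cursor, so progress survives restarts.
async function advancePipeline(row, runJob, updateRow) {
    if (row.state !== 'running') {
        return
    }
    // Run one chunk of the underlying job with the current cursor window.
    await runJob(row.jobName, {
        pluginConfigId: row.pluginConfigId,
        offset: row.cursor,
        limit: row.stepCount,
    })
    const nextCursor = row.cursor + row.stepCount
    const finished = nextCursor >= row.total
    await updateRow(row, { cursor: nextCursor, state: finished ? 'finished' : 'running' })
}
```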

@mariusandra (Collaborator, Author)

Here's the random code we were looking at while on call:

const fetchEvents = ({ limit, offset, filters }) => posthog.api.get('/api/events', { limit, offset, filters })
const getWorkAmount = (filters) => posthog.api.getTotal()
const exportEvent = (event) => posthog.capture(event.event, event.properties)

export async function runEveryMinute({ cursor, jobs, global }) {
    if (!cursor.running()) {
        // `filters` assumed to live in global/plugin config (rough pseudocode)
        await jobs.longRunningThing({ filters: global.filters }).runWithCursor({ cursor: cursor.last })
    }
}

export const jobsWithFixedRange = {
    longRunningThing: async ({ filters, cursor: { limit, offset } }) => {
        try {
            const events = await fetchEvents({ limit, offset, filters })
            await Promise.all(events.map(exportEvent))
        } catch (e) {
            throw new RetryError() // if it's a legitimate error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, utils: { cursor, geoip, getPerson } }) => {
        const total = await getWorkAmount(filters)
        await jobs.longRunningThing({ filters }).runWithCursor({ start: 0, total, step: 100 })
        // await jobs.longRunningThing({ filters }).runWithCursor({ start: 1629891534, end: 1836791534, step: 86400*1000 })
    },
}

export const jobsWithUnknownRange = {
    longRunningThing: async ({ filters }, { utils: { cursor } }) => {
        try {
            const events = await fetchEvents({
                filters,
                where: cursor.current && cursor.current[0] ? `timestamp > ${cursor.current[0]}` : '',
                // orderBy: 'timestamp asc',
                offset: 0,
                limit: 50,
            })

            await Promise.all(events.map(exportEvent))
            const lastTimestamp = events[events.length - 1].timestamp
            if (lastTimestamp === cursor.current[0]) {
                // same timestamp as last time: advance the offset within it
                cursor.setNext([cursor.current[0], cursor.current[1] + 50])
            } else {
                // new timestamp: restart the offset from zero
                cursor.setNext([lastTimestamp, 0])
            }
        } catch (e) {
            throw new RetryError() // if it's a legitimate error to retry
        }
    },
    longRunningThingFromTheUI: async ({ filters }, { jobs, utils: { cursor, geoip, getPerson } }) => {
        const total = await getWorkAmount(filters)

        await jobs.longRunningThing({ filters }).runWithCursor({ cursor: [null, null] })
        // await jobs.longRunningThing({ filters }).runWithCursor({ start: 1629891534, end: 1836791534, step: 86400*1000 })
    },

    pause: async (_, { utils: { cursor } }) => {
        await cursor.pauseAll()
    },
    stop: async (_, { utils: { cursor } }) => {
        await cursor.stopAll()
    },

    cancelLongRunningThingFromTheUI: async ({ filters, optionalPipelineId }, { jobs }) => {
        await jobs.longRunningThing({ filters }).cancelPipeline(optionalPipelineId)
    },
}

/* -- posthog_plugincursors
id: 0
pluginConfigId: 42
jobName: "longRunningThing"
cursor: 0
total: 131231
stepCount: 50
state: "running"
*/

@tiina303 transferred this issue from PostHog/plugin-server on Nov 3, 2021
@posthog-bot (Contributor)

This issue hasn't seen activity in two years! If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in two weeks.

@posthog-bot (Contributor)

This issue was closed due to lack of activity. Feel free to reopen if it's still relevant.

@posthog-bot closed this as not planned on Nov 20, 2023
@github-project-automation bot moved this to Done This Sprint in Extensibility on Aug 29, 2024