Skip to content

Commit

Permalink
Add optional onlisten function to listen
Browse files Browse the repository at this point in the history
  • Loading branch information
porsager committed Apr 6, 2022
1 parent 2ad65c4 commit 1dc2fd2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,10 @@ Do note that you can often achieve the same result using [`WITH` queries (Common
## Listen & notify
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications in real-time. This connection will be used for any further calls to `.listen`.
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications instantly. This connection will be used for any further calls to `.listen`. The connection will automatically reconnect according to a backoff reconnection pattern to not overload the database server.
`.listen` returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
### Listen `await sql.listen(channel, onnotify, [onlisten]) -> { state }`
`.listen` takes the channel name, a function to handle each notify, and an optional function to run every time listen is registered and ready (happens on initial connect and reconnects). It returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
```js
await sql.listen('news', payload => {
Expand All @@ -530,6 +531,20 @@ await sql.listen('news', payload => {
})
```
The optional `onlisten` method is great to use for a very simply queue mechanism:
```js
await sql.listen(
'jobs',
(x) => run(JSON.parse(x)),
( ) => sql`select unfinished_jobs()`.forEach(run)
)

function run(job) {
// And here you do the work you please
}
```
### Notify `await sql.notify(channel, payload) -> Result[]`
Notify can be done as usual in SQL, or by using the `sql.notify` method.
```js
sql.notify('news', JSON.stringify({ no: 'this', is: 'news' }))
Expand Down
20 changes: 12 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,34 +139,38 @@ function Postgres(a, b) {
}
}

async function listen(name, fn) {
async function listen(name, fn, onlisten) {
const listener = { fn, onlisten }

const sql = listen.sql || (listen.sql = Postgres({
...options,
max: 1,
idle_timeout: null,
max_lifetime: null,
fetch_types: false,
onclose() {
Object.entries(listen.channels).forEach(([channel, { listeners }]) => {
delete listen.channels[channel]
Promise.all(listeners.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
Object.entries(listen.channels).forEach(([name, { listeners }]) => {
delete listen.channels[name]
Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
})
},
onnotify(c, x) {
c in listen.channels && listen.channels[c].listeners.forEach(fn => fn(x))
c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
}
}))

const channels = listen.channels || (listen.channels = {})
, exists = name in channels
, channel = exists ? channels[name] : (channels[name] = { listeners: [fn] })
, channel = exists ? channels[name] : (channels[name] = { listeners: [listener] })

if (exists) {
channel.listeners.push(fn)
channel.listeners.push(listener)
listener.onlisten && listener.onlisten()
return Promise.resolve({ ...channel.result, unlisten })
}

channel.result = await sql`listen ${ sql(name) }`
listener.onlisten && listener.onlisten()
channel.result.unlisten = unlisten

return channel.result
Expand All @@ -175,7 +179,7 @@ function Postgres(a, b) {
if (name in channels === false)
return

channel.listeners = channel.listeners.filter(x => x !== fn)
channel.listeners = channel.listeners.filter(x => x !== listener)
if (channels[name].listeners.length)
return

Expand Down
20 changes: 13 additions & 7 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -672,15 +672,21 @@ t('listen reconnects', { timeout: 2 }, async() => {
, a = new Promise(r => resolvers.a = r)
, b = new Promise(r => resolvers.b = r)

const { state: { pid } } = await sql.listen('test', x => x in resolvers && resolvers[x]())
let connects = 0

const { state: { pid } } = await sql.listen(
'test',
x => x in resolvers && resolvers[x](),
() => connects++
)
await sql.notify('test', 'a')
await a
await sql`select pg_terminate_backend(${ pid })`
await delay(100)
await sql.notify('test', 'b')
await b
sql.end()
return [true, true]
return [connects, 2]
})

t('listen result reports correct connection state after reconnection', async() => {
Expand Down Expand Up @@ -1545,12 +1551,12 @@ t('Multiple hosts', {
const x1 = await sql`select 1`
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
await s1`select pg_terminate_backend(${ x1.state.pid }::int)`
await delay(10)
await delay(50)

const x2 = await sql`select 1`
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
await s2`select pg_terminate_backend(${ x2.state.pid }::int)`
await delay(10)
await delay(50)

result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)

Expand Down Expand Up @@ -1764,11 +1770,11 @@ t('Cancel running query', async() => {
return ['57014', error.code]
})

t('Cancel piped query', { timeout: 1 }, async() => {
t('Cancel piped query', async() => {
await sql`select 1`
const last = sql`select pg_sleep(0.1)`.execute()
const last = sql`select pg_sleep(0.05)`.execute()
const query = sql`select pg_sleep(2) as dig`
setTimeout(() => query.cancel(), 50)
setTimeout(() => query.cancel(), 10)
const error = await query.catch(x => x)
await last
return ['57014', error.code]
Expand Down

0 comments on commit 1dc2fd2

Please sign in to comment.