Skip to content

Commit

Permalink
Add cursor support
Browse files Browse the repository at this point in the history
  • Loading branch information
porsager committed Feb 3, 2020
1 parent 295ac4f commit 4156c3f
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 57 deletions.
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,52 @@ await sql`

```

## Cursor ```sql` `.cursor([rows = 1], fn) -> Promise```

Use cursors if you need to throttle the amount of rows being returned from a query. New results won't be requested until the promise / async callack function has resolved.

```js

await sql.cursor`
select * from generate_series(1,4) as x
`.cursor(row => {
// row = { x: 1 }
http.request('https://example.com/wat', { row })
})

// No more rows

```

A single row will be returned by default, but you can also request batches by setting the number of rows desired in each batch as the first argument. That is usefull if you can do work with the rows in parallel like in this example:

```js

await sql.cursor`
select * from generate_series(1,1000) as x
`.cursor(10, rows => {
// rows = [{ x: 1 }, { x: 2 }, ... ]
await Promise.all(rows.map(row =>
http.request('https://example.com/wat', { row })
))
})

```

If an error is thrown inside the callback function no more rows will be requested and the promise will reject with the thrown error.

You can also stop receiving any more rows early by returning an end token `sql.END` from the callback function.

```js

await sql.cursor`
select * from generate_series(1,1000) as x
`.cursor(row => {
return Math.random() > 0.9 && sql.END
})

```

## Listen and notify

When you call listen, a dedicated connection will automatically be made to ensure that you receive notifications in real time. This connection will be used for any further calls to listen. Listen returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
Expand Down
9 changes: 7 additions & 2 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ module.exports = Backend
function Backend({
onparse,
onparameter,
onsuspended,
oncomplete,
parsers,
onauth,
onready,
Expand Down Expand Up @@ -83,6 +85,8 @@ function Backend({
break
}
}

oncomplete()
}

/* c8 ignore next 3 */
Expand Down Expand Up @@ -161,9 +165,10 @@ function Backend({
onparameter(k, v)
}

/* c8 ignore next 3 */
function PortalSuspended() {
backend.error = errors.notSupported('PortalSuspended')
onsuspended(backend.query.result)
backend.query.result = []
rows = 0
}

/* c8 ignore next 3 */
Expand Down
39 changes: 11 additions & 28 deletions lib/bytes.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
const size = 256
let buffer = Buffer.allocUnsafe(size)

const messages = {
B: 'B'.charCodeAt(0),
Q: 'Q'.charCodeAt(0),
P: 'P'.charCodeAt(0),
p: 'p'.charCodeAt(0)
}

const b = {
i: 0,
B() {
buffer[0] = messages.B
b.i = 5
return b
},
Q() {
buffer[0] = messages.Q
b.i = 5
return b
},
P() {
buffer[0] = messages.P
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => {
const v = x.charCodeAt(0)
acc[x] = () => {
buffer[0] = v
b.i = 5
return b
},
p() {
buffer[0] = messages.p
b.i = 5
return b
},
}
return acc
}, {})

const b = Object.assign(messages, {
i: 0,
inc(x) {
b.i += x
return b
Expand Down Expand Up @@ -69,7 +52,7 @@ const b = {
buffer = Buffer.allocUnsafe(size)
return out
}
}
})

module.exports = b

Expand Down
40 changes: 31 additions & 9 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const tls = require('tls')
const frontend = require('./frontend.js')
const Backend = require('./backend.js')
const Queue = require('./queue.js')
const { errors } = require('./types.js')
const { errors, END } = require('./types.js')

module.exports = Connection

Expand Down Expand Up @@ -42,6 +42,8 @@ function Connection(options = {}) {
const backend = Backend({
onparse,
onparameter,
onsuspended,
oncomplete,
transform,
parsers,
onnotify,
Expand All @@ -51,6 +53,23 @@ function Connection(options = {}) {
error
})

function onsuspended(x) {
new Promise(r => r(backend.query.cursor(
backend.query.cursor.rows === 1 ? x[0] : x
))).then(x => {
x === END
? socket.write(frontend.Close())
: socket.write(frontend.Execute(backend.query.cursor.rows))
}).catch(err => {
socket.write(frontend.Close())
backend.query.reject(err)
})
}

function oncomplete() {
backend.query.cursor && socket.write(frontend.Close())
}

function onparse() {
if (backend.query && backend.query.statement.sig)
statements[backend.query.statement.sig] = backend.query.statement
Expand All @@ -72,8 +91,7 @@ function Connection(options = {}) {
ended = () => resolve(socket.end())
})

if (!backend.query && queries.length === 0)
ended()
process.nextTick(() => ready && ended())

return promise
}
Expand Down Expand Up @@ -117,17 +135,23 @@ function Connection(options = {}) {

function prepared(statement, args, query) {
query.statement = statement
return frontend.Bind(statement.name, args)
return bind(query, args)
}

function prepare(sig, str, args, query) {
query.statement = { name: sig ? 'p' + statement_id++ : '', sig }
return Buffer.concat([
frontend.Parse(query.statement.name, str, args),
frontend.Bind(query.statement.name, args)
bind(query, args)
])
}

function bind(query, args) {
return query.cursor
? frontend.Bind(query.statement.name, args, query.cursor.rows)
: frontend.Bind(query.statement.name, args)
}

function idle() {
clearTimeout(timer)
timer = setTimeout(socket.end, timeout * 1000)
Expand All @@ -141,9 +165,6 @@ function Connection(options = {}) {
backend.query = backend.error = null
timeout && queries.length === 0 && idle()

if (queries.length === 0 && ended)
return ended()

if (!open) {
messages.forEach(socket.write)
messages = []
Expand All @@ -152,6 +173,7 @@ function Connection(options = {}) {

backend.query = queries.shift()
ready = !backend.query
ready && ended && ended()
}

function data(x) {
Expand Down Expand Up @@ -246,7 +268,7 @@ function postgresSocket(options, {
return Promise.resolve()
},
end: () => {
return new Promise(r => socket && socket.end(r))
return new Promise(r => socket ? socket.end(r) : r())
},
connect
}
Expand Down
41 changes: 31 additions & 10 deletions lib/frontend.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ const bytes = require('./bytes.js')
const { errors, entries } = require('./types.js')

const N = String.fromCharCode(0)
const execute = bytes
.inc(5)
.str('D').i32(6).str('P').str(N)
.str('E').i32(9).z(5)
.str('H').i32(4)
.str('S').i32(4)
.end().slice(5)
const execute = Buffer.concat([
bytes.D().str('P').str(N).end(),
bytes.E().str(N).i32(0).end(),
bytes.H().end(),
bytes.S().end()
])

const authNames = {
2 : 'KerberosV5',
Expand Down Expand Up @@ -38,7 +37,9 @@ module.exports = {
auth,
Bind,
Parse,
Query
Query,
Close,
Execute
}

function connect({ user, database, connection }) {
Expand Down Expand Up @@ -140,7 +141,7 @@ function Query(x) {
.end()
}

function Bind(name, args) {
function Bind(name, args, rows = 0) {
let prev

bytes
Expand All @@ -165,7 +166,13 @@ function Bind(name, args) {

return Buffer.concat([
bytes.end(),
execute
rows
? Buffer.concat([
bytes.D().str('P').str(N).end(),
bytes.E().str(N).i32(rows).end(),
bytes.H().end()
])
: execute
])
}

Expand All @@ -181,6 +188,20 @@ function Parse(name, str, args) {
return bytes.end()
}

function Execute(rows) {
return Buffer.concat([
bytes.E().str(N).i32(rows).end(),
bytes.H().end()
])
}

function Close() {
return Buffer.concat([
bytes.C().str('P').str(N).end(),
bytes.S().end()
])
}

function md5(x) {
return crypto.createHash('md5').update(x).digest('hex')
}
Expand Down
25 changes: 20 additions & 5 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ const {
toKebab,
errors,
escape,
types
types,
END
} = require('./types.js')

const notPromise = {
Expand Down Expand Up @@ -171,7 +172,7 @@ function Postgres(a, b) {
: fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject)
})

promise.stream = (fn) => (query.stream = fn, promise)
addMethods(promise, query)

return promise
}
Expand All @@ -185,7 +186,7 @@ function Postgres(a, b) {

function send(connection, query, xs, args) {
connection
? connection.send(query, query.raw ? parseRaw(xs, args) : parse(query, xs, args))
? process.nextTick(connection.send, query, query.raw ? parseRaw(xs, args) : parse(query, xs, args))
: queries.push({ query, xs, args })
}

Expand Down Expand Up @@ -246,6 +247,7 @@ function Postgres(a, b) {

function addTypes(sql, connection) {
Object.assign(sql, {
END,
types: {},
notify,
unsafe,
Expand Down Expand Up @@ -287,7 +289,7 @@ function Postgres(a, b) {
})
}))).then(str => query(q, connection || getConnection(), str, args || []))

promise.stream = fn => (q.stream = fn, promise)
addMethods(promise, q)

return promise
}
Expand All @@ -297,6 +299,19 @@ function Postgres(a, b) {
})
}

function addMethods(promise, query) {
promise.stream = (fn) => (query.stream = fn, promise)
promise.cursor = (rows, fn) => {
if (typeof rows === 'function') {
fn = rows
rows = 1
}
fn.rows = rows
query.cursor = fn
return promise
}
}

function listen(channel, fn) {
if (channel in listeners) {
listeners[channel].push(fn)
Expand Down Expand Up @@ -329,7 +344,7 @@ function Postgres(a, b) {
return ended = Promise.all(all.map(c => c.destroy())).then(() => undefined)

return ended = Promise.race([
Promise.all(all.map(c => c.end()))
Promise.resolve(arrayTypesPromise).then(() => Promise.all(all.map(c => c.end())))
].concat(
timeout > 0
? new Promise(r => destroy = setTimeout(() => (all.map(c => c.destroy()), r()), timeout * 1000))
Expand Down
Loading

0 comments on commit 4156c3f

Please sign in to comment.