diff --git a/README.md b/README.md index 12a564cf..deb9c38b 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,52 @@ await sql` ``` +## Cursor ```sql` `.cursor([rows = 1], fn) -> Promise``` + +Use cursors if you need to throttle the amount of rows being returned from a query. New results won't be requested until the promise / async callack function has resolved. + +```js + +await sql.cursor` + select * from generate_series(1,4) as x +`.cursor(row => { + // row = { x: 1 } + http.request('https://example.com/wat', { row }) +}) + +// No more rows + +``` + +A single row will be returned by default, but you can also request batches by setting the number of rows desired in each batch as the first argument. That is usefull if you can do work with the rows in parallel like in this example: + +```js + +await sql.cursor` + select * from generate_series(1,1000) as x +`.cursor(10, rows => { + // rows = [{ x: 1 }, { x: 2 }, ... ] + await Promise.all(rows.map(row => + http.request('https://example.com/wat', { row }) + )) +}) + +``` + +If an error is thrown inside the callback function no more rows will be requested and the promise will reject with the thrown error. + +You can also stop receiving any more rows early by returning an end token `sql.END` from the callback function. + +```js + +await sql.cursor` + select * from generate_series(1,1000) as x +`.cursor(row => { + return Math.random() > 0.9 && sql.END +}) + +``` + ## Listen and notify When you call listen, a dedicated connection will automatically be made to ensure that you receive notifications in real time. This connection will be used for any further calls to listen. Listen returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active. diff --git a/lib/backend.js b/lib/backend.js index ea27acd6..108f7dc6 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -8,6 +8,8 @@ module.exports = Backend function Backend({ onparse, onparameter, + onsuspended, + oncomplete, parsers, onauth, onready, @@ -83,6 +85,8 @@ function Backend({ break } } + + oncomplete() } /* c8 ignore next 3 */ @@ -161,9 +165,10 @@ function Backend({ onparameter(k, v) } - /* c8 ignore next 3 */ function PortalSuspended() { - backend.error = errors.notSupported('PortalSuspended') + onsuspended(backend.query.result) + backend.query.result = [] + rows = 0 } /* c8 ignore next 3 */ diff --git a/lib/bytes.js b/lib/bytes.js index 5c947130..2fc78fa5 100644 --- a/lib/bytes.js +++ b/lib/bytes.js @@ -1,35 +1,18 @@ const size = 256 let buffer = Buffer.allocUnsafe(size) -const messages = { - B: 'B'.charCodeAt(0), - Q: 'Q'.charCodeAt(0), - P: 'P'.charCodeAt(0), - p: 'p'.charCodeAt(0) -} - -const b = { - i: 0, - B() { - buffer[0] = messages.B - b.i = 5 - return b - }, - Q() { - buffer[0] = messages.Q - b.i = 5 - return b - }, - P() { - buffer[0] = messages.P +const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => { + const v = x.charCodeAt(0) + acc[x] = () => { + buffer[0] = v b.i = 5 return b - }, - p() { - buffer[0] = messages.p - b.i = 5 - return b - }, + } + return acc +}, {}) + +const b = Object.assign(messages, { + i: 0, inc(x) { b.i += x return b @@ -69,7 +52,7 @@ const b = { buffer = Buffer.allocUnsafe(size) return out } -} +}) module.exports = b diff --git a/lib/connection.js b/lib/connection.js index b8ebdc7e..0fabac39 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -3,7 +3,7 @@ const tls = require('tls') const frontend = require('./frontend.js') const Backend = require('./backend.js') const Queue = require('./queue.js') -const { errors } = require('./types.js') +const { errors, END } = require('./types.js') module.exports = Connection @@ -42,6 +42,8 @@ function Connection(options = {}) { const backend = Backend({ onparse, onparameter, + onsuspended, + oncomplete, transform, parsers, onnotify, @@ -51,6 +53,23 @@ function Connection(options = {}) { error }) + function onsuspended(x) { + new Promise(r => r(backend.query.cursor( + backend.query.cursor.rows === 1 ? x[0] : x + ))).then(x => { + x === END + ? socket.write(frontend.Close()) + : socket.write(frontend.Execute(backend.query.cursor.rows)) + }).catch(err => { + socket.write(frontend.Close()) + backend.query.reject(err) + }) + } + + function oncomplete() { + backend.query.cursor && socket.write(frontend.Close()) + } + function onparse() { if (backend.query && backend.query.statement.sig) statements[backend.query.statement.sig] = backend.query.statement @@ -72,8 +91,7 @@ function Connection(options = {}) { ended = () => resolve(socket.end()) }) - if (!backend.query && queries.length === 0) - ended() + process.nextTick(() => ready && ended()) return promise } @@ -117,17 +135,23 @@ function Connection(options = {}) { function prepared(statement, args, query) { query.statement = statement - return frontend.Bind(statement.name, args) + return bind(query, args) } function prepare(sig, str, args, query) { query.statement = { name: sig ? 'p' + statement_id++ : '', sig } return Buffer.concat([ frontend.Parse(query.statement.name, str, args), - frontend.Bind(query.statement.name, args) + bind(query, args) ]) } + function bind(query, args) { + return query.cursor + ? frontend.Bind(query.statement.name, args, query.cursor.rows) + : frontend.Bind(query.statement.name, args) + } + function idle() { clearTimeout(timer) timer = setTimeout(socket.end, timeout * 1000) @@ -141,9 +165,6 @@ function Connection(options = {}) { backend.query = backend.error = null timeout && queries.length === 0 && idle() - if (queries.length === 0 && ended) - return ended() - if (!open) { messages.forEach(socket.write) messages = [] @@ -152,6 +173,7 @@ function Connection(options = {}) { backend.query = queries.shift() ready = !backend.query + ready && ended && ended() } function data(x) { @@ -246,7 +268,7 @@ function postgresSocket(options, { return Promise.resolve() }, end: () => { - return new Promise(r => socket && socket.end(r)) + return new Promise(r => socket ? socket.end(r) : r()) }, connect } diff --git a/lib/frontend.js b/lib/frontend.js index 11e4ca40..97d9c348 100644 --- a/lib/frontend.js +++ b/lib/frontend.js @@ -3,13 +3,12 @@ const bytes = require('./bytes.js') const { errors, entries } = require('./types.js') const N = String.fromCharCode(0) -const execute = bytes - .inc(5) - .str('D').i32(6).str('P').str(N) - .str('E').i32(9).z(5) - .str('H').i32(4) - .str('S').i32(4) - .end().slice(5) +const execute = Buffer.concat([ + bytes.D().str('P').str(N).end(), + bytes.E().str(N).i32(0).end(), + bytes.H().end(), + bytes.S().end() +]) const authNames = { 2 : 'KerberosV5', @@ -38,7 +37,9 @@ module.exports = { auth, Bind, Parse, - Query + Query, + Close, + Execute } function connect({ user, database, connection }) { @@ -140,7 +141,7 @@ function Query(x) { .end() } -function Bind(name, args) { +function Bind(name, args, rows = 0) { let prev bytes @@ -165,7 +166,13 @@ function Bind(name, args) { return Buffer.concat([ bytes.end(), - execute + rows + ? Buffer.concat([ + bytes.D().str('P').str(N).end(), + bytes.E().str(N).i32(rows).end(), + bytes.H().end() + ]) + : execute ]) } @@ -181,6 +188,20 @@ function Parse(name, str, args) { return bytes.end() } +function Execute(rows) { + return Buffer.concat([ + bytes.E().str(N).i32(rows).end(), + bytes.H().end() + ]) +} + +function Close() { + return Buffer.concat([ + bytes.C().str('P').str(N).end(), + bytes.S().end() + ]) +} + function md5(x) { return crypto.createHash('md5').update(x).digest('hex') } diff --git a/lib/index.js b/lib/index.js index b5d63a15..865dde47 100644 --- a/lib/index.js +++ b/lib/index.js @@ -14,7 +14,8 @@ const { toKebab, errors, escape, - types + types, + END } = require('./types.js') const notPromise = { @@ -171,7 +172,7 @@ function Postgres(a, b) { : fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject) }) - promise.stream = (fn) => (query.stream = fn, promise) + addMethods(promise, query) return promise } @@ -185,7 +186,7 @@ function Postgres(a, b) { function send(connection, query, xs, args) { connection - ? connection.send(query, query.raw ? parseRaw(xs, args) : parse(query, xs, args)) + ? process.nextTick(connection.send, query, query.raw ? parseRaw(xs, args) : parse(query, xs, args)) : queries.push({ query, xs, args }) } @@ -246,6 +247,7 @@ function Postgres(a, b) { function addTypes(sql, connection) { Object.assign(sql, { + END, types: {}, notify, unsafe, @@ -287,7 +289,7 @@ function Postgres(a, b) { }) }))).then(str => query(q, connection || getConnection(), str, args || [])) - promise.stream = fn => (q.stream = fn, promise) + addMethods(promise, q) return promise } @@ -297,6 +299,19 @@ function Postgres(a, b) { }) } + function addMethods(promise, query) { + promise.stream = (fn) => (query.stream = fn, promise) + promise.cursor = (rows, fn) => { + if (typeof rows === 'function') { + fn = rows + rows = 1 + } + fn.rows = rows + query.cursor = fn + return promise + } + } + function listen(channel, fn) { if (channel in listeners) { listeners[channel].push(fn) @@ -329,7 +344,7 @@ function Postgres(a, b) { return ended = Promise.all(all.map(c => c.destroy())).then(() => undefined) return ended = Promise.race([ - Promise.all(all.map(c => c.end())) + Promise.resolve(arrayTypesPromise).then(() => Promise.all(all.map(c => c.end()))) ].concat( timeout > 0 ? new Promise(r => destroy = setTimeout(() => (all.map(c => c.destroy()), r()), timeout * 1000)) diff --git a/lib/types.js b/lib/types.js index f413d0db..c698d517 100644 --- a/lib/types.js +++ b/lib/types.js @@ -56,6 +56,8 @@ const parsers = module.exports.parsers = defaultHandlers.parsers module.exports.entries = entries +module.exports.END = {} + module.exports.mergeUserTypes = function(types) { const user = typeHandlers(types || {}) return { diff --git a/tests/index.js b/tests/index.js index 78c03cd2..813bba23 100644 --- a/tests/index.js +++ b/tests/index.js @@ -413,10 +413,7 @@ t('Connection ended error', async() => { t('Connection end does not cancel query', async() => { const sql = postgres(options) - await sql`select 1` - const promise = sql`select 1 as x` - sql.end() return [1, (await promise)[0].x] @@ -771,6 +768,43 @@ t('Stream returns empty array', async() => { return [0, (await sql`select 1 as x`.stream(() => { /* noop */ })).length] }) +t('Cursor works', async() => { + const order = [] + await sql`select 1 as x union select 2 as x`.cursor(async (x) => { + order.push(x.x + 'a') + await new Promise(r => setTimeout(r, 100)) + order.push(x.x + 'b') + }) + return ['1a1b2a2b', order.join('')] +}) + +t('Cursor custom n works', async() => { + const order = [] + await sql`select * from generate_series(1,20)`.cursor(10, async (x) => { + order.push(x.length) + }) + return ['10,10', order.join(',')] +}) + +t('Cursor cancel works', async() => { + let result + await sql`select * from generate_series(1,10) as x`.cursor(async ({ x }) => { + result = x + return sql.END + }) + return [1, result] +}) + +t('Cursor throw works', async() => { + const order = [] + await sql`select 1 as x union select 2 as x`.cursor(async (x) => { + order.push(x.x + 'a') + await new Promise(r => setTimeout(r, 100)) + throw 'watty' + }).catch(() => order.push('err')) + return ['1aerr', order.join('')] +}) + t('Transform row', async() => { const sql = postgres({ ...options,