Skip to content

Commit

Permalink
Merge pull request #253 from influxdata/queryApi/response_text
Browse files Browse the repository at this point in the history
feat(core/query): allow to receive the whole query response as text
  • Loading branch information
sranka authored Sep 9, 2020
2 parents ec924c6 + 9b32a55 commit 22a2fdd
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
1. [#238](https://github.com/influxdata/influxdb-client-js/pull/238): Respect context path in client's url option.
1. [#240](https://github.com/influxdata/influxdb-client-js/pull/240): Add helpers to let users choose how to deserialize dateTime:RFC3339 query response data type
1. [#250](https://github.com/influxdata/influxdb-client-js/pull/250): Simplify precision for WriteApi retrieval.
1. [#253](https://github.com/influxdata/influxdb-client-js/pull/253): Allow to simply receive the whole query response as a string.

### Bug Fixes

Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/QueryApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ export default interface QueryApi {
consumer: FluxResultObserver<string[]>
): void

/**
* QueryRaw executes a query and returns the full response as a string.
* Use with caution, a possibly huge stream is copied to memory.
*
* @param query - query
* @returns Promise of response text
*/
queryRaw(query: string | ParameterizedQuery): Promise<string>

/**
* CollectRows executes the query and collects all the results in the returned Promise.
* This method is suitable to collect simple results. Use with caution,
Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/impl/QueryApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@ export class QueryApiImpl implements QueryApi {
})
}

queryRaw(query: string | ParameterizedQuery): Promise<string> {
const {org, type, gzip} = this.options
return this.transport.request(
`/api/v2/query?org=${encodeURIComponent(org)}`,
JSON.stringify(
this.decorateRequest({
query: query.toString(),
dialect: DEFAULT_dialect,
type,
})
),
{
method: 'POST',
headers: {
accept: 'text/csv',
'accept-encoding': gzip ? 'gzip' : 'identity',
'content-type': 'application/json; encoding=utf-8',
},
}
)
}

private createExecutor(query: string | ParameterizedQuery): QueryExecutor {
const {org, type, gzip} = this.options

Expand Down
19 changes: 7 additions & 12 deletions packages/core/src/impl/browser/FetchTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,8 @@ export default class FetchTransport implements Transport {
const {status, headers} = response
const responseContentType = headers.get('content-type') || ''

let data = undefined
try {
if (responseContentType.includes('json')) {
data = await response.json()
} else if (responseContentType.includes('text')) {
data = await response.text()
}
} catch (_e) {
// ignore
Logger.warn('Unable to read error body', _e)
}
if (status >= 300) {
let data = await response.text()
if (!data) {
const headerError = headers.get('x-influxdb-error')
if (headerError) {
Expand All @@ -143,7 +133,12 @@ export default class FetchTransport implements Transport {
response.headers.get('retry-after')
)
}
return data
const responseType = options.headers?.accept ?? responseContentType
if (responseType.includes('json')) {
return await response.json()
} else if (responseType.includes('text')) {
return await response.text()
}
}

private fetch(
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/impl/node/NodeHttpTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ export class NodeHttpTransport implements Transport {
buffer = Buffer.concat([buffer, data])
},
complete: (): void => {
const responseType = options.headers?.accept ?? contentType
try {
if (contentType.includes('json')) {
if (responseType.includes('json')) {
resolve(JSON.parse(buffer.toString('utf8')))
} else if (contentType.includes('text')) {
} else if (responseType.includes('text')) {
resolve(buffer.toString('utf8'))
} else {
resolve(buffer)
Expand Down
15 changes: 7 additions & 8 deletions packages/core/test/integration/rxjs/QueryApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,13 @@ describe('RxJS QueryApi integration', () => {
})
.persist()

try {
await from(subject.rows('from(bucket:"my-bucket") |> range(start: 0)'))
.pipe(toArray())
.toPromise()
expect.fail('Server returned 500!')
} catch (_) {
// expected failure
}
await from(subject.rows('from(bucket:"my-bucket") |> range(start: 0)'))
.pipe(toArray())
.toPromise()
.then(
() => expect.fail('Server returned 500!'),
() => true // failure is expected
)
})
;[
['response2', undefined],
Expand Down
101 changes: 87 additions & 14 deletions packages/core/test/unit/QueryApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('QueryApi', () => {
nock.cleanAll()
nock.enableNetConnect()
})
it('receives raw lines', async () => {
it('receives lines', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
nock(clientOptions.url)
.post(QUERY_PATH)
Expand Down Expand Up @@ -209,7 +209,7 @@ describe('QueryApi', () => {
expect(body?.now).to.deep.equal(pair.now)
}
})
it('collectLines collects raw lines', async () => {
it('collectLines collects lines', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
nock(clientOptions.url)
.post(QUERY_PATH)
Expand Down Expand Up @@ -238,12 +238,12 @@ describe('QueryApi', () => {
]
})
.persist()
try {
await subject.collectLines('from(bucket:"my-bucket") |> range(start: 0)')
expect.fail('client error expected on server error')
} catch (e) {
// OK error is expected
}
await subject
.collectLines('from(bucket:"my-bucket") |> range(start: 0)')
.then(
() => expect.fail('client error expected on server error'),
() => true // failure is expected
)
})
it('collectRows collects rows', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
Expand Down Expand Up @@ -297,11 +297,84 @@ describe('QueryApi', () => {
]
})
.persist()
try {
await subject.collectRows('from(bucket:"my-bucket") |> range(start: 0)')
expect.fail('client error expected on server error')
} catch (e) {
// OK error is expected
}
await subject
.collectRows('from(bucket:"my-bucket") |> range(start: 0)')
.then(
() => expect.fail('client error expected on server error'),
() => true // error is expected
)
})
it('queryRaw returns the whole response text', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
const expected = fs
.readFileSync('test/fixture/query/simpleResponse.txt')
.toString()
nock(clientOptions.url)
.post(QUERY_PATH)
.reply((_uri, _requestBody) => {
return [200, expected, {'retry-after': '1', 'content-type': 'text/csv'}]
})
.persist()
const data = await subject.queryRaw(
'from(bucket:"my-bucket") |> range(start: 0)'
)
expect(data).equals(expected)
})
it('queryRaw returns the whole response even if response content type is not text', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
const expected = fs
.readFileSync('test/fixture/query/simpleResponse.txt')
.toString()
nock(clientOptions.url)
.post(QUERY_PATH)
.reply((_uri, _requestBody) => {
return [200, expected, {'retry-after': '1'}]
})
.persist()
const data = await subject.queryRaw(
'from(bucket:"my-bucket") |> range(start: 0)'
)
expect(data).equals(expected)
})
it('queryRaw returns the plain response text even it is gzip encoded', async () => {
const subject = new InfluxDB(clientOptions)
.getQueryApi(ORG)
.with({gzip: true})
nock(clientOptions.url)
.post(QUERY_PATH)
.reply((_uri, _requestBody) => {
return [
200,
fs
.createReadStream('test/fixture/query/simpleResponse.txt')
.pipe(zlib.createGzip()),
{'content-encoding': 'gzip', 'content-type': 'text/csv'},
]
})
.persist()
const data = await subject.queryRaw(
'from(bucket:"my-bucket") |> range(start: 0)'
)
const expected = fs
.readFileSync('test/fixture/query/simpleResponse.txt')
.toString()
expect(data).equals(expected)
})
it('queryRaw fails on server error', async () => {
const subject = new InfluxDB(clientOptions).getQueryApi(ORG).with({})
nock(clientOptions.url)
.post(QUERY_PATH)
.reply((_uri, _requestBody) => {
return [
500,
fs.createReadStream('test/fixture/query/simpleResponse.txt'),
{'retry-after': '1'},
]
})
.persist()
await subject.queryRaw('from(bucket:"my-bucket") |> range(start: 0)').then(
() => expect.fail('client error expected on server error'),
() => true // error is expected
)
})
})
77 changes: 48 additions & 29 deletions packages/core/test/unit/impl/browser/FetchTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@ describe('FetchTransport', () => {
})
expect(response).is.deep.equal('{}')
})
it('receives no data', async () => {
it('receives text data even if response is application/json', async () => {
emulateFetchApi({
headers: {'content-type': 'application/json; charset=utf-8'},
body: '{}',
})
const response = await transport.request('/whatever', '', {
method: 'GET',
headers: {accept: 'text/plain'},
})
expect(response).is.deep.equal('{}')
})
it('receives no data for HEAD method', async () => {
emulateFetchApi({
headers: {},
body: '{}',
Expand All @@ -71,31 +82,37 @@ describe('FetchTransport', () => {
})
expect(response).is.equal(undefined)
})
it('receives no data', async () => {
it('throws error when unable to read response body', async () => {
emulateFetchApi({
headers: {'content-type': 'text/plain'},
body: 'error',
})
const response = await transport.request('/whatever', '', {
method: 'POST',
})
expect(response).is.equal(undefined)
await transport
.request('/whatever', '', {
method: 'POST',
})
.then(
() => Promise.reject('client error expected'),
(e: any) =>
expect(e)
.property('message')
.equals('error data') //thrown by emulator
)
})
it('throws error', async () => {
emulateFetchApi({
headers: {'content-type': 'application/json'},
body: '{}',
status: 500,
})
try {
await transport.request('/whatever', '', {
await transport
.request('/whatever', '', {
method: 'GET',
})
expect.fail()
} catch (_e) {
// eslint-disable-next-line no-console
// console.log(` OK, received ${_e}`)
}
.then(
() => Promise.reject('client error expected'),
() => true // OK error
)
})
it('throws error with X-Influxdb-Error header body', async () => {
const message = 'this is a header message'
Expand All @@ -107,33 +124,35 @@ describe('FetchTransport', () => {
body: '',
status: 500,
})
try {
await transport.request('/whatever', '', {
await transport
.request('/whatever', '', {
method: 'GET',
})
expect.fail()
} catch (e) {
expect(e)
.property('body')
.equals(message)
}
.then(
() => Promise.reject('client error expected'),
(e: any) =>
expect(e)
.property('body')
.equals(message)
)
})
it('throws error with empty body', async () => {
emulateFetchApi({
headers: {'content-type': 'text/plain'},
body: '',
status: 500,
})
try {
await transport.request('/whatever', '', {
await transport
.request('/whatever', '', {
method: 'GET',
})
expect.fail()
} catch (e) {
expect(e)
.property('body')
.equals('')
}
.then(
() => Promise.reject('client error expected'),
(e: any) =>
expect(e)
.property('body')
.equals('')
)
})
})
describe('send', () => {
Expand Down
8 changes: 2 additions & 6 deletions packages/core/test/unit/impl/browser/emulateBrowser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ function createResponse({
}
if (typeof body === 'string') {
retVal.text = function(): Promise<string> {
if (typeof body === 'string') {
if (body === 'error') return Promise.reject(new Error('error data'))
return Promise.resolve(body)
} else {
return Promise.reject(new Error('String body expected, but ' + body))
}
if (body === 'error') return Promise.reject(new Error('error data'))
return Promise.resolve(body)
}
}
if (body instanceof Uint8Array) {
Expand Down
Loading

0 comments on commit 22a2fdd

Please sign in to comment.