Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): require status code 204 in write response #264

Merged
merged 3 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.8.0 [unreleased]

### Bug Fixes

1. [#264](https://github.com/influxdata/influxdb-client-js/pull/264): Require 204 status code in a write response.

## 1.7.0 [2020-10-02]

### Features
Expand Down
23 changes: 20 additions & 3 deletions packages/core/src/impl/WriteApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
WriteOptions,
WritePrecisionType,
} from '../options'
import {Transport, SendOptions} from '../transport'
import {Transport, SendOptions, Headers} from '../transport'
import Logger from './Logger'
import {HttpError, RetryDelayStrategy} from '../errors'
import Point from '../Point'
Expand Down Expand Up @@ -132,7 +132,11 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
const self: WriteApiImpl = this
if (!this.closed && lines.length > 0) {
return new Promise<void>((resolve, reject) => {
let responseStatusCode: number | undefined
this.transport.send(this.httpPath, lines.join('\n'), this.sendOptions, {
responseStarted(_headers: Headers, statusCode?: number): void {
responseStatusCode = statusCode
},
error(error: Error): void {
const failedAttempts = self.writeOptions.maxRetries + 2 - attempts
// call the writeFailed listener and check if we can retry
Expand Down Expand Up @@ -170,7 +174,19 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
},
complete(): void {
self.retryStrategy.success()
resolve()
// older implementations of transport do not report status code
if (responseStatusCode == 204 || responseStatusCode == undefined) {
resolve()
} else {
const error = new HttpError(
responseStatusCode,
`204 HTTP response status code expected, but ${responseStatusCode} returned`,
undefined,
'0'
)
Logger.error(`Write to InfluxDB failed.`, error)
reject(error)
}
},
})
})
Expand Down Expand Up @@ -212,7 +228,8 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
throw new Error('writeApi: already closed!')
}
for (let i = 0; i < points.length; i++) {
this.writePoint(points[i])
const line = points[i].toLineProtocol(this)
if (line) this.writeBuffer.add(line)
}
}
async flush(withRetryBuffer?: boolean): Promise<void> {
Expand Down
17 changes: 15 additions & 2 deletions packages/core/src/impl/browser/FetchTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {CLIENT_LIB_VERSION} from '../version'
export default class FetchTransport implements Transport {
chunkCombiner = pureJsChunkCombiner
private defaultHeaders: {[key: string]: string}
private url: string
constructor(private connectionOptions: ConnectionOptions) {
this.defaultHeaders = {
'content-type': 'application/json; charset=utf-8',
Expand All @@ -27,6 +28,18 @@ export default class FetchTransport implements Transport {
this.defaultHeaders['Authorization'] =
'Token ' + this.connectionOptions.token
}
this.url = String(this.connectionOptions.url)
if (this.url.endsWith('/')) {
this.url = this.url.substring(0, this.url.length - 1)
}
// https://github.com/influxdata/influxdb-client-js/issues/263
// don't allow /api/v2 suffix to avoid future problems
if (this.url.endsWith('/api/v2')) {
this.url = this.url.substring(0, this.url.length - '/api/v2'.length)
Logger.warn(
`Please remove '/api/v2' context path from InfluxDB base url, using ${this.url} !`
)
}
}
send(
path: string,
Expand Down Expand Up @@ -67,7 +80,7 @@ export default class FetchTransport implements Transport {
headers[key] = [previous, value]
}
})
observer.responseStarted(headers)
observer.responseStarted(headers, response.status)
}
if (response.status >= 300) {
return response
Expand Down Expand Up @@ -160,7 +173,7 @@ export default class FetchTransport implements Transport {
options: SendOptions
): Promise<Response> {
const {method, headers, ...other} = options
return fetch(`${this.connectionOptions.url}${path}`, {
return fetch(`${this.url}${path}`, {
method: method,
body:
method === 'GET' || method === 'HEAD'
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/impl/completeCommunicationObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ export default function completeCommunicationObserver(
if (callbacks.complete) callbacks.complete()
}
},
responseStarted: (headers: Headers): void => {
if (callbacks.responseStarted) callbacks.responseStarted(headers)
responseStarted: (headers: Headers, statusCode?: number): void => {
if (callbacks.responseStarted)
callbacks.responseStarted(headers, statusCode)
},
}
return retVal
Expand Down
12 changes: 11 additions & 1 deletion packages/core/src/impl/node/NodeHttpTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import nodeChunkCombiner from './nodeChunkCombiner'
import zlib from 'zlib'
import completeCommunicationObserver from '../completeCommunicationObserver'
import {CLIENT_LIB_VERSION} from '../version'
import Logger from '../Logger'

const zlibOptions = {
flush: zlib.Z_SYNC_FLUSH,
Expand Down Expand Up @@ -67,6 +68,15 @@ export class NodeHttpTransport implements Transport {
this.contextPath.length - 1
)
}
// https://github.com/influxdata/influxdb-client-js/issues/263
// don't allow /api/v2 suffix to avoid future problems
if (this.contextPath == '/api/v2') {
Logger.warn(
`Please remove '/api/v2' context path from InfluxDB base url, using ${url.protocol}//${url.hostname}:${url.port} !`
)
this.contextPath = ''
}

if (url.protocol === 'http:') {
this.requestApi = http.request
} else if (url.protocol === 'https:') {
Expand Down Expand Up @@ -205,7 +215,7 @@ export class NodeHttpTransport implements Transport {
res.on('aborted', () => {
listeners.error(new AbortError())
})
listeners.responseStarted(res.headers)
listeners.responseStarted(res.headers, res.statusCode)
/* istanbul ignore next statusCode is optional in http.IncomingMessage */
const statusCode = res.statusCode ?? 600
const contentEncoding = res.headers['content-encoding']
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ export interface CommunicationObserver<T> {
/**
* Informs about a start of response processing.
* @param headers - response HTTP headers
* @param statusCode - response status code
*/
responseStarted?: (headers: Headers) => void
responseStarted?: (headers: Headers, statusCode?: number) => void
/**
* Setups cancelllable for this communication.
*/
Expand Down
28 changes: 25 additions & 3 deletions packages/core/test/unit/WriteApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from 'chai'
import nock from 'nock' // WARN: nock must be imported before NodeHttpTransport, since it modifies node's http
import {
ClientOptions,
HttpError,
WritePrecision,
WriteOptions,
Point,
Expand Down Expand Up @@ -123,7 +124,7 @@ describe('WriteApi', () => {
subject.writeRecord('test value=1')
subject.writeRecords(['test value=2', 'test value=3'])
// wait for http calls to finish
await new Promise(resolve => setTimeout(resolve, 10))
await new Promise(resolve => setTimeout(resolve, 20))
await subject.close().then(() => {
expect(logs.error).to.length(1)
expect(logs.warn).length(3) // 3 warnings about write failure
Expand Down Expand Up @@ -164,7 +165,7 @@ describe('WriteApi', () => {
it('uses the pre-configured batchSize', async () => {
useSubject({flushInterval: 0, maxRetries: 0, batchSize: 2})
subject.writeRecords(['test value=1', 'test value=2', 'test value=3'])
await new Promise(resolve => setTimeout(resolve, 10)) // wait for HTTP to finish
await new Promise(resolve => setTimeout(resolve, 20)) // wait for HTTP to finish
let count = subject.dispose()
expect(logs.error).to.length(1)
expect(logs.warn).to.length(0)
Expand Down Expand Up @@ -236,7 +237,7 @@ describe('WriteApi', () => {
return [429, '', {'retry-after': '1'}]
} else {
messages.push(_requestBody.toString())
return [200, '', {'retry-after': '1'}]
return [204, '', {'retry-after': '1'}]
}
})
.persist()
Expand All @@ -249,6 +250,7 @@ describe('WriteApi', () => {
await new Promise(resolve => setTimeout(resolve, 10)) // wait for background flush and HTTP to finish
expect(logs.error).to.length(0)
expect(logs.warn).to.length(1)
subject.writePoint(new Point()) // ignored, since it generates no line
subject.writePoints([
new Point('test'), // will be ignored + warning
new Point('test').floatField('value', 2),
Expand Down Expand Up @@ -284,5 +286,25 @@ describe('WriteApi', () => {
expect(lines[4]).to.be.equal('test,xtra=1 value=6 3000000')
expect(lines[5]).to.be.equal('test,xtra=1 value=7 false')
})
it('fails on write response status not being exactly 204', async () => {
// required because of https://github.com/influxdata/influxdb-client-js/issues/263
useSubject({flushInterval: 5, maxRetries: 0, batchSize: 10})
nock(clientOptions.url)
.post(WRITE_PATH_NS)
.reply((_uri, _requestBody) => {
return [200, '', {}]
})
.persist()
subject.writePoint(new Point('test').floatField('value', 1))
await new Promise(resolve => setTimeout(resolve, 20)) // wait for background flush and HTTP to finish
expect(logs.error).has.length(1)
expect(logs.error[0][0]).equals('Write to InfluxDB failed.')
expect(logs.error[0][1]).instanceOf(HttpError)
expect(logs.error[0][1].statusCode).equals(200)
expect(logs.error[0][1].statusMessage).equals(
`204 HTTP response status code expected, but 200 returned`
)
expect(logs.warn).deep.equals([])
})
})
})
39 changes: 36 additions & 3 deletions packages/core/test/unit/impl/browser/FetchTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ import {removeFetchApi, emulateFetchApi} from './emulateBrowser'
import sinon from 'sinon'
import {CLIENT_LIB_VERSION} from '../../../../src/impl/version'
import {SendOptions, Cancellable} from '../../../../src'
import {CollectedLogs, collectLogging} from '../../../util'

describe('FetchTransport', () => {
afterEach(() => {
removeFetchApi()
})

describe('constructor', () => {
let logs: CollectedLogs
beforeEach(() => {
logs = collectLogging.replace()
})
afterEach(async () => {
collectLogging.after()
})
it('creates the transport with url', () => {
const options = {
url: 'http://test:8086',
Expand All @@ -35,6 +43,28 @@ describe('FetchTransport', () => {
})
expect(transport.connectionOptions).to.deep.equal(options)
})
it('ignore last slash / in url', () => {
const options = {
url: 'http://test:8086/',
token: 'a',
}
const transport: any = new FetchTransport(options)
expect(transport.url).equals('http://test:8086')
})
it('ignore /api/v2 suffix in url', () => {
const options = {
url: 'http://test:8086/api/v2',
token: 'a',
}
const transport: any = new FetchTransport(options)
expect(transport.url).equals('http://test:8086')
expect(logs.warn).is.deep.equal([
[
"Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !",
undefined,
],
])
})
})
describe('request', () => {
const transport = new FetchTransport({url: 'http://test:8086'})
Expand Down Expand Up @@ -323,10 +353,13 @@ describe('FetchTransport', () => {
cancellable.cancel()
expect(cancellable.isCancelled()).is.equal(true)
}
if (url === 'error') {
expect(callbacks.responseStarted.callCount).equals(0)
} else {
expect(callbacks.responseStarted.callCount).equals(1)
expect(callbacks.responseStarted.args[0][1]).equals(status)
}
const isError = url === 'error' || status !== 200
expect(callbacks.responseStarted.callCount).equals(
url === 'error' ? 0 : 1
)
expect(callbacks.error.callCount).equals(isError ? 1 : 0)
expect(callbacks.complete.callCount).equals(isError ? 0 : 1)
const customNext = url.startsWith('customNext')
Expand Down
45 changes: 36 additions & 9 deletions packages/core/test/unit/impl/node/NodeHttpTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import sinon from 'sinon'
import {Readable} from 'stream'
import zlib from 'zlib'
import {CLIENT_LIB_VERSION} from '../../../../src/impl/version'
import {CollectedLogs, collectLogging} from '../../../util'

function sendTestData(
connectionOptions: ConnectionOptions,
Expand Down Expand Up @@ -41,6 +42,13 @@ const TEST_URL = 'http://test:8086'

describe('NodeHttpTransport', () => {
describe('constructor', () => {
let logs: CollectedLogs
beforeEach(() => {
logs = collectLogging.replace()
})
afterEach(async () => {
collectLogging.after()
})
it('creates the transport from http url', () => {
const transport: any = new NodeHttpTransport({
url: 'http://test:8086',
Expand Down Expand Up @@ -101,6 +109,19 @@ describe('NodeHttpTransport', () => {
})
).to.throw()
})
it('warn about unsupported /api/v2 context path', () => {
const transport: any = new NodeHttpTransport({
url: 'http://test:8086/api/v2',
})
// don;t use context path at all
expect(transport.contextPath).equals('')
expect(logs.warn).is.deep.equal([
[
"Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !",
undefined,
],
])
})
})
describe('send', () => {
beforeEach(() => {
Expand Down Expand Up @@ -133,6 +154,7 @@ describe('NodeHttpTransport', () => {
const responseData = 'yes'
it(`works with options ${JSON.stringify(extras)}`, async () => {
const nextFn = sinon.fake()
const responseStartedFn = sinon.fake()
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error('timeouted')),
Expand Down Expand Up @@ -183,9 +205,8 @@ describe('NodeHttpTransport', () => {
'',
{...extras, method: 'POST'},
{
next(data: any) {
nextFn(data)
},
responseStarted: responseStartedFn,
next: nextFn,
error(error: any) {
clearTimeout(timeout)
reject(new Error('No error expected!, but: ' + error))
Expand All @@ -202,16 +223,22 @@ describe('NodeHttpTransport', () => {
if (extras.cancel) {
cancellable.cancel()
}
})
.then(() => {
expect(nextFn.called)
}).then(
() => {
if (!extras.cancel) {
expect(nextFn.callCount).equals(1)
expect(responseStartedFn.callCount).equals(1)
expect(responseStartedFn.args[0][1]).equals(200)
expect(nextFn.args[0][0].toString()).to.equal(responseData)
} else {
expect(nextFn.callCount).equals(0)
expect(responseStartedFn.callCount).equals(0)
}
})
.catch(e => {
},
e => {
expect.fail(undefined, e, e.toString())
})
}
)
})
}
})
Expand Down