Skip to content

Commit

Permalink
Merge pull request #264 from influxdata/263/check_write_response
Browse files Browse the repository at this point in the history
fix(core): require status code 204 in write response
  • Loading branch information
sranka authored Oct 9, 2020
2 parents 500013c + 54f44a8 commit f8e0eaa
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.8.0 [unreleased]

### Bug Fixes

1. [#264](https://github.com/influxdata/influxdb-client-js/pull/264): Require 204 status code in a write response.

## 1.7.0 [2020-10-02]

### Features
Expand Down
23 changes: 20 additions & 3 deletions packages/core/src/impl/WriteApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
WriteOptions,
WritePrecisionType,
} from '../options'
import {Transport, SendOptions} from '../transport'
import {Transport, SendOptions, Headers} from '../transport'
import Logger from './Logger'
import {HttpError, RetryDelayStrategy} from '../errors'
import Point from '../Point'
Expand Down Expand Up @@ -132,7 +132,11 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
const self: WriteApiImpl = this
if (!this.closed && lines.length > 0) {
return new Promise<void>((resolve, reject) => {
let responseStatusCode: number | undefined
this.transport.send(this.httpPath, lines.join('\n'), this.sendOptions, {
responseStarted(_headers: Headers, statusCode?: number): void {
responseStatusCode = statusCode
},
error(error: Error): void {
const failedAttempts = self.writeOptions.maxRetries + 2 - attempts
// call the writeFailed listener and check if we can retry
Expand Down Expand Up @@ -170,7 +174,19 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
},
complete(): void {
self.retryStrategy.success()
resolve()
// older implementations of transport do not report status code
if (responseStatusCode == 204 || responseStatusCode == undefined) {
resolve()
} else {
const error = new HttpError(
responseStatusCode,
`204 HTTP response status code expected, but ${responseStatusCode} returned`,
undefined,
'0'
)
Logger.error(`Write to InfluxDB failed.`, error)
reject(error)
}
},
})
})
Expand Down Expand Up @@ -212,7 +228,8 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
throw new Error('writeApi: already closed!')
}
for (let i = 0; i < points.length; i++) {
this.writePoint(points[i])
const line = points[i].toLineProtocol(this)
if (line) this.writeBuffer.add(line)
}
}
async flush(withRetryBuffer?: boolean): Promise<void> {
Expand Down
17 changes: 15 additions & 2 deletions packages/core/src/impl/browser/FetchTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {CLIENT_LIB_VERSION} from '../version'
export default class FetchTransport implements Transport {
chunkCombiner = pureJsChunkCombiner
private defaultHeaders: {[key: string]: string}
private url: string
constructor(private connectionOptions: ConnectionOptions) {
this.defaultHeaders = {
'content-type': 'application/json; charset=utf-8',
Expand All @@ -27,6 +28,18 @@ export default class FetchTransport implements Transport {
this.defaultHeaders['Authorization'] =
'Token ' + this.connectionOptions.token
}
this.url = String(this.connectionOptions.url)
if (this.url.endsWith('/')) {
this.url = this.url.substring(0, this.url.length - 1)
}
// https://github.com/influxdata/influxdb-client-js/issues/263
// don't allow /api/v2 suffix to avoid future problems
if (this.url.endsWith('/api/v2')) {
this.url = this.url.substring(0, this.url.length - '/api/v2'.length)
Logger.warn(
`Please remove '/api/v2' context path from InfluxDB base url, using ${this.url} !`
)
}
}
send(
path: string,
Expand Down Expand Up @@ -67,7 +80,7 @@ export default class FetchTransport implements Transport {
headers[key] = [previous, value]
}
})
observer.responseStarted(headers)
observer.responseStarted(headers, response.status)
}
if (response.status >= 300) {
return response
Expand Down Expand Up @@ -160,7 +173,7 @@ export default class FetchTransport implements Transport {
options: SendOptions
): Promise<Response> {
const {method, headers, ...other} = options
return fetch(`${this.connectionOptions.url}${path}`, {
return fetch(`${this.url}${path}`, {
method: method,
body:
method === 'GET' || method === 'HEAD'
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/impl/completeCommunicationObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ export default function completeCommunicationObserver(
if (callbacks.complete) callbacks.complete()
}
},
responseStarted: (headers: Headers): void => {
if (callbacks.responseStarted) callbacks.responseStarted(headers)
responseStarted: (headers: Headers, statusCode?: number): void => {
if (callbacks.responseStarted)
callbacks.responseStarted(headers, statusCode)
},
}
return retVal
Expand Down
12 changes: 11 additions & 1 deletion packages/core/src/impl/node/NodeHttpTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import nodeChunkCombiner from './nodeChunkCombiner'
import zlib from 'zlib'
import completeCommunicationObserver from '../completeCommunicationObserver'
import {CLIENT_LIB_VERSION} from '../version'
import Logger from '../Logger'

const zlibOptions = {
flush: zlib.Z_SYNC_FLUSH,
Expand Down Expand Up @@ -67,6 +68,15 @@ export class NodeHttpTransport implements Transport {
this.contextPath.length - 1
)
}
// https://github.com/influxdata/influxdb-client-js/issues/263
// don't allow /api/v2 suffix to avoid future problems
if (this.contextPath == '/api/v2') {
Logger.warn(
`Please remove '/api/v2' context path from InfluxDB base url, using ${url.protocol}//${url.hostname}:${url.port} !`
)
this.contextPath = ''
}

if (url.protocol === 'http:') {
this.requestApi = http.request
} else if (url.protocol === 'https:') {
Expand Down Expand Up @@ -205,7 +215,7 @@ export class NodeHttpTransport implements Transport {
res.on('aborted', () => {
listeners.error(new AbortError())
})
listeners.responseStarted(res.headers)
listeners.responseStarted(res.headers, res.statusCode)
/* istanbul ignore next statusCode is optional in http.IncomingMessage */
const statusCode = res.statusCode ?? 600
const contentEncoding = res.headers['content-encoding']
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ export interface CommunicationObserver<T> {
/**
* Informs about a start of response processing.
* @param headers - response HTTP headers
* @param statusCode - response status code
*/
responseStarted?: (headers: Headers) => void
responseStarted?: (headers: Headers, statusCode?: number) => void
/**
* Setups cancelllable for this communication.
*/
Expand Down
28 changes: 25 additions & 3 deletions packages/core/test/unit/WriteApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from 'chai'
import nock from 'nock' // WARN: nock must be imported before NodeHttpTransport, since it modifies node's http
import {
ClientOptions,
HttpError,
WritePrecision,
WriteOptions,
Point,
Expand Down Expand Up @@ -123,7 +124,7 @@ describe('WriteApi', () => {
subject.writeRecord('test value=1')
subject.writeRecords(['test value=2', 'test value=3'])
// wait for http calls to finish
await new Promise(resolve => setTimeout(resolve, 10))
await new Promise(resolve => setTimeout(resolve, 20))
await subject.close().then(() => {
expect(logs.error).to.length(1)
expect(logs.warn).length(3) // 3 warnings about write failure
Expand Down Expand Up @@ -164,7 +165,7 @@ describe('WriteApi', () => {
it('uses the pre-configured batchSize', async () => {
useSubject({flushInterval: 0, maxRetries: 0, batchSize: 2})
subject.writeRecords(['test value=1', 'test value=2', 'test value=3'])
await new Promise(resolve => setTimeout(resolve, 10)) // wait for HTTP to finish
await new Promise(resolve => setTimeout(resolve, 20)) // wait for HTTP to finish
let count = subject.dispose()
expect(logs.error).to.length(1)
expect(logs.warn).to.length(0)
Expand Down Expand Up @@ -236,7 +237,7 @@ describe('WriteApi', () => {
return [429, '', {'retry-after': '1'}]
} else {
messages.push(_requestBody.toString())
return [200, '', {'retry-after': '1'}]
return [204, '', {'retry-after': '1'}]
}
})
.persist()
Expand All @@ -249,6 +250,7 @@ describe('WriteApi', () => {
await new Promise(resolve => setTimeout(resolve, 10)) // wait for background flush and HTTP to finish
expect(logs.error).to.length(0)
expect(logs.warn).to.length(1)
subject.writePoint(new Point()) // ignored, since it generates no line
subject.writePoints([
new Point('test'), // will be ignored + warning
new Point('test').floatField('value', 2),
Expand Down Expand Up @@ -284,5 +286,25 @@ describe('WriteApi', () => {
expect(lines[4]).to.be.equal('test,xtra=1 value=6 3000000')
expect(lines[5]).to.be.equal('test,xtra=1 value=7 false')
})
it('fails on write response status not being exactly 204', async () => {
// required because of https://github.com/influxdata/influxdb-client-js/issues/263
useSubject({flushInterval: 5, maxRetries: 0, batchSize: 10})
nock(clientOptions.url)
.post(WRITE_PATH_NS)
.reply((_uri, _requestBody) => {
return [200, '', {}]
})
.persist()
subject.writePoint(new Point('test').floatField('value', 1))
await new Promise(resolve => setTimeout(resolve, 20)) // wait for background flush and HTTP to finish
expect(logs.error).has.length(1)
expect(logs.error[0][0]).equals('Write to InfluxDB failed.')
expect(logs.error[0][1]).instanceOf(HttpError)
expect(logs.error[0][1].statusCode).equals(200)
expect(logs.error[0][1].statusMessage).equals(
`204 HTTP response status code expected, but 200 returned`
)
expect(logs.warn).deep.equals([])
})
})
})
39 changes: 36 additions & 3 deletions packages/core/test/unit/impl/browser/FetchTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ import {removeFetchApi, emulateFetchApi} from './emulateBrowser'
import sinon from 'sinon'
import {CLIENT_LIB_VERSION} from '../../../../src/impl/version'
import {SendOptions, Cancellable} from '../../../../src'
import {CollectedLogs, collectLogging} from '../../../util'

describe('FetchTransport', () => {
afterEach(() => {
removeFetchApi()
})

describe('constructor', () => {
let logs: CollectedLogs
beforeEach(() => {
logs = collectLogging.replace()
})
afterEach(async () => {
collectLogging.after()
})
it('creates the transport with url', () => {
const options = {
url: 'http://test:8086',
Expand All @@ -35,6 +43,28 @@ describe('FetchTransport', () => {
})
expect(transport.connectionOptions).to.deep.equal(options)
})
it('ignore last slash / in url', () => {
const options = {
url: 'http://test:8086/',
token: 'a',
}
const transport: any = new FetchTransport(options)
expect(transport.url).equals('http://test:8086')
})
it('ignore /api/v2 suffix in url', () => {
const options = {
url: 'http://test:8086/api/v2',
token: 'a',
}
const transport: any = new FetchTransport(options)
expect(transport.url).equals('http://test:8086')
expect(logs.warn).is.deep.equal([
[
"Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !",
undefined,
],
])
})
})
describe('request', () => {
const transport = new FetchTransport({url: 'http://test:8086'})
Expand Down Expand Up @@ -323,10 +353,13 @@ describe('FetchTransport', () => {
cancellable.cancel()
expect(cancellable.isCancelled()).is.equal(true)
}
if (url === 'error') {
expect(callbacks.responseStarted.callCount).equals(0)
} else {
expect(callbacks.responseStarted.callCount).equals(1)
expect(callbacks.responseStarted.args[0][1]).equals(status)
}
const isError = url === 'error' || status !== 200
expect(callbacks.responseStarted.callCount).equals(
url === 'error' ? 0 : 1
)
expect(callbacks.error.callCount).equals(isError ? 1 : 0)
expect(callbacks.complete.callCount).equals(isError ? 0 : 1)
const customNext = url.startsWith('customNext')
Expand Down
45 changes: 36 additions & 9 deletions packages/core/test/unit/impl/node/NodeHttpTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import sinon from 'sinon'
import {Readable} from 'stream'
import zlib from 'zlib'
import {CLIENT_LIB_VERSION} from '../../../../src/impl/version'
import {CollectedLogs, collectLogging} from '../../../util'

function sendTestData(
connectionOptions: ConnectionOptions,
Expand Down Expand Up @@ -41,6 +42,13 @@ const TEST_URL = 'http://test:8086'

describe('NodeHttpTransport', () => {
describe('constructor', () => {
let logs: CollectedLogs
beforeEach(() => {
logs = collectLogging.replace()
})
afterEach(async () => {
collectLogging.after()
})
it('creates the transport from http url', () => {
const transport: any = new NodeHttpTransport({
url: 'http://test:8086',
Expand Down Expand Up @@ -101,6 +109,19 @@ describe('NodeHttpTransport', () => {
})
).to.throw()
})
it('warn about unsupported /api/v2 context path', () => {
const transport: any = new NodeHttpTransport({
url: 'http://test:8086/api/v2',
})
// don;t use context path at all
expect(transport.contextPath).equals('')
expect(logs.warn).is.deep.equal([
[
"Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !",
undefined,
],
])
})
})
describe('send', () => {
beforeEach(() => {
Expand Down Expand Up @@ -133,6 +154,7 @@ describe('NodeHttpTransport', () => {
const responseData = 'yes'
it(`works with options ${JSON.stringify(extras)}`, async () => {
const nextFn = sinon.fake()
const responseStartedFn = sinon.fake()
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error('timeouted')),
Expand Down Expand Up @@ -183,9 +205,8 @@ describe('NodeHttpTransport', () => {
'',
{...extras, method: 'POST'},
{
next(data: any) {
nextFn(data)
},
responseStarted: responseStartedFn,
next: nextFn,
error(error: any) {
clearTimeout(timeout)
reject(new Error('No error expected!, but: ' + error))
Expand All @@ -202,16 +223,22 @@ describe('NodeHttpTransport', () => {
if (extras.cancel) {
cancellable.cancel()
}
})
.then(() => {
expect(nextFn.called)
}).then(
() => {
if (!extras.cancel) {
expect(nextFn.callCount).equals(1)
expect(responseStartedFn.callCount).equals(1)
expect(responseStartedFn.args[0][1]).equals(200)
expect(nextFn.args[0][0].toString()).to.equal(responseData)
} else {
expect(nextFn.callCount).equals(0)
expect(responseStartedFn.callCount).equals(0)
}
})
.catch(e => {
},
e => {
expect.fail(undefined, e, e.toString())
})
}
)
})
}
})
Expand Down

0 comments on commit f8e0eaa

Please sign in to comment.