diff --git a/caching_store_wrapper.js b/caching_store_wrapper.js index d7eff5e..8151ffd 100644 --- a/caching_store_wrapper.js +++ b/caching_store_wrapper.js @@ -60,12 +60,13 @@ const initializedKey = '$checkedInit'; in the specified order. The store should delete any obsolete items only after writing all of the items provided. */ -function CachingStoreWrapper(underlyingStore, ttl) { +function CachingStoreWrapper(underlyingStore, ttl, description) { const cache = ttl ? new NodeCache({ stdTTL: ttl }) : null; const queue = new UpdateQueue(); let initialized = false; this.underlyingStore = underlyingStore; + this.description = description; this.init = (allData, cb) => { queue.enqueue(cb => { diff --git a/diagnostic_events.js b/diagnostic_events.js new file mode 100644 index 0000000..a10b1dc --- /dev/null +++ b/diagnostic_events.js @@ -0,0 +1,119 @@ +const os = require('os'); +const uuidv4 = require('uuid/v4'); +const configuration = require('./configuration'); +const packageJson = require('./package.json'); + +// An object that maintains information that will go into diagnostic events, and knows how to format +// those events. It is instantiated by the SDK client, and shared with the event processor. +function DiagnosticsManager(config, diagnosticId, startTime) { + let dataSinceDate = startTime; + const acc = {}; + + // Creates the initial event that is sent by the event processor when the SDK starts up. This will not + // be repeated during the lifetime of the SDK client. + acc.createInitEvent = () => ({ + kind: 'diagnostic-init', + id: diagnosticId, + creationDate: startTime, + sdk: makeSdkData(config), + configuration: makeConfigData(config), + platform: makePlatformData() + }); + + // Creates a periodic event containing time-dependent stats, and resets the state of the manager with + // regard to those stats. + // Note: the reason droppedEvents, deduplicatedUsers, and eventsInQueue are passed into this function, + // instead of being properties of the DiagnosticsManager, is that the event processor is the one who's + // calling this function and is also the one who's tracking those stats. + acc.createStatsEventAndReset = (droppedEvents, deduplicatedUsers, eventsInQueue) => { + const currentTime = new Date().getTime(); + const ret = { + kind: 'diagnostic', + id: diagnosticId, + creationDate: currentTime, + dataSinceDate, + droppedEvents, + deduplicatedUsers, + eventsInQueue + }; + dataSinceDate = currentTime; + return ret; + }; + + return acc; +} + +function DiagnosticId(sdkKey) { + const ret = { + diagnosticId: uuidv4() + }; + if (sdkKey) { + ret.sdkKeySuffix = sdkKey.length > 6 ? sdkKey.substring(sdkKey.length - 6) : sdkKey; + } + return ret; +} + +function makeSdkData(config) { + const sdkData = { + name: 'node-server-sdk', + version: packageJson.version + }; + if (config.wrapperName) { + sdkData.wrapperName = config.wrapperName; + } + if (config.wrapperVersion) { + sdkData.wrapperVersion = config.wrapperVersion; + } + return sdkData; +} + +function makeConfigData(config) { + const defaults = configuration.defaults(); + const secondsToMillis = sec => Math.trunc(sec * 1000); + + const configData = { + customBaseURI: config.baseUri !== defaults.baseUri, + customStreamURI: config.streamUri !== defaults.streamUri, + customEventsURI: config.eventsUri !== defaults.eventsUri, + eventsCapacity: config.capacity, + connectTimeoutMillis: secondsToMillis(config.timeout), + socketTimeoutMillis: secondsToMillis(config.timeout), // Node doesn't distinguish between these two kinds of timeouts + eventsFlushIntervalMillis: secondsToMillis(config.flushInterval), + pollingIntervalMillis: secondsToMillis(config.pollInterval), + // startWaitMillis: n/a (Node SDK does not have this feature) + // samplingInterval: n/a (Node SDK does not have this feature) + reconnectTimeMillis: 1000, // hard-coded in eventsource.js + streamingDisabled: !config.stream, + usingRelayDaemon: !!config.useLdd, + offline: !!config.offline, + allAttributesPrivate: !!config.allAttributesPrivate, + inlineUsersInEvents: !!config.inlineUsersInEvents, + userKeysCapacity: config.userKeysCapacity, + userKeysFlushIntervalMillis: secondsToMillis(config.userKeysFlushInterval), + usingProxy: !!(config.proxyAgent || config.proxyHost), + usingProxyAuthenticator: !!config.proxyAuth, + diagnosticRecordingIntervalMillis: secondsToMillis(config.diagnosticRecordingInterval) + }; + if (config.featureStore && config.featureStore.description) { + configData.featureStore = config.featureStore.description; + } + + return configData; +} + +function makePlatformData() { + return { + name: 'Node', + osArch: os.arch(), + osName: os.platform(), + osVersion: os.release() + // Note that os.release() is not the same OS version string that would be reported by other languages. + // It's defined as being the value returned by "uname -r" (e.g. on Mac OS 10.14, this is "18.7.0"; on + // Ubuntu 16.04, it is "4.4.0-1095-aws"), or GetVersionExW in Windows. + }; +} + +module.exports = { + DiagnosticsManager: DiagnosticsManager, + DiagnosticId: DiagnosticId +}; diff --git a/event_processor.js b/event_processor.js index cdc2513..ea4a549 100644 --- a/event_processor.js +++ b/event_processor.js @@ -10,19 +10,24 @@ const wrapPromiseCallback = require('./utils/wrapPromiseCallback'); const userAttrsToStringifyForEvents = [ 'key', 'secondary', 'ip', 'country', 'email', 'firstName', 'lastName', 'avatar', 'name' ]; -function EventProcessor(sdkKey, config, errorReporter) { +function EventProcessor(sdkKey, config, errorReporter, diagnosticsManager) { const ep = {}; const userFilter = UserFilter(config), summarizer = EventSummarizer(config), - userKeysCache = LRUCache(config.userKeysCapacity); + userKeysCache = LRUCache(config.userKeysCapacity), + mainEventsUri = config.eventsUri + '/bulk', + diagnosticEventsUri = config.eventsUri + '/diagnostic'; let queue = [], lastKnownPastTime = 0, + droppedEvents = 0, + deduplicatedUsers = 0, exceededCapacity = false, shutdown = false, flushTimer, - flushUsersTimer; + flushUsersTimer, + diagnosticsTimer; function enqueue(event) { if (queue.length < config.capacity) { @@ -33,6 +38,7 @@ function EventProcessor(sdkKey, config, errorReporter) { exceededCapacity = true; config.logger.warn('Exceeded event queue capacity. Increase capacity to avoid dropping events.'); } + droppedEvents++; } } @@ -138,10 +144,17 @@ function EventProcessor(sdkKey, config, errorReporter) { // For each user we haven't seen before, we add an index event - unless this is already // an identify event for that user. if (!addFullEvent || !config.inlineUsersInEvents) { - if (event.user && !userKeysCache.get(event.user.key)) { - userKeysCache.set(event.user.key, true); - if (event.kind != 'identify') { - addIndexEvent = true; + if (event.user) { + const isIdentify = (event.kind === 'identify'); + if (userKeysCache.get(event.user.key)) { + if (!isIdentify) { + deduplicatedUsers++; + } + } else { + userKeysCache.set(event.user.key, true); + if (!isIdentify) { + addIndexEvent = true; + } } } } @@ -189,16 +202,16 @@ function EventProcessor(sdkKey, config, errorReporter) { config.logger.debug('Flushing %d events', worklist.length); - tryPostingEvents(worklist, resolve, reject, true); + tryPostingEvents(worklist, mainEventsUri, resolve, reject, true); }), callback); }; - function tryPostingEvents(events, resolve, reject, canRetry) { + function tryPostingEvents(events, uri, resolve, reject, canRetry) { const retryOrReject = err => { if (canRetry) { config.logger && config.logger.warn('Will retry posting events after 1 second'); setTimeout(() => { - tryPostingEvents(events, resolve, reject, false); + tryPostingEvents(events, uri, resolve, reject, false); }, 1000); } else { reject(err); @@ -210,7 +223,7 @@ function EventProcessor(sdkKey, config, errorReporter) { const options = Object.assign({}, config.tlsParams, { method: 'POST', - url: config.eventsUri + '/bulk', + url: uri, headers, json: true, body: events, @@ -241,9 +254,14 @@ function EventProcessor(sdkKey, config, errorReporter) { }); } + function postDiagnosticEvent(event) { + tryPostingEvents(event, diagnosticEventsUri, () => {}, () => {}, true); + } + ep.close = () => { clearInterval(flushTimer); clearInterval(flushUsersTimer); + diagnosticsTimer && clearInterval(diagnosticsTimer); }; flushTimer = setInterval(() => { @@ -254,6 +272,18 @@ function EventProcessor(sdkKey, config, errorReporter) { userKeysCache.removeAll(); }, config.userKeysFlushInterval * 1000); + if (!config.diagnosticOptOut && diagnosticsManager) { + const initEvent = diagnosticsManager.createInitEvent(); + postDiagnosticEvent(initEvent); + + diagnosticsTimer = setInterval(() => { + const statsEvent = diagnosticsManager.createStatsEventAndReset(droppedEvents, deduplicatedUsers, queue.length); + droppedEvents = 0; + deduplicatedUsers = 0; + postDiagnosticEvent(statsEvent); + }, config.diagnosticRecordingInterval * 1000); + } + return ep; } diff --git a/index.js b/index.js index f303234..db67446 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,7 @@ const PollingProcessor = require('./polling'); const StreamingProcessor = require('./streaming'); const FlagsStateBuilder = require('./flags_state'); const configuration = require('./configuration'); +const diagnostics = require('./diagnostic_events'); const evaluate = require('./evaluate_flag'); const messages = require('./messages'); const tunnel = require('tunnel'); @@ -72,6 +73,8 @@ const newClient = function(sdkKey, originalConfig) { const maybeReportError = createErrorReporter(client, config.logger); + let diagnosticsManager = null; + eventFactoryDefault = EventFactory(false); eventFactoryWithReasons = EventFactory(true); @@ -81,7 +84,9 @@ const newClient = function(sdkKey, originalConfig) { if (config.offline || !config.sendEvents) { eventProcessor = NullEventProcessor(); } else { - eventProcessor = EventProcessor(sdkKey, config, maybeReportError); + const diagnosticId = diagnostics.DiagnosticId(sdkKey); + diagnosticsManager = diagnostics.DiagnosticsManager(config, diagnosticId, new Date().getTime()); + eventProcessor = EventProcessor(sdkKey, config, maybeReportError, diagnosticsManager); } } diff --git a/redis_feature_store.js b/redis_feature_store.js index eea6581..9cc767f 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -7,7 +7,8 @@ const noop = function(){}; function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { - return new CachingStoreWrapper(new redisFeatureStoreInternal(redisOpts, prefix, logger), cacheTTL); + return new CachingStoreWrapper(new redisFeatureStoreInternal(redisOpts, prefix, logger), cacheTTL, + 'RedisFeatureStore'); } function redisFeatureStoreInternal(redisOpts, prefix, logger) { diff --git a/test/LDClient-end-to-end-test.js b/test/LDClient-end-to-end-test.js index 155a86e..90567df 100644 --- a/test/LDClient-end-to-end-test.js +++ b/test/LDClient-end-to-end-test.js @@ -12,7 +12,7 @@ async function withAllServers(asyncCallback) { baseUri: pollingServer.url, streamUri: streamingServer.url, eventsUri: eventsServer.url, - logger: stubLogger() + logger: stubLogger(), }; return await asyncCallback(servers, baseConfig); }) @@ -53,7 +53,11 @@ describe('LDClient end-to-end', () => { expect(servers.polling.requestCount()).toEqual(1); expect(servers.streaming.requestCount()).toEqual(0); - expect(servers.events.requestCount()).toEqual(1); + expect(servers.events.requestCount()).toEqual(2); + const req0 = await servers.events.nextRequest(); + expect(req0.path).toEqual('/diagnostic'); + const req1 = await servers.events.nextRequest(); + expect(req1.path).toEqual('/bulk'); }); }); @@ -94,7 +98,11 @@ describe('LDClient end-to-end', () => { expect(servers.polling.requestCount()).toEqual(0); expect(servers.streaming.requestCount()).toEqual(1); - expect(servers.events.requestCount()).toEqual(1); + expect(servers.events.requestCount()).toEqual(2); + const req0 = await servers.events.nextRequest(); + expect(req0.path).toEqual('/diagnostic'); + const req1 = await servers.events.nextRequest(); + expect(req1.path).toEqual('/bulk'); }); }); }); diff --git a/test/LDClient-tls-test.js b/test/LDClient-tls-test.js index 9df942b..72d5ff4 100644 --- a/test/LDClient-tls-test.js +++ b/test/LDClient-tls-test.js @@ -43,6 +43,7 @@ describe('LDClient TLS configuration', () => { stream: false, logger: stubs.stubLogger(), tlsParams: { ca: certData.cert }, + diagnosticOptOut: true, }; await withCloseable(LDClient.init(sdkKey, config), async client => { @@ -60,6 +61,7 @@ describe('LDClient TLS configuration', () => { sendEvents: false, stream: false, logger: stubs.stubLogger(), + diagnosticOptOut: true, }; await withCloseable(LDClient.init(sdkKey, config), async client => { @@ -83,6 +85,7 @@ describe('LDClient TLS configuration', () => { sendEvents: false, logger: logger, tlsParams: { ca: certData.cert }, + diagnosticOptOut: true, }; await withCloseable(LDClient.init(sdkKey, config), async client => { @@ -103,6 +106,7 @@ describe('LDClient TLS configuration', () => { stream: false, logger: stubs.stubLogger(), tlsParams: { ca: certData.cert }, + diagnosticOptOut: true, }; await withCloseable(LDClient.init(sdkKey, config), async client => { diff --git a/test/diagnostic_events-test.js b/test/diagnostic_events-test.js new file mode 100644 index 0000000..ea4ac33 --- /dev/null +++ b/test/diagnostic_events-test.js @@ -0,0 +1,150 @@ +const os = require('os'); +const packageJson = require('../package.json'); +const configuration = require('../configuration'); +const { DiagnosticsManager, DiagnosticId } = require('../diagnostic_events'); + +describe('DiagnosticId', () => { + it('uses last 6 characters of SDK key', () => { + const id = DiagnosticId('my-sdk-key'); + expect(id.sdkKeySuffix).toEqual('dk-key'); + }); + + it('creates random UUID', () => { + const id0 = DiagnosticId('my-sdk-key'); + const id1 = DiagnosticId('my-sdk-key'); + expect(id0.diagnosticId).toBeTruthy(); + expect(id1.diagnosticId).toBeTruthy(); + expect(id0.diagnosticId).not.toEqual(id1.diagnosticId); + }); +}); + +describe('DiagnosticsManager', () => { + const id = DiagnosticId('my-sdk-key'); + const defaultConfig = configuration.validate({}); + + it('copies DiagnosticId', () => { + const manager = DiagnosticsManager(defaultConfig, id, 100000); + const event = manager.createInitEvent(); + expect(event.id).toEqual(id); + }); + + it('copies start time', () => { + const manager = DiagnosticsManager(defaultConfig, id, 100000); + const event = manager.createInitEvent(); + expect(event.creationDate).toEqual(100000); + }); + + it('provides SDK data', () => { + const manager = DiagnosticsManager(defaultConfig, id, 100000); + const event = manager.createInitEvent(); + expect(event.sdk).toEqual({ + name: 'node-server-sdk', + version: packageJson.version + }); + }); + + it('provides platform data', () => { + const manager = DiagnosticsManager(defaultConfig, id, 100000); + const event = manager.createInitEvent(); + expect(event.platform).toEqual({ + name: 'Node', + osArch: os.arch(), + osName: os.platform(), + osVersion: os.release(), + }); + }); + + function verifyConfig(configIn, configOut) { + const config = configuration.validate(configIn); + const manager = DiagnosticsManager(config, id, 100000); + const event = manager.createInitEvent(); + expect(event.configuration).toMatchObject(configOut); + } + + it('translates default configuration', () => { + verifyConfig({}, { + allAttributesPrivate: false, + connectTimeoutMillis: 5000, + customBaseURI: false, + customEventsURI: false, + customStreamURI: false, + diagnosticRecordingIntervalMillis: 900000, + eventsCapacity: 10000, + eventsFlushIntervalMillis: 5000, + inlineUsersInEvents: false, + offline: false, + pollingIntervalMillis: 30000, + reconnectTimeMillis: 1000, + socketTimeoutMillis: 5000, + streamingDisabled: false, + userKeysCapacity: 1000, + userKeysFlushIntervalMillis: 300000, + usingProxy: false, + usingProxyAuthenticator: false, + usingRelayDaemon: false, + }); + }); + + it('translates custom configuration', () => { + verifyConfig({ baseUri: 'http://other' }, { + customBaseURI: true, + customEventsURI: false, + customStreamURI: false, + }); + verifyConfig({ eventsUri: 'http://other' }, { + customBaseURI: false, + customEventsURI: true, + customStreamURI: false, + }); + verifyConfig({ streamUri: 'http://other' }, { + customBaseURI: false, + customEventsURI: false, + customStreamURI: true, + }); + verifyConfig({ allAttributesPrivate: true }, { allAttributesPrivate: true }); + verifyConfig({ timeout: 6 }, { connectTimeoutMillis: 6000, socketTimeoutMillis: 6000 }); + verifyConfig({ diagnosticRecordingInterval: 999 }, { diagnosticRecordingIntervalMillis: 999000 }); + verifyConfig({ capacity: 999 }, { eventsCapacity: 999 }); + verifyConfig({ flushInterval: 33 }, { eventsFlushIntervalMillis: 33000 }); + verifyConfig({ stream: false }, { streamingDisabled: true }); + verifyConfig({ userKeysCapacity: 111 }, { userKeysCapacity: 111 }); + verifyConfig({ userKeysFlushInterval: 33 }, { userKeysFlushIntervalMillis: 33000 }); + verifyConfig({ useLdd: true }, { usingRelayDaemon: true }); + + const fakeProxy = {}; + verifyConfig({ proxyAgent: fakeProxy }, { usingProxy: true, usingProxyAuthenticator: false }); + verifyConfig({ proxyHost: 'my-proxy' }, { usingProxy: true, usingProxyAuthenticator: false }); + verifyConfig({ proxyAgent: fakeProxy, proxyAuth: 'basic' }, { usingProxy: true, usingProxyAuthenticator: true }); + + const fakeStore = { description: 'WeirdStore' }; + verifyConfig({ featureStore: fakeStore }, { featureStore: fakeStore.description }); + }); + + it('creates periodic event from stats, then resets', () => { + const manager = DiagnosticsManager(defaultConfig, id, 100000); + const timeBeforeReset = new Date().getTime(); + const event1 = manager.createStatsEventAndReset(4, 5, 6); + + expect(event1).toMatchObject({ + kind: 'diagnostic', + dataSinceDate: 100000, + droppedEvents: 4, + deduplicatedUsers: 5, + eventsInQueue: 6, + }); + + expect(event1.creationDate).toBeGreaterThanOrEqual(timeBeforeReset); + + const event2 = manager.createStatsEventAndReset(1, 2, 3); + + expect(event2).toMatchObject({ + kind: 'diagnostic', + dataSinceDate: event1.creationDate, + droppedEvents: 1, + deduplicatedUsers: 2, + eventsInQueue: 3, + }); + + expect(event2.creationDate).toBeGreaterThanOrEqual(event1.creationDate); + }); +}); diff --git a/test/event_processor-test.js b/test/event_processor-test.js index fee6a90..500dab6 100644 --- a/test/event_processor-test.js +++ b/test/event_processor-test.js @@ -1,3 +1,4 @@ +const { DiagnosticsManager, DiagnosticId } = require('../diagnostic_events'); const EventProcessor = require('../event_processor'); const { createServer, respond } = require('./http_server'); const { sleepAsync, withCloseable } = require('./async_utils'); @@ -12,6 +13,7 @@ describe('EventProcessor', () => { flushInterval: 30, userKeysCapacity: 1000, userKeysFlushInterval: 300, + diagnosticRecordingInterval: 900, logger: { debug: jest.fn(), warn: jest.fn() @@ -27,12 +29,14 @@ describe('EventProcessor', () => { function eventsServerTest(asyncCallback) { return async () => withCloseable(createServer, async server => { server.forMethodAndPath('post', '/bulk', respond(200)); + server.forMethodAndPath('post', '/diagnostic', respond(200)); return await asyncCallback(server); }); } - async function withEventProcessor(config, server, asyncCallback) { - const ep = EventProcessor(sdkKey, Object.assign({}, config, { eventsUri: server.url })); + async function withEventProcessor(baseConfig, server, asyncCallback) { + const config = Object.assign({}, baseConfig, { eventsUri: server.url, diagnosticOptOut: true }); + const ep = EventProcessor(sdkKey, config); try { return await asyncCallback(ep); } finally { @@ -40,6 +44,18 @@ describe('EventProcessor', () => { } } + async function withDiagnosticEventProcessor(baseConfig, server, asyncCallback) { + const config = Object.assign({}, baseConfig, { eventsUri: server.url }); + const id = DiagnosticId(sdkKey); + const manager = DiagnosticsManager(config, id, new Date().getTime()); + const ep = EventProcessor(sdkKey, config, null, manager); + try { + return await asyncCallback(ep, id, manager); + } finally { + ep.close(); + } + } + function headersWithDate(timestamp) { return { date: new Date(timestamp).toUTCString() }; } @@ -549,4 +565,41 @@ describe('EventProcessor', () => { expect(s.requestCount()).toEqual(2); }); })); + + describe('diagnostic events', () => { + it('sends initial diagnostic event', eventsServerTest(async s => { + const startTime = new Date().getTime(); + await withDiagnosticEventProcessor(defaultConfig, s, async (ep, id) => { + const req = await s.nextRequest(); + expect(req.path).toEqual('/diagnostic'); + const data = JSON.parse(req.body); + expect(data.kind).toEqual('diagnostic-init'); + expect(data.id).toEqual(id); + expect(data.creationDate).toBeGreaterThanOrEqual(startTime); + expect(data.configuration).toMatchObject({ customEventsURI: true }); + expect(data.sdk).toMatchObject({ name: 'node-server-sdk' }); + expect(data.platform).toMatchObject({ name: 'Node' }); + }); + })); + + it('sends periodic diagnostic event', eventsServerTest(async s => { + const startTime = new Date().getTime(); + const config = Object.assign({}, defaultConfig, { diagnosticRecordingInterval: 0.1 }); + await withDiagnosticEventProcessor(config, s, async (ep, id) => { + const req0 = await s.nextRequest(); + expect(req0.path).toEqual('/diagnostic'); + + const req1 = await s.nextRequest(); + expect(req1.path).toEqual('/diagnostic'); + const data = JSON.parse(req1.body); + expect(data.kind).toEqual('diagnostic'); + expect(data.id).toEqual(id); + expect(data.creationDate).toBeGreaterThanOrEqual(startTime); + expect(data.dataSinceDate).toBeGreaterThanOrEqual(startTime); + expect(data.droppedEvents).toEqual(0); + expect(data.deduplicatedUsers).toEqual(0); + expect(data.eventsInQueue).toEqual(0); + }); + })); + }); });