Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #160 from launchdarkly/eb/ch56381/diag-events-2-in…
Browse files Browse the repository at this point in the history
…itial

diagnostic events, part 2: initial event and stats, except for stream inits
  • Loading branch information
eli-darkly authored Nov 23, 2019
2 parents 79db3e0 + 9630ddb commit 31a9b41
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 19 deletions.
3 changes: 2 additions & 1 deletion caching_store_wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ const initializedKey = '$checkedInit';
in the specified order. The store should delete any obsolete items only after writing
all of the items provided.
*/
function CachingStoreWrapper(underlyingStore, ttl) {
function CachingStoreWrapper(underlyingStore, ttl, description) {
const cache = ttl ? new NodeCache({ stdTTL: ttl }) : null;
const queue = new UpdateQueue();
let initialized = false;

this.underlyingStore = underlyingStore;
this.description = description;

this.init = (allData, cb) => {
queue.enqueue(cb => {
Expand Down
119 changes: 119 additions & 0 deletions diagnostic_events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
const os = require('os');
const uuidv4 = require('uuid/v4');
const configuration = require('./configuration');
const packageJson = require('./package.json');

// An object that maintains information that will go into diagnostic events, and knows how to format
// those events. It is instantiated by the SDK client, and shared with the event processor.
function DiagnosticsManager(config, diagnosticId, startTime) {
let dataSinceDate = startTime;
const acc = {};

// Creates the initial event that is sent by the event processor when the SDK starts up. This will not
// be repeated during the lifetime of the SDK client.
acc.createInitEvent = () => ({
kind: 'diagnostic-init',
id: diagnosticId,
creationDate: startTime,
sdk: makeSdkData(config),
configuration: makeConfigData(config),
platform: makePlatformData()
});

// Creates a periodic event containing time-dependent stats, and resets the state of the manager with
// regard to those stats.
// Note: the reason droppedEvents, deduplicatedUsers, and eventsInQueue are passed into this function,
// instead of being properties of the DiagnosticsManager, is that the event processor is the one who's
// calling this function and is also the one who's tracking those stats.
acc.createStatsEventAndReset = (droppedEvents, deduplicatedUsers, eventsInQueue) => {
const currentTime = new Date().getTime();
const ret = {
kind: 'diagnostic',
id: diagnosticId,
creationDate: currentTime,
dataSinceDate,
droppedEvents,
deduplicatedUsers,
eventsInQueue
};
dataSinceDate = currentTime;
return ret;
};

return acc;
}

function DiagnosticId(sdkKey) {
const ret = {
diagnosticId: uuidv4()
};
if (sdkKey) {
ret.sdkKeySuffix = sdkKey.length > 6 ? sdkKey.substring(sdkKey.length - 6) : sdkKey;
}
return ret;
}

function makeSdkData(config) {
const sdkData = {
name: 'node-server-sdk',
version: packageJson.version
};
if (config.wrapperName) {
sdkData.wrapperName = config.wrapperName;
}
if (config.wrapperVersion) {
sdkData.wrapperVersion = config.wrapperVersion;
}
return sdkData;
}

function makeConfigData(config) {
const defaults = configuration.defaults();
const secondsToMillis = sec => Math.trunc(sec * 1000);

const configData = {
customBaseURI: config.baseUri !== defaults.baseUri,
customStreamURI: config.streamUri !== defaults.streamUri,
customEventsURI: config.eventsUri !== defaults.eventsUri,
eventsCapacity: config.capacity,
connectTimeoutMillis: secondsToMillis(config.timeout),
socketTimeoutMillis: secondsToMillis(config.timeout), // Node doesn't distinguish between these two kinds of timeouts
eventsFlushIntervalMillis: secondsToMillis(config.flushInterval),
pollingIntervalMillis: secondsToMillis(config.pollInterval),
// startWaitMillis: n/a (Node SDK does not have this feature)
// samplingInterval: n/a (Node SDK does not have this feature)
reconnectTimeMillis: 1000, // hard-coded in eventsource.js
streamingDisabled: !config.stream,
usingRelayDaemon: !!config.useLdd,
offline: !!config.offline,
allAttributesPrivate: !!config.allAttributesPrivate,
inlineUsersInEvents: !!config.inlineUsersInEvents,
userKeysCapacity: config.userKeysCapacity,
userKeysFlushIntervalMillis: secondsToMillis(config.userKeysFlushInterval),
usingProxy: !!(config.proxyAgent || config.proxyHost),
usingProxyAuthenticator: !!config.proxyAuth,
diagnosticRecordingIntervalMillis: secondsToMillis(config.diagnosticRecordingInterval)
};
if (config.featureStore && config.featureStore.description) {
configData.featureStore = config.featureStore.description;
}

return configData;
}

function makePlatformData() {
return {
name: 'Node',
osArch: os.arch(),
osName: os.platform(),
osVersion: os.release()
// Note that os.release() is not the same OS version string that would be reported by other languages.
// It's defined as being the value returned by "uname -r" (e.g. on Mac OS 10.14, this is "18.7.0"; on
// Ubuntu 16.04, it is "4.4.0-1095-aws"), or GetVersionExW in Windows.
};
}

module.exports = {
DiagnosticsManager: DiagnosticsManager,
DiagnosticId: DiagnosticId
};
52 changes: 41 additions & 11 deletions event_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ const wrapPromiseCallback = require('./utils/wrapPromiseCallback');

const userAttrsToStringifyForEvents = [ 'key', 'secondary', 'ip', 'country', 'email', 'firstName', 'lastName', 'avatar', 'name' ];

function EventProcessor(sdkKey, config, errorReporter) {
function EventProcessor(sdkKey, config, errorReporter, diagnosticsManager) {
const ep = {};

const userFilter = UserFilter(config),
summarizer = EventSummarizer(config),
userKeysCache = LRUCache(config.userKeysCapacity);
userKeysCache = LRUCache(config.userKeysCapacity),
mainEventsUri = config.eventsUri + '/bulk',
diagnosticEventsUri = config.eventsUri + '/diagnostic';

let queue = [],
lastKnownPastTime = 0,
droppedEvents = 0,
deduplicatedUsers = 0,
exceededCapacity = false,
shutdown = false,
flushTimer,
flushUsersTimer;
flushUsersTimer,
diagnosticsTimer;

function enqueue(event) {
if (queue.length < config.capacity) {
Expand All @@ -33,6 +38,7 @@ function EventProcessor(sdkKey, config, errorReporter) {
exceededCapacity = true;
config.logger.warn('Exceeded event queue capacity. Increase capacity to avoid dropping events.');
}
droppedEvents++;
}
}

Expand Down Expand Up @@ -138,10 +144,17 @@ function EventProcessor(sdkKey, config, errorReporter) {
// For each user we haven't seen before, we add an index event - unless this is already
// an identify event for that user.
if (!addFullEvent || !config.inlineUsersInEvents) {
if (event.user && !userKeysCache.get(event.user.key)) {
userKeysCache.set(event.user.key, true);
if (event.kind != 'identify') {
addIndexEvent = true;
if (event.user) {
const isIdentify = (event.kind === 'identify');
if (userKeysCache.get(event.user.key)) {
if (!isIdentify) {
deduplicatedUsers++;
}
} else {
userKeysCache.set(event.user.key, true);
if (!isIdentify) {
addIndexEvent = true;
}
}
}
}
Expand Down Expand Up @@ -189,16 +202,16 @@ function EventProcessor(sdkKey, config, errorReporter) {

config.logger.debug('Flushing %d events', worklist.length);

tryPostingEvents(worklist, resolve, reject, true);
tryPostingEvents(worklist, mainEventsUri, resolve, reject, true);
}), callback);
};

function tryPostingEvents(events, resolve, reject, canRetry) {
function tryPostingEvents(events, uri, resolve, reject, canRetry) {
const retryOrReject = err => {
if (canRetry) {
config.logger && config.logger.warn('Will retry posting events after 1 second');
setTimeout(() => {
tryPostingEvents(events, resolve, reject, false);
tryPostingEvents(events, uri, resolve, reject, false);
}, 1000);
} else {
reject(err);
Expand All @@ -210,7 +223,7 @@ function EventProcessor(sdkKey, config, errorReporter) {

const options = Object.assign({}, config.tlsParams, {
method: 'POST',
url: config.eventsUri + '/bulk',
url: uri,
headers,
json: true,
body: events,
Expand Down Expand Up @@ -241,9 +254,14 @@ function EventProcessor(sdkKey, config, errorReporter) {
});
}

function postDiagnosticEvent(event) {
tryPostingEvents(event, diagnosticEventsUri, () => {}, () => {}, true);
}

ep.close = () => {
clearInterval(flushTimer);
clearInterval(flushUsersTimer);
diagnosticsTimer && clearInterval(diagnosticsTimer);
};

flushTimer = setInterval(() => {
Expand All @@ -254,6 +272,18 @@ function EventProcessor(sdkKey, config, errorReporter) {
userKeysCache.removeAll();
}, config.userKeysFlushInterval * 1000);

if (!config.diagnosticOptOut && diagnosticsManager) {
const initEvent = diagnosticsManager.createInitEvent();
postDiagnosticEvent(initEvent);

diagnosticsTimer = setInterval(() => {
const statsEvent = diagnosticsManager.createStatsEventAndReset(droppedEvents, deduplicatedUsers, queue.length);
droppedEvents = 0;
deduplicatedUsers = 0;
postDiagnosticEvent(statsEvent);
}, config.diagnosticRecordingInterval * 1000);
}

return ep;
}

Expand Down
7 changes: 6 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const PollingProcessor = require('./polling');
const StreamingProcessor = require('./streaming');
const FlagsStateBuilder = require('./flags_state');
const configuration = require('./configuration');
const diagnostics = require('./diagnostic_events');
const evaluate = require('./evaluate_flag');
const messages = require('./messages');
const tunnel = require('tunnel');
Expand Down Expand Up @@ -72,6 +73,8 @@ const newClient = function(sdkKey, originalConfig) {

const maybeReportError = createErrorReporter(client, config.logger);

let diagnosticsManager = null;

eventFactoryDefault = EventFactory(false);
eventFactoryWithReasons = EventFactory(true);

Expand All @@ -81,7 +84,9 @@ const newClient = function(sdkKey, originalConfig) {
if (config.offline || !config.sendEvents) {
eventProcessor = NullEventProcessor();
} else {
eventProcessor = EventProcessor(sdkKey, config, maybeReportError);
const diagnosticId = diagnostics.DiagnosticId(sdkKey);
diagnosticsManager = diagnostics.DiagnosticsManager(config, diagnosticId, new Date().getTime());
eventProcessor = EventProcessor(sdkKey, config, maybeReportError, diagnosticsManager);
}
}

Expand Down
3 changes: 2 additions & 1 deletion redis_feature_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const noop = function(){};


function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) {
return new CachingStoreWrapper(new redisFeatureStoreInternal(redisOpts, prefix, logger), cacheTTL);
return new CachingStoreWrapper(new redisFeatureStoreInternal(redisOpts, prefix, logger), cacheTTL,
'RedisFeatureStore');
}

function redisFeatureStoreInternal(redisOpts, prefix, logger) {
Expand Down
14 changes: 11 additions & 3 deletions test/LDClient-end-to-end-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async function withAllServers(asyncCallback) {
baseUri: pollingServer.url,
streamUri: streamingServer.url,
eventsUri: eventsServer.url,
logger: stubLogger()
logger: stubLogger(),
};
return await asyncCallback(servers, baseConfig);
})
Expand Down Expand Up @@ -53,7 +53,11 @@ describe('LDClient end-to-end', () => {

expect(servers.polling.requestCount()).toEqual(1);
expect(servers.streaming.requestCount()).toEqual(0);
expect(servers.events.requestCount()).toEqual(1);
expect(servers.events.requestCount()).toEqual(2);
const req0 = await servers.events.nextRequest();
expect(req0.path).toEqual('/diagnostic');
const req1 = await servers.events.nextRequest();
expect(req1.path).toEqual('/bulk');
});
});

Expand Down Expand Up @@ -94,7 +98,11 @@ describe('LDClient end-to-end', () => {

expect(servers.polling.requestCount()).toEqual(0);
expect(servers.streaming.requestCount()).toEqual(1);
expect(servers.events.requestCount()).toEqual(1);
expect(servers.events.requestCount()).toEqual(2);
const req0 = await servers.events.nextRequest();
expect(req0.path).toEqual('/diagnostic');
const req1 = await servers.events.nextRequest();
expect(req1.path).toEqual('/bulk');
});
});
});
Expand Down
4 changes: 4 additions & 0 deletions test/LDClient-tls-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('LDClient TLS configuration', () => {
stream: false,
logger: stubs.stubLogger(),
tlsParams: { ca: certData.cert },
diagnosticOptOut: true,
};

await withCloseable(LDClient.init(sdkKey, config), async client => {
Expand All @@ -60,6 +61,7 @@ describe('LDClient TLS configuration', () => {
sendEvents: false,
stream: false,
logger: stubs.stubLogger(),
diagnosticOptOut: true,
};

await withCloseable(LDClient.init(sdkKey, config), async client => {
Expand All @@ -83,6 +85,7 @@ describe('LDClient TLS configuration', () => {
sendEvents: false,
logger: logger,
tlsParams: { ca: certData.cert },
diagnosticOptOut: true,
};

await withCloseable(LDClient.init(sdkKey, config), async client => {
Expand All @@ -103,6 +106,7 @@ describe('LDClient TLS configuration', () => {
stream: false,
logger: stubs.stubLogger(),
tlsParams: { ca: certData.cert },
diagnosticOptOut: true,
};

await withCloseable(LDClient.init(sdkKey, config), async client => {
Expand Down
Loading

0 comments on commit 31a9b41

Please sign in to comment.