Skip to content

Commit

Permalink
feat: Added AsyncLocalStorage implementation (#430)
Browse files Browse the repository at this point in the history
refs 77490

- #430
- #416

Co-authored-by: kirrg001 <[email protected]>
Co-authored-by: Bastian Krol <[email protected]>
  • Loading branch information
3 people authored Dec 8, 2021
1 parent e9e27c2 commit 6685000
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,49 +99,54 @@ async function receiveAsync() {
try {
const sqsPromise = sqs.receiveMessage(receiveParams).promise();
const data = await sqsPromise;
instana.sdk.runInAsyncContext(sqsPromise.instanaAsyncContext, async () => {
if (data && data.error) {
log('receive message data error', data.error);
return;
} else if (!data || !data.Messages || data.Messages.length === 0) {
log('no messages, doing nothing');
return;
}

data.Messages.forEach(message => {
sendToParent(message);
});
return new Promise(resolve => {
instana.sdk.runInAsyncContext(sqsPromise.instanaAsyncContext, async () => {
if (data && data.error) {
log('receive message data error', data.error);
return resolve();
} else if (!data || !data.Messages || data.Messages.length === 0) {
log('no messages, doing nothing');
return resolve();
}

log(
'got messages:',
data.Messages.map(m => m.MessageId)
);
data.Messages.forEach(message => {
sendToParent(message);
});

span = instana.currentSpan();
span.disableAutoEnd();
log(
'got messages:',
data.Messages.map(m => m.MessageId)
);

const messagesForDeletion = data.Messages.map(message => {
return {
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
};
});
span = instana.currentSpan();
span.disableAutoEnd();

await sqs
.deleteMessageBatch({
QueueUrl: queueURL,
Entries: messagesForDeletion
})
.promise();
const messagesForDeletion = data.Messages.map(message => {
return {
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
};
});

await delay(1000);
await request(`http://127.0.0.1:${agentPort}`);
log('The follow up request after receiving a message has happened.');
span.end();
await sqs
.deleteMessageBatch({
QueueUrl: queueURL,
Entries: messagesForDeletion
})
.promise();

await delay(1000);
await request(`http://127.0.0.1:${agentPort}`);
log('The follow up request after receiving a message has happened.');
span.end();
return resolve();
});
});
} catch (err) {
span && span.end(1);
log('ERROR receiving/deleting message', err);
return Promise.reject(err);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ asyncIterator.pullValue().then(event1 => {
// Chronologically, everything inside the then-handler this happens after cls.ns.set (see below). Due to custom
// queueing in pubsub_async_iterator, the cls context would get lost though (unless we fix it).
log(event1);
valuesReadFromCls.push(cls.ns.get('key'));
valuesReadFromCls.push(cls.ns.get('key', true));
asyncIterator.pullValue().then(event2 => {
log(event2);
valuesReadFromCls.push(cls.ns.get('key'));
valuesReadFromCls.push(cls.ns.get('key', true));
asyncIterator.pullValue().then(event3 => {
log(event3);
valuesReadFromCls.push(cls.ns.get('key'));
valuesReadFromCls.push(cls.ns.get('key', true));
});
});
});
Expand Down
12 changes: 7 additions & 5 deletions packages/core/src/tracing/cls.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,19 @@ function setCurrentSpan(span) {

/**
* Get the currently active span.
* @param {boolean} [fallbackToSharedContext=false]
* @returns {InstanaSpan}
*/
function getCurrentSpan() {
return ns.get(currentSpanKey);
function getCurrentSpan(fallbackToSharedContext = false) {
return ns.get(currentSpanKey, fallbackToSharedContext);
}

/*
/**
* Get the reduced backup of the last active span in this cls context.
* @param {boolean} [fallbackToSharedContext=false]
*/
function getReducedSpan() {
return ns.get(reducedSpanKey);
function getReducedSpan(fallbackToSharedContext = false) {
return ns.get(reducedSpanKey, fallbackToSharedContext);
}

/**
Expand Down
Loading

0 comments on commit 6685000

Please sign in to comment.