Skip to content

Commit

Permalink
Start instrumenting for streaming tail workers
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jan 20, 2025
1 parent dd63afd commit 997b581
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 38 deletions.
7 changes: 5 additions & 2 deletions samples/tail-workers/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ export default {
tail(traces) {
console.log(traces[0].logs);
},
tailStream() {
return {};
tailStream(...args) {
console.log(...args);
return (...args) => {
console.log(...args);
};
},
};
2 changes: 2 additions & 0 deletions samples/tail-workers/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
export default {
async fetch(req, env) {
console.log('hello to the tail worker!');
reportError('boom');
reportError(new Error('test'));
return new Response("Hello World\n");
}
};
44 changes: 24 additions & 20 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,34 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
tracing::HibernatableWebSocketEventInfo::Type type =
[&]() -> tracing::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(_, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return tracing::HibernatableWebSocketEventInfo::Close{
.code = close.code, .wasClean = close.wasClean};
}
KJ_CASE_ONEOF(_, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo::Error{};
}
auto getType = [&]() -> tracing::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(_, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_UNREACHABLE;
}();
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return tracing::HibernatableWebSocketEventInfo::Close{
.code = close.code, .wasClean = close.wasClean};
}
KJ_CASE_ONEOF(_, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo::Error{};
}
}
KJ_UNREACHABLE;
};

t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(kj::mv(type)));
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(getType()));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(
tracing::HibernatableWebSocketEventInfo(getType()), tracing::Onset::WorkerInfo{}, kj::none);
});

try {
co_await context.run(
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters),
Expand Down
7 changes: 6 additions & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::QueueEventInfo(kj::mv(queueName), batchSize));
t.setEventInfo(context.now(), tracing::QueueEventInfo(kj::str(queueName), batchSize));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::QueueEventInfo(kj::mv(queueName), batchSize),
tracing::Onset::WorkerInfo{}, kj::none);
});

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder: public kj::Refcounted {
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
t.setEventInfo(context.now(), tracing::TraceEventInfo(traces));
}

metrics.reportTailEvent(context, [&] {
return tracing::Onset(tracing::TraceEventInfo(traces), tracing::Onset::WorkerInfo{}, kj::none);
});

auto nonEmptyTraces = kj::Vector<kj::Own<Trace>>(kj::size(traces));
for (auto& trace: traces) {
if (trace->eventInfo != kj::none) {
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,10 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), tracing::JsRpcEventInfo(kj::str(methodName)));
}
ioctx.getMetrics().reportTailEvent(ioctx, [&] {
return tracing::Onset(
tracing::JsRpcEventInfo(kj::str(methodName)), tracing::Onset::WorkerInfo{}, kj::none);
});
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ class TailStreamHandler final: public TailStreamTargetBase {
// Take the received set of events and dispatch them to the correct handler.

v8::Local<v8::Value> h = handler.getHandle(js);
v8::LocalVector<v8::Value> returnValues(js.v8Isolate, events.size());
v8::LocalVector<v8::Value> returnValues(js.v8Isolate);
StringCache stringCache;

if (h->IsFunction()) {
Expand Down Expand Up @@ -836,7 +836,8 @@ class TailStreamEntrypoint final: public TailStreamTargetBase {

return ioContext.awaitJs(js,
js.toPromise(result).then(js,
ioContext.addFunctor([&results, &ioContext](jsg::Lock& js, jsg::Value value) {
ioContext.addFunctor(
[results = kj::mv(results), &ioContext](jsg::Lock& js, jsg::Value value) mutable {
// The value here can be one of a function, an object, or undefined.
// Any value other than these will result in a warning but will otherwise
// be treated like undefined.
Expand Down
13 changes: 11 additions & 2 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
tracing::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
}

// TODO(streaming-tail-workers): Instrument properly
context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::FetchEventInfo(method, kj::str(url), kj::str("{}"), nullptr),
tracing::Onset::WorkerInfo{}, kj::none);
Expand Down Expand Up @@ -501,11 +500,17 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;

KJ_IF_SOME(t, context.getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t.setEventInfo(context.now(), tracing::ScheduledEventInfo(eventTime, kj::str(cron)));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::ScheduledEventInfo(eventTime, kj::str(cron)),
tracing::Onset::WorkerInfo{}, kj::none);
});

// Scheduled handlers run entirely in waitUntil() tasks.
context.addWaitUntil(context.run(
[scheduledTime, cron, entrypointName = entrypointName, props = kj::mv(props), &context,
Expand Down Expand Up @@ -563,6 +568,10 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::AlarmEventInfo(scheduledTime));
}
context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(
tracing::AlarmEventInfo(scheduledTime), tracing::Onset::WorkerInfo{}, kj::none);
});

auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime);
KJ_SWITCH_ONEOF(scheduleAlarmResult) {
Expand Down
27 changes: 27 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,18 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}

ioContext.getMetrics().reportTailEvent(ioContext, [&] {
KJ_IF_SOME(obj, error.tryCast<jsg::JsObject>()) {
auto name = obj.get(js, "name"_kj);
auto message = obj.get(js, "message"_kj);
auto stack = obj.get(js, "stack"_kj);
return tracing::Mark(tracing::Exception(
ioContext.now(), kj::str(name), kj::str(message), kj::str(stack)));
}
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(), kj::str(error), kj::none));
});
}

KJ_IF_SOME(i, impl->inspector) {
Expand Down Expand Up @@ -1865,6 +1877,9 @@ void Worker::handleLog(jsg::Lock& js,
auto timestamp = ioContext.now();
tracer.addLog(timestamp, level, message());
}

ioContext.getMetrics().reportTailEvent(
ioContext, [&] { return tracing::Mark(tracing::Log(ioContext.now(), level, message())); });
}

if (consoleMode == ConsoleMode::INSPECTOR_ONLY) {
Expand Down Expand Up @@ -2066,6 +2081,18 @@ void Worker::Lock::logUncaughtException(
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
});
}

ioContext.getMetrics().reportTailEvent(ioContext, [&] {
KJ_IF_SOME(obj, exception.tryCast<jsg::JsObject>()) {
auto name = obj.get(*this, "name"_kj);
auto message = obj.get(*this, "message"_kj);
auto stack = obj.get(*this, "stack"_kj);
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(name), kj::str(message), kj::str(stack)));
}
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(), kj::str(exception), kj::none));
});
}

KJ_IF_SOME(i, worker.script->isolate->impl->inspector) {
Expand Down
22 changes: 11 additions & 11 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ struct TailStreamWriterState {

struct Closed {};

kj::OneOf<Pending, kj::Array<Active>, Closed> inner;
kj::OneOf<Pending, kj::Array<kj::Own<Active>>, Closed> inner;
kj::TaskSet& waitUntilTasks;

TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks)
Expand All @@ -1502,12 +1502,12 @@ struct TailStreamWriterState {

void reportImpl(tracing::TailEvent&& event) {
// In reportImpl, our inner state must be active.
auto& actives = KJ_ASSERT_NONNULL(inner.tryGet<kj::Array<Active>>());
auto& actives = KJ_ASSERT_NONNULL(inner.tryGet<kj::Array<kj::Own<Active>>>());

// We only care about sessions that are currently active.
kj::Vector<Active> alive(actives.size());
kj::Vector<kj::Own<Active>> alive(actives.size());
for (auto& active: actives) {
if (active.capability != kj::none) {
if (active->capability != kj::none) {
alive.add(kj::mv(active));
}
}
Expand All @@ -1520,10 +1520,10 @@ struct TailStreamWriterState {
}

// Deliver the event to the queue and make sure we are processing.
for (Active& active: alive) {
active.queue.push_back(event.clone());
if (!active.pumping) {
waitUntilTasks.add(pump(active));
for (auto& active: alive) {
active->queue.push_back(event.clone());
if (!active->pumping) {
waitUntilTasks.add(pump(*active));
}
}

Expand Down Expand Up @@ -1618,13 +1618,13 @@ kj::Maybe<kj::Own<tracing::TailStreamWriter>> initializeTailStreamWriter(
ioContext.addTask(
wi->customEvent(kj::mv(customEvent)).attach(kj::mv(wi)).then([](auto&&) {
}, [](kj::Exception&&) {}));
return TailStreamWriterState::Active{
return kj::heap<TailStreamWriterState::Active>({
.capability = kj::mv(result),
};
});
};
state->reportImpl(kj::mv(event));
}
KJ_CASE_ONEOF(active, kj::Array<TailStreamWriterState::Active>) {
KJ_CASE_ONEOF(active, kj::Array<kj::Own<TailStreamWriterState::Active>>) {
// Event cannot be a onset, which should have been validated by the writer.
KJ_ASSERT(!event.event.is<tracing::Onset>(), "Only the first event can be an onset");
auto final = event.event.is<tracing::Outcome>() || event.event.is<tracing::Hibernate>();
Expand Down

0 comments on commit 997b581

Please sign in to comment.