Skip to content

Commit 989a2a1

Browse files
Eugene Ostroukhovevanlucas
Eugene Ostroukhov
authored andcommitted
inspector: Unify event queues
Current implementation tracks connected/disconnected status separately which potentially introduces race condition. This change introduces notion of session IDs and also posts connect/disconnect events into the same queue as the messages. This way Node knows what session given response belongs to and can discard messages if the frontend for that session had disconnected. This also fixes an issue when frontend was unable to attach to V8 instance that was running infinite loop. PR-URL: #7271 Reviewed-By: bnoordhuis - Ben Noordhuis <[email protected]>
1 parent 5a2ce36 commit 989a2a1

File tree

1 file changed

+110
-103
lines changed

1 file changed

+110
-103
lines changed

src/inspector_agent.cc

+110-103
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "libplatform/libplatform.h"
1818

1919
#include <string.h>
20+
#include <utility>
2021
#include <vector>
2122

2223
// We need pid to use as ID with Chrome
@@ -31,6 +32,9 @@
3132
namespace node {
3233
namespace {
3334

35+
const char TAG_CONNECT[] = "#connect";
36+
const char TAG_DISCONNECT[] = "#disconnect";
37+
3438
const char DEVTOOLS_PATH[] = "/node";
3539
const char DEVTOOLS_HASH[] = "521e5b7e2b7cc66b4006a8a54cb9c4e57494a5ef";
3640

@@ -154,7 +158,6 @@ bool RespondToGet(inspector_socket_t* socket, const char* path, int port) {
154158
namespace inspector {
155159

156160
using blink::protocol::DictionaryValue;
157-
using blink::protocol::String16;
158161

159162
class AgentImpl {
160163
public:
@@ -171,24 +174,27 @@ class AgentImpl {
171174
void WaitForDisconnect();
172175

173176
private:
177+
using MessageQueue = std::vector<std::pair<int, String16>>;
178+
174179
static void ThreadCbIO(void* agent);
175180
static void OnSocketConnectionIO(uv_stream_t* server, int status);
176181
static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
177182
enum inspector_handshake_event state,
178183
const char* path);
179-
static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read,
180-
const uv_buf_t* b);
181184
static void WriteCbIO(uv_async_t* async);
182185

183186
void WorkerRunIO();
184187
void OnInspectorConnectionIO(inspector_socket_t* socket);
185-
void PushPendingMessage(std::vector<std::string>* queue,
186-
const std::string& message);
187-
void SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
188-
std::vector<std::string>* output);
188+
void OnRemoteDataIO(inspector_socket_t* stream, ssize_t read,
189+
const uv_buf_t* b);
189190
void PostMessages();
190191
void SetConnected(bool connected);
191-
void Write(const std::string& message);
192+
void DispatchMessages();
193+
void Write(int session_id, const String16& message);
194+
void AppendMessage(MessageQueue* vector, int session_id,
195+
const String16& message);
196+
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
197+
void PostIncomingMessage(const String16& message);
192198

193199
uv_sem_t start_sem_;
194200
ConditionVariable pause_cond_;
@@ -208,27 +214,36 @@ class AgentImpl {
208214
inspector_socket_t* client_socket_;
209215
blink::V8Inspector* inspector_;
210216
v8::Platform* platform_;
211-
std::vector<std::string> message_queue_;
212-
std::vector<std::string> outgoing_message_queue_;
217+
MessageQueue incoming_message_queue_;
218+
MessageQueue outgoing_message_queue_;
213219
bool dispatching_messages_;
220+
int frontend_session_id_;
221+
int backend_session_id_;
214222

215223
friend class ChannelImpl;
216224
friend class DispatchOnInspectorBackendTask;
217225
friend class SetConnectedTask;
218226
friend class V8NodeInspector;
219227
friend void InterruptCallback(v8::Isolate*, void* agent);
228+
friend void DataCallback(uv_stream_t* stream, ssize_t read,
229+
const uv_buf_t* buf);
220230
};
221231

222232
void InterruptCallback(v8::Isolate*, void* agent) {
223-
static_cast<AgentImpl*>(agent)->PostMessages();
233+
static_cast<AgentImpl*>(agent)->DispatchMessages();
234+
}
235+
236+
void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) {
237+
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
238+
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf);
224239
}
225240

226241
class DispatchOnInspectorBackendTask : public v8::Task {
227242
public:
228243
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {}
229244

230245
void Run() override {
231-
agent_->PostMessages();
246+
agent_->DispatchMessages();
232247
}
233248

234249
private:
@@ -251,27 +266,12 @@ class ChannelImpl final : public blink::protocol::FrontendChannel {
251266
void flushProtocolNotifications() override { }
252267

253268
void sendMessageToFrontend(const String16& message) {
254-
agent_->Write(message.utf8());
269+
agent_->Write(agent_->frontend_session_id_, message);
255270
}
256271

257272
AgentImpl* const agent_;
258273
};
259274

260-
class SetConnectedTask : public v8::Task {
261-
public:
262-
SetConnectedTask(AgentImpl* agent, bool connected)
263-
: agent_(agent),
264-
connected_(connected) {}
265-
266-
void Run() override {
267-
agent_->SetConnected(connected_);
268-
}
269-
270-
private:
271-
AgentImpl* agent_;
272-
bool connected_;
273-
};
274-
275275
class V8NodeInspector : public blink::V8Inspector {
276276
public:
277277
V8NodeInspector(AgentImpl* agent, node::Environment* env,
@@ -320,7 +320,9 @@ AgentImpl::AgentImpl(Environment* env) : port_(0),
320320
client_socket_(nullptr),
321321
inspector_(nullptr),
322322
platform_(nullptr),
323-
dispatching_messages_(false) {
323+
dispatching_messages_(false),
324+
frontend_session_id_(0),
325+
backend_session_id_(0) {
324326
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
325327
memset(&data_written_, 0, sizeof(data_written_));
326328
memset(&io_thread_req_, 0, sizeof(io_thread_req_));
@@ -355,10 +357,7 @@ void AgentImpl::Start(v8::Platform* platform, int port, bool wait) {
355357
uv_sem_wait(&start_sem_);
356358

357359
if (wait) {
358-
// Flush messages in case of wait to connect, see OnRemoteDataIO on how it
359-
// should be fixed.
360-
SetConnected(true);
361-
PostMessages();
360+
DispatchMessages();
362361
}
363362
}
364363

@@ -424,67 +423,54 @@ bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket,
424423
}
425424
}
426425

427-
// static
428-
void AgentImpl::OnRemoteDataIO(uv_stream_t* stream,
429-
ssize_t read,
430-
const uv_buf_t* b) {
431-
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
432-
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
433-
Mutex::ScopedLock scoped_lock(agent->pause_lock_);
426+
void AgentImpl::OnRemoteDataIO(inspector_socket_t* socket,
427+
ssize_t read,
428+
const uv_buf_t* buf) {
429+
Mutex::ScopedLock scoped_lock(pause_lock_);
434430
if (read > 0) {
435-
std::string str(b->base, read);
436-
agent->PushPendingMessage(&agent->message_queue_, str);
437-
free(b->base);
438-
431+
String16 str = String16::fromUTF8(buf->base, read);
432+
PostIncomingMessage(str);
439433
// TODO(pfeldman): Instead of blocking execution while debugger
440434
// engages, node should wait for the run callback from the remote client
441435
// and initiate its startup. This is a change to node.cc that should be
442436
// upstreamed separately.
443-
if (agent->wait_ && str.find("\"Runtime.run\"") != std::string::npos) {
444-
agent->wait_ = false;
445-
uv_sem_post(&agent->start_sem_);
437+
if (wait_ && str.find("\"Runtime.run\"") != std::string::npos) {
438+
wait_ = false;
439+
uv_sem_post(&start_sem_);
446440
}
447441

448-
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(),
449-
new DispatchOnInspectorBackendTask(agent));
450-
agent->parent_env_->isolate()
451-
->RequestInterrupt(InterruptCallback, agent);
452-
uv_async_send(&agent->data_written_);
442+
platform_->CallOnForegroundThread(parent_env_->isolate(),
443+
new DispatchOnInspectorBackendTask(this));
444+
parent_env_->isolate()->RequestInterrupt(InterruptCallback, this);
445+
uv_async_send(&data_written_);
453446
} else if (read <= 0) {
454447
// EOF
455-
if (agent->client_socket_ == socket) {
456-
agent->client_socket_ = nullptr;
457-
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(),
458-
new SetConnectedTask(agent, false));
459-
uv_async_send(&agent->data_written_);
448+
if (client_socket_ == socket) {
449+
String16 message(TAG_DISCONNECT, sizeof(TAG_DISCONNECT) - 1);
450+
client_socket_ = nullptr;
451+
PostIncomingMessage(message);
460452
}
461453
DisconnectAndDisposeIO(socket);
462454
}
463-
agent->pause_cond_.Broadcast(scoped_lock);
464-
}
465-
466-
void AgentImpl::PushPendingMessage(std::vector<std::string>* queue,
467-
const std::string& message) {
468-
Mutex::ScopedLock scoped_lock(queue_lock_);
469-
queue->push_back(message);
470-
}
471-
472-
void AgentImpl::SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
473-
std::vector<std::string>* output) {
474-
Mutex::ScopedLock scoped_lock(queue_lock_);
475-
(this->*queue).swap(*output);
455+
if (buf) {
456+
free(buf->base);
457+
}
458+
pause_cond_.Broadcast(scoped_lock);
476459
}
477460

478461
// static
479462
void AgentImpl::WriteCbIO(uv_async_t* async) {
480463
AgentImpl* agent = static_cast<AgentImpl*>(async->data);
481464
inspector_socket_t* socket = agent->client_socket_;
482465
if (socket) {
483-
std::vector<std::string> outgoing_messages;
484-
agent->SwapBehindLock(&AgentImpl::outgoing_message_queue_,
485-
&outgoing_messages);
486-
for (auto const& message : outgoing_messages)
487-
inspector_write(socket, message.c_str(), message.length());
466+
MessageQueue outgoing_messages;
467+
agent->SwapBehindLock(&agent->outgoing_message_queue_, &outgoing_messages);
468+
for (const MessageQueue::value_type& outgoing : outgoing_messages) {
469+
if (outgoing.first == agent->frontend_session_id_) {
470+
std::string message = outgoing.second.utf8();
471+
inspector_write(socket, message.c_str(), message.length());
472+
}
473+
}
488474
}
489475
}
490476

@@ -518,49 +504,70 @@ void AgentImpl::WorkerRunIO() {
518504
uv_run(&child_loop_, UV_RUN_DEFAULT);
519505
}
520506

507+
void AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
508+
const String16& message) {
509+
Mutex::ScopedLock scoped_lock(queue_lock_);
510+
queue->push_back(std::make_pair(session_id, message));
511+
}
512+
513+
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
514+
Mutex::ScopedLock scoped_lock(queue_lock_);
515+
vector1->swap(*vector2);
516+
}
517+
518+
void AgentImpl::PostIncomingMessage(const String16& message) {
519+
AppendMessage(&incoming_message_queue_, frontend_session_id_, message);
520+
v8::Isolate* isolate = parent_env_->isolate();
521+
platform_->CallOnForegroundThread(isolate,
522+
new DispatchOnInspectorBackendTask(this));
523+
isolate->RequestInterrupt(InterruptCallback, this);
524+
uv_async_send(&data_written_);
525+
}
526+
521527
void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) {
522528
if (client_socket_) {
523529
DisconnectAndDisposeIO(socket);
524530
return;
525531
}
526532
client_socket_ = socket;
527-
inspector_read_start(socket, OnBufferAlloc, AgentImpl::OnRemoteDataIO);
528-
platform_->CallOnForegroundThread(parent_env_->isolate(),
529-
new SetConnectedTask(this, true));
533+
inspector_read_start(socket, OnBufferAlloc, DataCallback);
534+
frontend_session_id_++;
535+
PostIncomingMessage(String16(TAG_CONNECT, sizeof(TAG_CONNECT) - 1));
530536
}
531537

532-
void AgentImpl::PostMessages() {
538+
void AgentImpl::DispatchMessages() {
533539
if (dispatching_messages_)
534540
return;
535541
dispatching_messages_ = true;
536-
std::vector<std::string> messages;
537-
SwapBehindLock(&AgentImpl::message_queue_, &messages);
538-
for (auto const& message : messages)
539-
inspector_->dispatchMessageFromFrontend(
540-
String16::fromUTF8(message.c_str(), message.length()));
542+
MessageQueue tasks;
543+
SwapBehindLock(&incoming_message_queue_, &tasks);
544+
for (const MessageQueue::value_type& pair : tasks) {
545+
const String16& message = pair.second;
546+
if (message == TAG_CONNECT) {
547+
CHECK_EQ(false, connected_);
548+
backend_session_id_++;
549+
connected_ = true;
550+
fprintf(stderr, "Debugger attached.\n");
551+
inspector_->connectFrontend(new ChannelImpl(this));
552+
} else if (message == TAG_DISCONNECT) {
553+
CHECK(connected_);
554+
connected_ = false;
555+
if (!shutting_down_)
556+
PrintDebuggerReadyMessage(port_);
557+
inspector_->quitMessageLoopOnPause();
558+
inspector_->disconnectFrontend();
559+
} else {
560+
inspector_->dispatchMessageFromFrontend(message);
561+
}
562+
}
541563
uv_async_send(&data_written_);
542564
dispatching_messages_ = false;
543565
}
544566

545-
void AgentImpl::SetConnected(bool connected) {
546-
if (connected_ == connected)
547-
return;
548-
549-
connected_ = connected;
550-
if (connected) {
551-
fprintf(stderr, "Debugger attached.\n");
552-
inspector_->connectFrontend(new ChannelImpl(this));
553-
} else {
554-
if (!shutting_down_)
555-
PrintDebuggerReadyMessage(port_);
556-
inspector_->quitMessageLoopOnPause();
557-
inspector_->disconnectFrontend();
558-
}
559-
}
560-
561-
void AgentImpl::Write(const std::string& message) {
562-
PushPendingMessage(&outgoing_message_queue_, message);
563-
ASSERT_EQ(0, uv_async_send(&io_thread_req_));
567+
void AgentImpl::Write(int session_id, const String16& message) {
568+
AppendMessage(&outgoing_message_queue_, session_id, message);
569+
int err = uv_async_send(&io_thread_req_);
570+
CHECK_EQ(0, err);
564571
}
565572

566573
// Exported class Agent

0 commit comments

Comments
 (0)