Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do Not Merge] [Core] Add Log to Debug Issue during Cleanup for 2.42.0 #50317

Open
wants to merge 15 commits into
base: releases/2.42.0
Choose a base branch
from
24 changes: 24 additions & 0 deletions src/ray/common/client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <boost/bind/bind.hpp>
#include <chrono>
#include <sstream>
#include <stdexcept>
#include <thread>

#include "ray/common/event_stats.h"
Expand Down Expand Up @@ -149,11 +150,24 @@ Status ServerConnection::WriteBuffer(
while (bytes_remaining != 0) {
size_t bytes_written =
socket_.write_some(boost::asio::buffer(b + position, bytes_remaining), error);

RAY_LOG(INFO) << "[myan] WriteBuffer bytes_written=" << bytes_written;
std::ostringstream byte_stream;
for (size_t i = 0; i < bytes_written; ++i) {
byte_stream << static_cast<int>(
*(boost::asio::buffer_cast<const uint8_t *>(b) + position + i))
<< " ";
}
RAY_LOG(INFO) << "[myan] Bytes: " << byte_stream.str();

position += bytes_written;
bytes_remaining -= bytes_written;
if (error.value() == EINTR) {
RAY_LOG(INFO) << "[myan] WriteBuffer interrupted by signal.";
continue;
} else if (error.value() != boost::system::errc::errc_t::success) {
RAY_LOG(INFO) << "[myan] WriteBuffer encounter error=" << error.value()
<< ", message=" << error.message();
return boost_to_ray_status(error);
}
}
Expand Down Expand Up @@ -250,6 +264,13 @@ ray::Status ServerConnection::WriteMessage(int64_t type,
bytes_written_ += length;

auto write_cookie = RayConfig::instance().ray_cookie();

std::ostringstream message_stream;
for (int i = 0; i < length; ++i) {
message_stream << static_cast<int>(message[i]) << ",";
}
RAY_LOG(INFO) << "[myan] WriteMessage cookie=" << write_cookie << ", type=" << type
<< ", length=" << length << ", message=" << message_stream.str();
return WriteBuffer({
boost::asio::buffer(&write_cookie, sizeof(write_cookie)),
boost::asio::buffer(&type, sizeof(type)),
Expand Down Expand Up @@ -575,6 +596,9 @@ void ClientConnection::ProcessMessage(const boost::system::error_code &error) {
read_message_ = error_data;
}

RAY_LOG(INFO) << "[myan] ProcessMessageHeader. read_cookie_= " << read_cookie_
<< ", read_type_=" << read_type_ << ", read_length=" << read_length_;

int64_t start_ms = current_time_ms();
message_handler_(shared_ClientConnection_from_this(), read_type_, read_message_);
int64_t interval = current_time_ms() - start_ms;
Expand Down
15 changes: 15 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1560,12 +1560,20 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
RAY_LOG(INFO)
<< "[myan] Releasing in PutInLocalPlasmaStore in PinObjectIDs callback. "
<< "object_id=" << object_id.Hex()
<< ", worker_id=" << this->GetWorkerID().Hex();

if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR).WithField(object_id)
<< "Failed to release object, might cause a leak in plasma.";
}
});
} else {
RAY_LOG(INFO) << "[myan] Releasing in PutInLocalPlasmaStore. "
<< "object_id=" << object_id.Hex()
<< ", worker_id=" << this->GetWorkerID().Hex();
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
}
Expand Down Expand Up @@ -1752,6 +1760,10 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
{object_id},
generator_id,
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
RAY_LOG(INFO) << "[myan] Releasing in SealExisting in PinObjectIDs callback. "
<< "object_id=" << object_id.Hex()
<< ", worker_id=" << this->GetWorkerID().Hex();

// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
Expand All @@ -1760,6 +1772,9 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
}
});
} else {
RAY_LOG(INFO) << "[myan] Release in SealExisting callback. "
<< "object_id=" << object_id.Hex()
<< ", worker_id=" << this->GetWorkerID().Hex();
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
reference_counter_->FreePlasmaObjects({object_id});
}
Expand Down
8 changes: 6 additions & 2 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
std::recursive_mutex client_mutex_;
};

PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); }
PlasmaBuffer::~PlasmaBuffer() {
RAY_LOG(INFO) << "[myan] PlasmaBuffer::~PlasmaBuffer(). object_id="
<< (object_id_.IsNil() ? "Nil" : object_id_.Hex());
RAY_UNUSED(client_->Release(object_id_));
}

PlasmaClient::Impl::Impl() : store_capacity_(0) {}

Expand Down Expand Up @@ -781,7 +785,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID &object_id, bool *has_object)

Status PlasmaClient::Impl::Seal(const ObjectID &object_id) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RAY_LOG(DEBUG) << "Seal " << object_id;
RAY_LOG(INFO) << "[myan] Seal " << object_id.Hex();

// Make sure this client has a reference to the object before sending the
// request to Plasma.
Expand Down
8 changes: 8 additions & 0 deletions src/ray/object_manager/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,19 @@ Status ReadReleaseRequest(uint8_t *data,
size_t size,
ObjectID *object_id,
bool *may_unmap) {
RAY_LOG(INFO) << "[myan] Reading release request...";
RAY_DCHECK(data);
RAY_LOG(INFO) << "[myan] Passed DCHECK(data)";
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
RAY_LOG(INFO) << "[myan] Successfully read the message...";
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
RAY_LOG(INFO) << "[myan] Successfully verified the flatbuffer...";
RAY_DCHECK(message->object_id() != nullptr);
RAY_LOG(INFO) << "[myan] Successfully verified the message->object_id() != null...";
*object_id = ObjectID::FromBinary(message->object_id()->str());
RAY_LOG(INFO) << "[myan] Successfully read the object_id...";
*may_unmap = message->may_unmap();
RAY_LOG(INFO) << "[myan] Read the may_unmap...";
return Status::OK();
}

Expand Down
30 changes: 25 additions & 5 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service,
this->AddToClientObjectIds(object_id, fallback_allocated_fd, request->client);
},
[this](const auto &request) { this->ReturnFromGet(request); }) {
RAY_LOG(ERROR) << "[myan] RAY_LOG_IS_LEVEL_ENABLED=" << RAY_LOG_ENABLED(INFO);
RAY_LOG(INFO) << "[myan] Creating PlasmaStore...";
ray::SetCloseOnExec(acceptor_);

if (RayConfig::instance().event_stats_print_interval_ms() > 0 &&
Expand All @@ -129,6 +131,7 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service,
PlasmaStore::~PlasmaStore() {}

void PlasmaStore::Start() {
RAY_LOG(INFO) << "[myan] Starting the PlasmaStore...";
// Start listening for clients.
DoAccept();
}
Expand Down Expand Up @@ -272,8 +275,11 @@ bool PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,

bool PlasmaStore::ReleaseObject(const ObjectID &object_id,
const std::shared_ptr<Client> &client) {
RAY_LOG(INFO) << "[myan] Releasing object..." << object_id;
auto entry = object_lifecycle_mgr_.GetObject(object_id);
RAY_LOG(INFO) << "[myan] Get Entry..." << object_id;
if (entry != nullptr) {
RAY_LOG(INFO) << "[myan] Entry is not null..." << object_id;
// Remove the client from the object's array of clients.
return RemoveFromClientObjectIds(object_id, client);
}
Expand Down Expand Up @@ -325,7 +331,7 @@ void PlasmaStore::ConnectClient(const boost::system::error_code &error) {

void PlasmaStore::DisconnectClient(const std::shared_ptr<Client> &client) {
client->Close();
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client;
RAY_LOG(INFO) << "Disconnecting client on fd " << client;
// Release all the objects that the client was using.
absl::flat_hash_map<ObjectID, const LocalObject *> sealed_objects;
auto &object_ids = client->GetObjectIDs();
Expand Down Expand Up @@ -407,6 +413,7 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
ReplyToCreateClient(client, object_id, request->request_id());
} break;
case fb::MessageType::PlasmaAbortRequest: {
RAY_LOG(INFO) << "[myan] Received abort request...";
RAY_RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
RAY_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
"client currently using it "
Expand All @@ -424,9 +431,14 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
case fb::MessageType::PlasmaReleaseRequest: {
// May unmap: client knows a fallback-allocated fd is involved.
// Should unmap: server finds refcnt == 0 -> need to be unmapped.
RAY_LOG(INFO) << "[myan] Received release request..."
<< "MessageType=" << EnumNameMessageType(type)
<< "IntegerValue=" << static_cast<int>(type);
bool may_unmap;
RAY_RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id, &may_unmap));
RAY_LOG(INFO) << "[myan] Successfully read the release request..." << object_id;
bool should_unmap = ReleaseObject(object_id, client);
RAY_LOG(INFO) << "[myan] Successfully released the object..." << object_id;
if (!may_unmap) {
RAY_CHECK(!should_unmap)
<< "Plasma client thinks a mmap should not be unmapped but server thinks so. "
Expand All @@ -441,6 +453,7 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,

} break;
case fb::MessageType::PlasmaDeleteRequest: {
RAY_LOG(INFO) << "[myan] Received abort request...";
std::vector<ObjectID> object_ids;
std::vector<PlasmaError> error_codes;
RAY_RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
Expand All @@ -459,12 +472,14 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
}
} break;
case fb::MessageType::PlasmaSealRequest: {
RAY_LOG(INFO) << "[myan] Received seal request...";
RAY_RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id));
SealObjects({object_id});
RAY_RETURN_NOT_OK(SendSealReply(client, object_id, PlasmaError::OK));
} break;
case fb::MessageType::PlasmaEvictRequest: {
// This code path should only be used for testing.
RAY_LOG(INFO) << "[myan] Received Evict request...";
int64_t num_bytes;
RAY_RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
int64_t num_bytes_evicted = object_lifecycle_mgr_.RequireSpace(num_bytes);
Expand All @@ -473,16 +488,21 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
case fb::MessageType::PlasmaConnectRequest: {
RAY_RETURN_NOT_OK(SendConnectReply(client, allocator_.GetFootprintLimit()));
} break;
case fb::MessageType::PlasmaDisconnectClient:
case fb::MessageType::PlasmaDisconnectClient: {
RAY_LOG(INFO) << "[myan] Received Disconnect request...";
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client;
DisconnectClient(client);
return Status::Disconnected("The Plasma Store client is disconnected.");
break;
} break;
case fb::MessageType::PlasmaGetDebugStringRequest: {
RAY_LOG(INFO) << "[myan] Received DebugString request...";
RAY_RETURN_NOT_OK(SendGetDebugStringReply(
client, object_lifecycle_mgr_.EvictionPolicyDebugString()));
} break;
default:
RAY_LOG(INFO) << "[myan] Received request falls into default... "
<< "MessageType=" << EnumNameMessageType(type)
<< "IntegerValue=" << static_cast<long>(type);
// This code should be unreachable.
RAY_CHECK(0);
}
Expand Down Expand Up @@ -510,7 +530,7 @@ void PlasmaStore::ProcessCreateRequests() {
retry_after_ms = delay_on_oom_ms_;

if (!dumped_on_oom_) {
RAY_LOG(INFO) << "Plasma store at capacity\n" << GetDebugDump();
RAY_LOG(INFO) << "[myan] Plasma store at capacity\n" << GetDebugDump();
dumped_on_oom_ = true;
}
} else {
Expand Down Expand Up @@ -561,7 +581,7 @@ bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) {

void PlasmaStore::PrintAndRecordDebugDump() const {
absl::MutexLock lock(&mutex_);
RAY_LOG(INFO) << GetDebugDump();
RAY_LOG(INFO) << "[myan] " << GetDebugDump();
stats_timer_ = execute_after(
io_context_,
[this]() { PrintAndRecordDebugDump(); },
Expand Down
5 changes: 4 additions & 1 deletion src/ray/object_manager/plasma/store_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name,
if (fallback_directory.empty()) {
fallback_directory = "/tmp";
}
RAY_LOG(INFO) << "Starting object store with directory " << plasma_directory
RAY_LOG(INFO) << "[myan] Starting object store with directory " << plasma_directory
<< ", fallback " << fallback_directory << ", and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
Expand Down Expand Up @@ -106,6 +106,7 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
// Create noop monitor for Windows.
fs_monitor_ = std::make_unique<ray::FileSystemMonitor>();
#endif
RAY_LOG(INFO) << "[myan] Creating the PlasmaStore...";
store_.reset(new PlasmaStore(main_service_,
*allocator_,
*fs_monitor_,
Expand All @@ -115,7 +116,9 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
object_store_full_callback,
add_object_callback,
delete_object_callback));
RAY_LOG(INFO) << "[myan] Starting the PlasmaStore...";
store_->Start();
RAY_LOG(INFO) << "[myan] PlasmaStore started.";
}
main_service_.run();
Shutdown();
Expand Down
10 changes: 10 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,12 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
RAY_LOG(INFO) << "[myan] Disconnecting Worker with actor_id="
<< (actor_id.IsNil() ? "Nil" : actor_id.Hex())
<< ", task_id=" << (task_id.IsNil() ? "Nil" : task_id.Hex())
<< ", worker_id="
<< (worker->WorkerId().IsNil() ? "Nil" : worker->WorkerId().Hex());

// If the worker was running a task or actor, clean up the task and push an
// error to the driver, unless the worker is already dead.
if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) {
Expand Down Expand Up @@ -1649,6 +1655,10 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
const auto job_id = worker->GetAssignedJobId();
RAY_CHECK(!job_id.IsNil());
RAY_CHECK_OK(gcs_client_->Jobs().AsyncMarkFinished(job_id, nullptr));
RAY_LOG(INFO) << "[myan] Disconnecting Driver with job_id=" << job_id.Hex()
<< ", worker_id="
<< (worker->WorkerId().IsNil() ? "Nil" : worker->WorkerId().Hex());

worker_pool_.DisconnectDriver(worker);

RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(worker->GetAssignedJobId())
Expand Down