use an agent-id rather than the process PID #24968

Merged
merged 3 commits into from
May 27, 2022
Changes from 1 commit
12 changes: 11 additions & 1 deletion dashboard/agent.py
@@ -54,6 +54,7 @@ def __init__(
raylet_name=None,
logging_params=None,
disable_metrics_collection: bool = False,
agent_id: int = os.getpid(),
Contributor

I think we don't need the default id as pid in this case. Can you remove it?

Contributor Author

Refactored to use a required keyword-only * signature.

Contributor

Hmm, I might be missing something here, but it still seems to receive os.getpid() as the default?

Contributor Author

Sorry, check now :)
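
The thread above turns on a Python detail: a default such as `agent_id: int = os.getpid()` is evaluated once, when the function is defined, and it lets callers omit the argument without noticing. Below is a minimal sketch of the alternative the author describes, a required keyword-only parameter placed after a bare `*`. `AgentSketch`, `with_pid_default`, and their parameters are hypothetical stand-ins for illustration, not the actual `DashboardAgent` signature.

```python
import os


class AgentSketch:
    """Hypothetical stand-in for the agent class (not Ray's DashboardAgent)."""

    def __init__(self, node_ip_address, *, agent_id: int):
        # Parameters after the bare `*` are keyword-only; with no default,
        # `agent_id` must be supplied explicitly by every caller.
        self.ip = node_ip_address
        self.agent_id = agent_id


def with_pid_default(agent_id: int = os.getpid()):
    # The default is computed once, when this `def` statement runs,
    # not each time the function is called.
    return agent_id


agent = AgentSketch("127.0.0.1", agent_id=1234)
print(agent.agent_id)

try:
    AgentSketch("127.0.0.1")  # agent_id omitted
except TypeError as exc:
    print("rejected:", exc)
```

With the keyword-only form, a caller that forgets the id fails immediately with a `TypeError` instead of quietly registering under its own PID.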

):
"""Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules.
@@ -76,6 +77,7 @@ def __init__(
self.logging_params = logging_params
self.node_id = os.environ["RAY_NODE_ID"]
self.metrics_collection_disabled = disable_metrics_collection
self.agent_id = agent_id
# TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
# only used for fate-sharing with the raylet and we need a different
# fate-sharing mechanism for Windows anyways.
@@ -203,7 +205,7 @@ async def _check_parent():

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_pid=os.getpid(),
agent_pid=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
)
@@ -354,6 +356,13 @@ async def _check_parent():
action="store_true",
help=("If this arg is set, metrics report won't be enabled from the agent."),
)
parser.add_argument(
"--agent-id",
required=False,
type=int,
default=os.getpid(),
help="ID to report register with raylet, default is {}.".format(os.getpid()),
)
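
As a rough illustration of how the new flag behaves, the sketch below rebuilds only the `--agent-id` argument in a standalone parser. `build_parser` and the help wording are assumptions made for this example; the rest of the agent's real parser is omitted.

```python
import argparse
import os


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser containing only the --agent-id option."""
    parser = argparse.ArgumentParser(description="agent-id flag sketch")
    parser.add_argument(
        "--agent-id",
        required=False,
        type=int,
        default=os.getpid(),  # evaluated once, when the parser is built
        help="ID to register with; defaults to this process's PID.",
    )
    return parser


# Launched by a parent that passes an id: the explicit value wins.
print(build_parser().parse_args(["--agent-id", "42"]).agent_id)

# Launched standalone: the value falls back to the current PID.
print(build_parser().parse_args([]).agent_id == os.getpid())
```

argparse converts the dash in `--agent-id` to an underscore, which is why the value is read back as `args.agent_id`.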

args = parser.parse_args()
try:
@@ -383,6 +392,7 @@ async def _check_parent():
raylet_name=args.raylet_name,
logging_params=logging_params,
disable_metrics_collection=args.disable_metrics_collection,
agent_id=args.agent_id,
)
if os.environ.get("_RAY_AGENT_FAILING"):
raise Exception("Failure injection failure.")
51 changes: 29 additions & 22 deletions src/ray/raylet/agent_manager.cc
@@ -56,58 +56,65 @@ void AgentManager::StartAgent() {
return;
}
Contributor Author

I moved the stanza below to after the new arguments are added, so it prints out everything.


if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : options_.agent_commands) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}

// Launch the process to create the agent.
std::error_code ec;
// Create a random agent_id to pass to the child process
Contributor

Suggested change
// Create a random agent_id to pass to the child process
// Create a random agent_id to pass to the child process. We cannot use the pid as an id because os.getpid() from the Python process is not reliable on Windows. See [Github issue link]

int agent_id = rand();
const std::string agent_id_str = std::to_string(agent_id);
std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str());
}
argv.push_back("--agent-id");
argv.push_back(agent_id_str.c_str());

// Disable metrics report if needed.
if (!RayConfig::instance().enable_metrics_collection()) {
argv.push_back("--disable-metrics-collection");
}
argv.push_back(NULL);

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : argv) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}

// Set node id to agent.
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});

// Launch the process to create the agent.
std::error_code ec;
Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
<< ec.message();
}
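
The launch sequence above (pick an id, append `--agent-id <id>` to the command, start the child) can be sketched in Python for readers more comfortable with the agent side. Everything here is an assumption made for illustration: `build_agent_argv`, `launch_and_read_id`, and the one-line child program are hypothetical, and `random.randrange` stands in for the C++ `rand()` call.

```python
import random
import subprocess
import sys

# Hypothetical child program: echoes the value that follows --agent-id.
CHILD_CODE = "import sys; print(sys.argv[sys.argv.index('--agent-id') + 1])"


def build_agent_argv(agent_commands, agent_id):
    """Return the base command with the id flag appended."""
    return [*agent_commands, "--agent-id", str(agent_id)]


def launch_and_read_id(agent_id):
    """Start the child with the id on its command line and read it back."""
    argv = build_agent_argv([sys.executable, "-c", CHILD_CODE], agent_id)
    result = subprocess.run(argv, capture_output=True, text=True, check=True)
    return int(result.stdout.strip())


agent_id = random.randrange(2**31)
print(launch_and_read_id(agent_id) == agent_id)
```

Because the parent chooses the id and hands it over on the command line, both sides agree on the value without relying on what the child's own `os.getpid()` returns.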

std::thread monitor_thread([this, child]() mutable {
std::thread monitor_thread([this, child, agent_id]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId()
<< ", register timeout "
RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_(
[this, child]() mutable {
if (agent_pid_ != child.GetId()) {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " has not registered. ip " << agent_ip_address_
<< ", pid " << agent_pid_;
[this, child, agent_id]() mutable {
if (agent_pid_ != agent_id) {
Contributor

Instead of agent_pid_, can we rename it to agent_id_? Since it is not pid anymore.

Contributor Author

Done, and improved the error message

RAY_LOG(WARNING) << "Agent process with id " << agent_id
<< " has not registered. ip " << agent_ip_address_ << ", id "
<< agent_pid_;
child.Kill();
}
},
RayConfig::instance().agent_register_timeout_ms());
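
The timeout check above compares the id the agent registered with against the id the raylet generated, and kills the child if they differ when the timer fires. A minimal Python sketch of that pattern follows, assuming the registered id is recorded when the agent's registration request arrives. `RegistrationMonitor` and its callback are hypothetical names, and `threading.Timer` stands in for the raylet's delay executor.

```python
import threading


class RegistrationMonitor:
    """Hypothetical helper; names are illustrative, not Ray's API."""

    def __init__(self, expected_id, timeout_s, on_unregistered):
        self.expected_id = expected_id
        self.registered_id = None  # set when a registration arrives
        self._on_unregistered = on_unregistered
        self._timer = threading.Timer(timeout_s, self._check)

    def start(self):
        self._timer.start()

    def register(self, agent_id):
        self.registered_id = agent_id

    def cancel(self):
        self._timer.cancel()

    def wait(self):
        # Block until the timer thread has fired or been cancelled.
        self._timer.join()

    def _check(self):
        # Runs once after the timeout: a mismatch means the expected
        # agent never registered, so the caller should kill the child.
        if self.registered_id != self.expected_id:
            self._on_unregistered(self.expected_id, self.registered_id)


killed = []
monitor = RegistrationMonitor(1234, 0.05, lambda want, got: killed.append((want, got)))
monitor.start()
monitor.wait()
print(killed)
```

Calling `monitor.register(1234)` before the timer fires leaves `killed` empty, which corresponds to the agent registering in time.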

int exit_code = child.Wait();
timer->cancel();
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " exit, return value " << exit_code << ". ip "
<< agent_ip_address_ << ". pid " << agent_pid_;
RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value "
<< exit_code << ". ip " << agent_ip_address_ << ". pid "
<< agent_pid_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "