Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose public and private IP in LightningWork #17742

Merged
merged 10 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/lightning/app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Allow customize `gradio` components with lightning colors ([#17054](https://github.com/Lightning-AI/lightning/pull/17054))

- Added the property `LightningWork.public_ip` that exposes the public IP of the `LightningWork` instance ([#17742](https://github.com/Lightning-AI/lightning/pull/17742))


### Changed

Expand All @@ -29,7 +31,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Fixed

-
- Fixed `LightningWork.internal_ip` that was mistakenly exposing the public IP instead; now exposes the private/internal IP address ([#17742](https://github.com/Lightning-AI/lightning/pull/17742))


## [2.0.1.post0] - 2023-04-11
Expand Down
7 changes: 4 additions & 3 deletions src/lightning/app/components/database/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,10 @@ def db_url(self) -> Optional[str]:
use_localhost = "LIGHTNING_APP_STATE_URL" not in os.environ
if use_localhost:
return self.url
if self.internal_ip != "":
return f"http://{self.internal_ip}:{self.port}"
return self.internal_ip
ip_addr = self.public_ip or self.internal_ip
if ip_addr != "":
return f"http://{ip_addr}:{self.port}"
return ip_addr

def on_exit(self):
self._exit_event.set()
Expand Down
8 changes: 4 additions & 4 deletions src/lightning/app/components/serve/auto_scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def __init__(
raise ValueError("cold_start_proxy must be of type ColdStartProxy or str")

def get_internal_url(self) -> str:
if not self._internal_ip:
raise ValueError("Internal IP not set")
return f"http://{self._internal_ip}:{self._port}"
if not self._public_ip:
raise ValueError("Public IP not set")
return f"http://{self._public_ip}:{self._port}"

async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]], server_url: str):
request_data: List[_LoadBalancer._input_type] = [b[1] for b in batch]
Expand Down Expand Up @@ -386,7 +386,7 @@ def update_servers(self, server_works: List[LightningWork]):
"""
old_server_urls = set(self.servers)
current_server_urls = {
f"http://{server._internal_ip}:{server.port}" for server in server_works if server._internal_ip
f"http://{server._public_ip}:{server.port}" for server in server_works if server._internal_ip
}

# doing nothing if no server work has been added/removed
Expand Down
12 changes: 12 additions & 0 deletions src/lightning/app/core/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class LightningWork:
"_url",
"_restarting",
"_internal_ip",
"_public_ip",
)

_run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor
Expand Down Expand Up @@ -138,6 +139,7 @@ def __init__(
"_url",
"_future_url",
"_internal_ip",
"_public_ip",
"_restarting",
"_cloud_compute",
"_display_name",
Expand All @@ -148,6 +150,7 @@ def __init__(
self._url: str = ""
self._future_url: str = "" # The cache URL is meant to defer resolving the url values.
self._internal_ip: str = ""
self._public_ip: str = ""
# setattr_replacement is used by the multiprocessing runtime to send the latest changes to the main coordinator
self._setattr_replacement: Optional[Callable[[str, Any], None]] = None
self._name: str = ""
Expand Down Expand Up @@ -212,6 +215,15 @@ def internal_ip(self) -> str:
"""
return self._internal_ip

@property
def public_ip(self) -> str:
"""The public ip address of this LightningWork, reachable from the internet.
By default, this attribute returns the empty string and the ip address will only be returned once the work runs.
Locally, this address is undefined (empty string) and in the cloud it will be determined by the cluster.
"""
return self._public_ip

def _on_init_end(self) -> None:
self._local_build_config.on_work_init(self)
self._cloud_build_config.on_work_init(self, self._cloud_compute)
Expand Down
3 changes: 2 additions & 1 deletion src/lightning/app/utilities/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ def run_once(self):
# Set this here after the state observer is initialized, since it needs to record it as a change and send
# it back to the flow
default_internal_ip = "127.0.0.1" if constants.LIGHTNING_CLOUDSPACE_HOST is None else "0.0.0.0" # noqa: S104
self.work._internal_ip = os.environ.get("LIGHTNING_NODE_IP", default_internal_ip)
self.work._internal_ip = os.environ.get("LIGHTNING_NODE_PRIVATE_IP", default_internal_ip)
self.work._public_ip = os.environ.get("LIGHTNING_NODE_IP", "")

# 8. Patch the setattr method of the work. This needs to be done after step 4, so we don't
# send delta while calling `set_state`.
Expand Down
4 changes: 3 additions & 1 deletion tests/tests_app/core/test_lightning_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def test_simple_app(tmpdir):
"_url": "",
"_future_url": "",
"_internal_ip": "",
"_public_ip": "",
"_paths": {},
"_port": None,
"_restarting": False,
Expand All @@ -136,6 +137,7 @@ def test_simple_app(tmpdir):
"_url": "",
"_future_url": "",
"_internal_ip": "",
"_public_ip": "",
"_paths": {},
"_port": None,
"_restarting": False,
Expand Down Expand Up @@ -982,7 +984,7 @@ def run(self):
def test_state_size_constant_growth():
app = LightningApp(SizeFlow())
MultiProcessRuntime(app, start_server=False).dispatch()
assert app.root._state_sizes[0] <= 7965
assert app.root._state_sizes[0] <= 8304
assert app.root._state_sizes[20] <= 26550


Expand Down
4 changes: 4 additions & 0 deletions tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand All @@ -349,6 +350,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand Down Expand Up @@ -388,6 +390,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand All @@ -413,6 +416,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand Down
6 changes: 6 additions & 0 deletions tests/tests_app/structures/test_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def run(self):
"_restarting": False,
"_display_name": "",
"_internal_ip": "",
"_public_ip": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "cpu-small",
Expand Down Expand Up @@ -80,6 +81,7 @@ def run(self):
"_restarting": False,
"_display_name": "",
"_internal_ip": "",
"_public_ip": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "cpu-small",
Expand Down Expand Up @@ -114,6 +116,7 @@ def run(self):
"_restarting": False,
"_display_name": "",
"_internal_ip": "",
"_public_ip": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "cpu-small",
Expand Down Expand Up @@ -199,6 +202,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand Down Expand Up @@ -233,6 +237,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand Down Expand Up @@ -262,6 +267,7 @@ def run(self):
"_paths": {},
"_restarting": False,
"_internal_ip": "",
"_public_ip": "",
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
Expand Down
23 changes: 15 additions & 8 deletions tests/tests_app/utilities/test_proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,21 @@ def test_state_observer():


@pytest.mark.parametrize(
("patch_constants", "environment", "expected_ip_addr"),
("patch_constants", "environment", "expected_public_ip", "expected_private_ip"),
[
({}, {}, "127.0.0.1"),
({"LIGHTNING_CLOUDSPACE_HOST": "any"}, {}, "0.0.0.0"), # noqa: S104
({}, {"LIGHTNING_NODE_IP": "10.10.10.5"}, "10.10.10.5"),
({}, {}, "", "127.0.0.1"),
({"LIGHTNING_CLOUDSPACE_HOST": "any"}, {}, "", "0.0.0.0"), # noqa: S104
(
{},
{"LIGHTNING_NODE_IP": "85.44.2.25", "LIGHTNING_NODE_PRIVATE_IP": "10.10.10.5"},
"85.44.2.25",
"10.10.10.5",
),
],
indirect=["patch_constants"],
)
def test_work_runner_sets_internal_ip(patch_constants, environment, expected_ip_addr):
"""Test that the WorkRunner updates the internal ip address as soon as the Work starts running."""
def test_work_runner_sets_public_and_private_ip(patch_constants, environment, expected_public_ip, expected_private_ip):
"""Test that the WorkRunner updates the public and private address as soon as the Work starts running."""

class Work(LightningWork):
def run(self):
Expand Down Expand Up @@ -690,11 +695,13 @@ def run(self):

with mock.patch.dict(os.environ, environment, clear=True):
work_runner.setup()
# The internal ip address only becomes available once the hardware is up / the work is running.
# The public ip address only becomes available once the hardware is up / the work is running.
assert work.public_ip == ""
assert work.internal_ip == ""
with contextlib.suppress(Empty):
work_runner.run_once()
assert work.internal_ip == expected_ip_addr
assert work.public_ip == expected_public_ip
assert work.internal_ip == expected_private_ip


class WorkBi(LightningWork):
Expand Down